肉球でキーボード

MLエンジニアの技術ブログです

Prefect WorkerをKubernetesにデプロイする

Prefect 2.10以降、インフラ管理の方法はAgentが非推奨となりWorkerが推奨となりました。

PrefectのWorkerについてのまとめ、KubernetesにWorkerをデプロイする場合のmanifestを紹介します。
本文中コード: code-for-blogpost/prefect_kubernetes_worker at main · nsakki55/code-for-blogpost · GitHub

Workerとは

PrefectのWorkerは Work Poolとセットで使用し、ワークフローの実行インフラ環境を管理する機能です。

Work Poolはワークフローの実行管理を行うPub/Subのような機能です。
Work Pools & Workers - Prefect Docs
利用するインフラ環境に対応するタイプのWork Poolを作成する必要があります。
Docker、ECS Task、Kubernetesなどのインフラ環境ごとに Work Pool を作成します。ワークフローの実行設定にインフラ環境に対応した Work Poolを選択することで、ワークフローの実行環境を柔軟に変更できます。

WorkerはWork Poolからワークフローを取得し、指定されたインフラ環境でワークフローを実行・制御する役割を持ちます。
ワークフローを実行するインフラ環境にアクセス、またはリソースの作成ができる環境にWorker をデプロイします。
Work Pools & Workers - Prefect Docs

以下はPrefectでワークフローを実行する際の、WorkerとWork Poolの役割を表した図です。
ワークフローの実行設定を Deployment で管理し、実行したいインフラタイプのWork Poolに割り当てます。WorkerはWork Poolに割り当てられたDeploymentを取得し、指定されたインフラ環境でワークフローの実行を行います。

Workers and Work Pools(引用: Workers & Work Pools - Prefect Docs)

公式でサポートされているWorker Typeは7種類あります。

Worker Type(引用: Work Pools & Workers - Prefect Docs)

WorkerとAgent

WorkerはPrefect 2.10から導入されたました。
prefect/RELEASE-NOTES.md at main · PrefectHQ/prefect · GitHub

Prefect 2.10以前のワークフローのインフラ実行管理は、AgentとBlock機能が担っていました。
Worker導入以降、AgentとBlockの利用は非推奨となっています。
Deployments - Prefect Docs

WorkerとAgentの違いをまとめた公式ドキュメントがあります。
Upgrade from Agents to Workers - Prefect Docs

WorkerはAgentとInfrastructure Blockを融合した機能ですが、単なる代替機能ではありません。Workerを使用することでワークフローのデプロイ方法がAgentとInfrastructure Blockを使用する場合と変わります。

AgentからWorkerへの変更点

デプロイコマンドの変更

Prefectが自動的にレポジトリ内のFlowを検知してデプロイを行うようになりました。

リモートストレージ設定方法の変更

Delploymentのpullセクションにリモートストレージの設定を記述するようになりました。Storage Blockの機能自体は残っていて、pullセクションで使用することが可能となっています。
Where to Store Your Flow Code - Prefect Docs

実行インフラ設定方法の変更

  • Agent: Infrastructure Block
  • Worker: 実行インフラタイプのWork Pool

複数Deploymentの管理
Deploymentごとに1つの設定ファイルを作成する必要がありましたが、Prefect2.10以降は複数のprefect.yamlファイルで管理できるようになりました。

AgentとWorkerの類似点

  • Storage Blockを prefect.yamlのpullセクション中に設定可能
  • Work Poolの詳細設定項目とInfrastructure Blockの設定が似ている
  • Deploymentのインフラ設定の上書きを infra_override から job_variable に変更
  • ユーザーのインフラ環境でのAgentとWorkerの起動方法が実質同じ方法
    • Agent:prefect agent start --pool <work pool name>
    • Worker: prefect worker start --pool <work pool name>

Kubernetes Workerのデプロイ

PrefectでワークフローをKubernetesで実行する場合、WorkerをKubernetesにデプロイする方法があります。
公式ではHelmによるデプロイが推奨されているので、デプロイ方法を紹介します。

Helmによるデプロイ

HelmKubernetesのアプリケーションデプロイを簡単に行えるようにするツールです。
PrefectではKubernetes Workerのデプロイを、Prefect公式が提供してるHelmによるデプロイを推奨しています。
prefect-helm/charts/prefect-worker at main · PrefectHQ/prefect-helm · GitHub

Helmを使用したKubernetesへのWorkerのデプロイ方法が公式ドキュメントに紹介されています。
Kubernetes - Prefect Docs

Helmをローカル環境にインストールしておきます。Macの場合はbrewでインストール可能です。
Helm | Helm のインストール

$ brew install helm

Helm clientにPrefect Helm repositoryを追加します。

$ helm repo add prefect https://prefecthq.github.io/prefect-helm
$ helm repo update

prefectnamespaceにPrefectのAPIキーをKubernetesのSecretに作成します。

$ kubectl create secret generic prefect-api-key --namespace=prefect --from-literal=key=your-prefect-cloud-api-key

kubernetes typeのWork PoolとPrefect Cloudのaccount ID, workspace IDを記述した values.yaml を作成します。

worker:
  cloudApiConfig:
    accountId: <target account ID>
    workspaceId: <target workspace ID>
  config:
    workPool: <target work pool name>

自分のPrefect Cloudアカウントの account ID, workspace IDは、Prefect CloudのURLから確認できます。以下のフォーマットとなっているため、対応する値を設定します。

https://app.prefect.cloud/account/<target account ID>/workspaces/<target workspace ID>

Helm releaseを行いWorkerをkubernetes clusterにデプロイします。

$ helm install prefect-worker prefect/prefect-worker --namespace=prefect -f values.yaml

Helmで生成されるmanifest

helm templateコマンドを使って生成されるmanifestを確認することができます。

$ helm template prefect-worker prefect/prefect-worker --namespace=prefect -f values.yaml >> manifest.yaml

以下のmanifestをもとにworkerが起動されます。

Helmを使用せずWorkerの管理をmanifestで行いたい場合は、以下の内容を元に作成するのがいいと思います。

---
# Source: prefect-worker/templates/serviceaccount.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: prefect-worker
  namespace: "prefect"
  labels:
    app.kubernetes.io/name: prefect-worker
    helm.sh/chart: prefect-worker-2023.11.28
    app.kubernetes.io/instance: prefect-worker
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/version: "2.14.7"
    app.kubernetes.io/component: worker
    prefect-version: 2.14.7
---
# Source: prefect-worker/templates/role.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: prefect-worker
  namespace: "prefect"
  labels:
    app.kubernetes.io/name: prefect-worker
    helm.sh/chart: prefect-worker-2023.11.28
    app.kubernetes.io/instance: prefect-worker
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/version: "2.14.7"
    app.kubernetes.io/component: worker
    prefect-version: 2.14.7
rules:
- apiGroups: [""]
  resources: ["pods", "pods/log", "pods/status"]
  verbs: ["get", "watch", "list"]
- apiGroups: ["batch"]
  resources: ["jobs"]
  verbs: [ "get", "list", "watch", "create", "update", "patch", "delete" ]
---
# Source: prefect-worker/templates/rolebinding.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: prefect-worker
  namespace: "prefect"
  labels:
    app.kubernetes.io/name: prefect-worker
    helm.sh/chart: prefect-worker-2023.11.28
    app.kubernetes.io/instance: prefect-worker
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/version: "2.14.7"
    app.kubernetes.io/component: worker
    prefect-version: 2.14.7
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: prefect-worker
subjects:
  - kind: ServiceAccount
    name: prefect-worker
    namespace: "prefect"
---
# Source: prefect-worker/templates/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prefect-worker
  namespace: "prefect"
  labels:
    app.kubernetes.io/name: prefect-worker
    helm.sh/chart: prefect-worker-2023.11.28
    app.kubernetes.io/instance: prefect-worker
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/version: "2.14.7"
    app.kubernetes.io/component: worker
    prefect-version: 2.14.7
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: prefect-worker
      app.kubernetes.io/instance: prefect-worker
      app.kubernetes.io/component: worker
  template:
    metadata:
      labels:
        app.kubernetes.io/name: prefect-worker
        helm.sh/chart: prefect-worker-2023.11.28
        app.kubernetes.io/instance: prefect-worker
        app.kubernetes.io/managed-by: Helm
        app.kubernetes.io/version: "2.14.7"
        app.kubernetes.io/component: worker
        prefect-version: 2.14.7
    spec:
      serviceAccountName: prefect-worker
      securityContext:
        fsGroup: 1001
        runAsNonRoot: true
        runAsUser: 1001
      containers:
        - name: prefect-worker
          image: "prefecthq/prefect:2.14.7-python3.11-kubernetes"
          imagePullPolicy: IfNotPresent
          command:
            - /usr/bin/tini
            - -g
            - --
            - /opt/prefect/entrypoint.sh
          args:
            - prefect
            - worker
            - start
            - --type
            - "kubernetes"
            - --pool
            - "<target work pool name>"
            - --install-policy
            - "prompt"
          workingDir: /home/prefect
          env:
            - name: HOME
              value: /home/prefect
            - name: PREFECT_WORKER_PREFETCH_SECONDS
              value: "10"
            - name: PREFECT_WORKER_QUERY_SECONDS
              value: "5"
            - name: PREFECT_API_ENABLE_HTTP2
              value: "true"
            - name: PREFECT_API_URL
              value: "https://api.prefect.cloud/api/accounts/<target account ID>/workspaces/<target workspace ID>"
            - name: PREFECT_KUBERNETES_CLUSTER_UID
              value: ""
            - name: PREFECT_API_KEY
              valueFrom:
                secretKeyRef:
                  name: prefect-api-key
                  key:  key
            - name: PREFECT_DEBUG_MODE
              value: "false"
          envFrom:
          resources:
            limits:
              cpu: 1000m
              memory: 1Gi
            requests:
              cpu: 100m
              memory: 256Mi
          securityContext:
            allowPrivilegeEscalation: false
            readOnlyRootFilesystem: true
            runAsNonRoot: true
            runAsUser: 1001
          volumeMounts:
            - mountPath: /home/prefect
              name: scratch
              subPathExpr: home
            - mountPath: /tmp
              name: scratch
              subPathExpr: tmp
      volumes:
        - name: scratch
          emptyDir: {}

参考