Prefect 2.10以降、インフラ管理の方法はAgentが非推奨となりWorkerが推奨となりました。
PrefectのWorkerについてのまとめ、KubernetesにWorkerをデプロイする場合のmanifestを紹介します。
本文中コード: code-for-blogpost/prefect_kubernetes_worker at main · nsakki55/code-for-blogpost · GitHub
Workerとは
PrefectのWorkerは Work Poolとセットで使用し、ワークフローの実行インフラ環境を管理する機能です。
Work Poolはワークフローの実行管理を行うPub/Subのような機能です。
Work Pools & Workers - Prefect Docs
利用するインフラ環境に対応するタイプのWork Poolを作成する必要があります。
Docker、ECS Task、Kubernetesなどのインフラ環境ごとに Work Pool を作成します。ワークフローの実行設定にインフラ環境に対応した Work Poolを選択することで、ワークフローの実行環境を柔軟に変更できます。
WorkerはWork Poolからワークフローを取得し、指定されたインフラ環境でワークフローを実行・制御する役割を持ちます。
ワークフローを実行するインフラ環境にアクセス、またはリソースの作成ができる環境にWorker をデプロイします。
Work Pools & Workers - Prefect Docs
以下はPrefectでワークフローを実行する際の、WorkerとWork Poolの役割を表した図です。
ワークフローの実行設定を Deployment で管理し、実行したいインフラタイプのWork Poolに割り当てます。WorkerはWork Poolに割り当てられたDeploymentを取得し、指定されたインフラ環境でワークフローの実行を行います。
公式でサポートされているWorker Typeは7種類あります。
WorkerとAgent
WorkerはPrefect 2.10から導入されたました。
prefect/RELEASE-NOTES.md at main · PrefectHQ/prefect · GitHub
Prefect 2.10以前のワークフローのインフラ実行管理は、AgentとBlock機能が担っていました。
Worker導入以降、AgentとBlockの利用は非推奨となっています。
Deployments - Prefect Docs
WorkerとAgentの違いをまとめた公式ドキュメントがあります。
Upgrade from Agents to Workers - Prefect Docs
WorkerはAgentとInfrastructure Blockを融合した機能ですが、単なる代替機能ではありません。Workerを使用することでワークフローのデプロイ方法がAgentとInfrastructure Blockを使用する場合と変わります。
AgentからWorkerへの変更点
デプロイコマンドの変更
- Agent:
prefect deployment build <entrypoint>
- Worker: prefect deploy
Prefectが自動的にレポジトリ内のFlowを検知してデプロイを行うようになりました。
リモートストレージ設定方法の変更
- Agent: Storage Block
- Worker: pull action
Delploymentのpullセクションにリモートストレージの設定を記述するようになりました。Storage Blockの機能自体は残っていて、pullセクションで使用することが可能となっています。
Where to Store Your Flow Code - Prefect Docs
実行インフラ設定方法の変更
- Agent: Infrastructure Block
- Worker: 実行インフラタイプのWork Pool
複数Deploymentの管理
Deploymentごとに1つの設定ファイルを作成する必要がありましたが、Prefect2.10以降は複数のprefect.yamlファイルで管理できるようになりました。
AgentとWorkerの類似点
- Storage Blockを prefect.yamlのpullセクション中に設定可能
- Work Poolの詳細設定項目とInfrastructure Blockの設定が似ている
- Deploymentのインフラ設定の上書きを
infra_override
から job_variable に変更 - ユーザーのインフラ環境でのAgentとWorkerの起動方法が実質同じ方法
- Agent:
prefect agent start --pool <work pool name>
- Worker:
prefect worker start --pool <work pool name>
- Agent:
Kubernetes Workerのデプロイ
PrefectでワークフローをKubernetesで実行する場合、WorkerをKubernetesにデプロイする方法があります。
公式ではHelmによるデプロイが推奨されているので、デプロイ方法を紹介します。
Helmによるデプロイ
HelmはKubernetesのアプリケーションデプロイを簡単に行えるようにするツールです。
PrefectではKubernetes Workerのデプロイを、Prefect公式が提供してるHelmによるデプロイを推奨しています。
prefect-helm/charts/prefect-worker at main · PrefectHQ/prefect-helm · GitHub
Helmを使用したKubernetesへのWorkerのデプロイ方法が公式ドキュメントに紹介されています。
Kubernetes - Prefect Docs
Helmをローカル環境にインストールしておきます。Macの場合はbrewでインストール可能です。
Helm | Helm のインストール
$ brew install helm
Helm clientにPrefect Helm repositoryを追加します。
$ helm repo add prefect https://prefecthq.github.io/prefect-helm $ helm repo update
prefect
namespaceにPrefectのAPIキーをKubernetesのSecretに作成します。
$ kubectl create secret generic prefect-api-key --namespace=prefect --from-literal=key=your-prefect-cloud-api-key
kubernetes typeのWork PoolとPrefect Cloudのaccount ID, workspace IDを記述した values.yaml
を作成します。
worker: cloudApiConfig: accountId: <target account ID> workspaceId: <target workspace ID> config: workPool: <target work pool name>
自分のPrefect Cloudアカウントの account ID, workspace IDは、Prefect CloudのURLから確認できます。以下のフォーマットとなっているため、対応する値を設定します。
https://app.prefect.cloud/account/<target account ID>/workspaces/<target workspace ID>
Helm releaseを行いWorkerをkubernetes clusterにデプロイします。
$ helm install prefect-worker prefect/prefect-worker --namespace=prefect -f values.yaml
Helmで生成されるmanifest
helm templateコマンドを使って生成されるmanifestを確認することができます。
$ helm template prefect-worker prefect/prefect-worker --namespace=prefect -f values.yaml >> manifest.yaml
以下のmanifestをもとにworkerが起動されます。
Helmを使用せずWorkerの管理をmanifestで行いたい場合は、以下の内容を元に作成するのがいいと思います。
--- # Source: prefect-worker/templates/serviceaccount.yaml apiVersion: v1 kind: ServiceAccount metadata: name: prefect-worker namespace: "prefect" labels: app.kubernetes.io/name: prefect-worker helm.sh/chart: prefect-worker-2023.11.28 app.kubernetes.io/instance: prefect-worker app.kubernetes.io/managed-by: Helm app.kubernetes.io/version: "2.14.7" app.kubernetes.io/component: worker prefect-version: 2.14.7 --- # Source: prefect-worker/templates/role.yaml apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: prefect-worker namespace: "prefect" labels: app.kubernetes.io/name: prefect-worker helm.sh/chart: prefect-worker-2023.11.28 app.kubernetes.io/instance: prefect-worker app.kubernetes.io/managed-by: Helm app.kubernetes.io/version: "2.14.7" app.kubernetes.io/component: worker prefect-version: 2.14.7 rules: - apiGroups: [""] resources: ["pods", "pods/log", "pods/status"] verbs: ["get", "watch", "list"] - apiGroups: ["batch"] resources: ["jobs"] verbs: [ "get", "list", "watch", "create", "update", "patch", "delete" ] --- # Source: prefect-worker/templates/rolebinding.yaml apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: prefect-worker namespace: "prefect" labels: app.kubernetes.io/name: prefect-worker helm.sh/chart: prefect-worker-2023.11.28 app.kubernetes.io/instance: prefect-worker app.kubernetes.io/managed-by: Helm app.kubernetes.io/version: "2.14.7" app.kubernetes.io/component: worker prefect-version: 2.14.7 roleRef: apiGroup: rbac.authorization.k8s.io kind: Role name: prefect-worker subjects: - kind: ServiceAccount name: prefect-worker namespace: "prefect" --- # Source: prefect-worker/templates/deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: prefect-worker namespace: "prefect" labels: app.kubernetes.io/name: prefect-worker helm.sh/chart: prefect-worker-2023.11.28 app.kubernetes.io/instance: prefect-worker app.kubernetes.io/managed-by: Helm app.kubernetes.io/version: "2.14.7" app.kubernetes.io/component: worker prefect-version: 2.14.7 spec: replicas: 1 selector: matchLabels: app.kubernetes.io/name: prefect-worker app.kubernetes.io/instance: prefect-worker app.kubernetes.io/component: worker template: metadata: labels: app.kubernetes.io/name: prefect-worker helm.sh/chart: prefect-worker-2023.11.28 app.kubernetes.io/instance: prefect-worker app.kubernetes.io/managed-by: Helm app.kubernetes.io/version: "2.14.7" app.kubernetes.io/component: worker prefect-version: 2.14.7 spec: serviceAccountName: prefect-worker securityContext: fsGroup: 1001 runAsNonRoot: true runAsUser: 1001 containers: - name: prefect-worker image: "prefecthq/prefect:2.14.7-python3.11-kubernetes" imagePullPolicy: IfNotPresent command: - /usr/bin/tini - -g - -- - /opt/prefect/entrypoint.sh args: - prefect - worker - start - --type - "kubernetes" - --pool - "<target work pool name>" - --install-policy - "prompt" workingDir: /home/prefect env: - name: HOME value: /home/prefect - name: PREFECT_WORKER_PREFETCH_SECONDS value: "10" - name: PREFECT_WORKER_QUERY_SECONDS value: "5" - name: PREFECT_API_ENABLE_HTTP2 value: "true" - name: PREFECT_API_URL value: "https://api.prefect.cloud/api/accounts/<target account ID>/workspaces/<target workspace ID>" - name: PREFECT_KUBERNETES_CLUSTER_UID value: "" - name: PREFECT_API_KEY valueFrom: secretKeyRef: name: prefect-api-key key: key - name: PREFECT_DEBUG_MODE value: "false" envFrom: resources: limits: cpu: 1000m memory: 1Gi requests: cpu: 100m memory: 256Mi securityContext: allowPrivilegeEscalation: false readOnlyRootFilesystem: true runAsNonRoot: true runAsUser: 1001 volumeMounts: - mountPath: /home/prefect name: scratch subPathExpr: home - mountPath: /tmp name: scratch subPathExpr: tmp volumes: - name: scratch emptyDir: {}