この記事は MLOps Advent Calendar 2023の20日目の投稿記事です。
PrefectワークフローをKubernetes Jobで実行する方法を紹介します。
本文中コード: code-for-blogpost/prefect_gke at main · nsakki55/code-for-blogpost · GitHub
Prefect × Kubernetes
PrefectではワークフローをKubernetes Jobとして実行することができます。
kubernetesを使用することで計算リソースを柔軟に拡張したり、ワークフローをコンテナ化して管理するメリットがあります。
以下の手順で、PrefectでワークフローをKubernetes Jobとして実行できます。
- Kubernetes work poolの作成
- workerをKubernetes上にデプロイ
- deploymentに起動imageを設定
今回はGKE上でPrefectのワークフローを実行する方法を紹介します。
Kubernetes Bobとして実行するにはいくつかの方法がありますが、今回は3つのパターンを取り上げます。
- 自動生成 image
- 自作 image
- Prefect 公式 image + GCS
GCPリソースの作成
Kubernetes Engine
GKE上にクラスターを作成します。 prefect-cluster
という名前のクラスターを作成します。
$ gcloud container clusters create prefect-cluster --num-nodes=1 --machine-type=n1-standard-1
gcloudコマンドのデフォルトクラスターを設定します。
$ gcloud config set container/cluster prefect-cluster
Artifact Registory
PrefectのFlowのimageを保存するために、Artifact Registoryを作成します。
$ gcloud artifacts repositories create prefect-repository --repository-format=docker --location=asia
Artifact Registoryの認証を行います。
$ gcloud auth configure-docker asia-docker.pkg.dev
Cloud Storage
Flowのコードを保存するために、GCSに prefect-gcs-sample
という名前のバケットを作成します。
$ gcloud storage buckets create gs://prefect-gcs-sample --location ASIA-NORTHEAST1
Work Poolの作成
PrefectではFlowの実行インフラ環境をwork poolで管理します。
Work Pools & Workers - Prefect Docs
Kubernetes JobとしてFlowを実行する場合、Kubernetesタイプのwork poolをPrefect Cloud上に作成します。
Prefect CloudのWork Poolsページで、Kubernetes pool typeを選択します。
gke
という名前のwork poolを作成します。
Flow実行詳細を設定できます。
Image Pull Policyを Always
に設定します。
今回は変更しませんがImage項目を変更することでデフォルトの起動imageを変更できます。
何も設定しない場合はPrefectの公式imageが起動されます。
Workerの作成
Worker概要
workerはwork poolに登録されたFlowをポーリングし、指定されたインフラ上でFlowを実行制御します。
workerについての詳しい説明をこちらの記事で取り上げています。
Prefect WorkerをKubernetesにデプロイする - 肉球でキーボード
workerはFlowを実行する環境にデプロイする必要があるため、Kubernetes Clusterでホスティングします。
Work Pools & Workers - Prefect Docs
You must start a worker within an environment that can access or create the infrastructure needed to execute flow runs. The worker will deploy flow runs to the infrastructure corresponding to the worker type. For example, if you start a worker with type
kubernetes
, the worker will deploy flow runs to a Kubernetes cluster.
workerとpoolがPrefectを運用する全体の中での立ち位置は、以下の図がわかりやすいです。(引用: Introducing Prefect Workers and Projects)
※ workerはPrefect 2.10でリリースされた機能です。Flowの実行制御を行なっていたAgentとInfrastructure・Storage Blockを用いた実行方法は非推奨となっています。
prefect/RELEASE-NOTES.md at main · PrefectHQ/prefect · GitHub
Prefect Cloud API Keyの作成
WorkerがPrefect Cloudの認証に使用するために、Prefect Cloudの API Keys ページでAPI Keyを作成します。
API Keyを作成すると pnu_*******
という文字列が一度だけ表示されるので、コピーしておきます。
HelmでWorkerをデプロイする
Helmを利用することでKubernetesのアプリケーションデプロイを簡単に行えます。
kubernetes workerをデプロイするためのHelm Chartが公式から用意されています。
prefect-helm/charts/prefect-worker at main · PrefectHQ/prefect-helm · GitHub
Helm clientにPrefect Helm repositoryを追加します。
$ helm repo add prefect https://prefecthq.github.io/prefect-helm $ helm repo update
Kubernetes Clusterに prefect
というnamespaceを作成します。
$ kubectl create namespace prefect
先ほど作成したPrefectのAPI Keyを Kubernetes secretで管理します。
$ kubectl create secret generic prefect-api-key --namespace=prefect --from-literal=key=pnu_******
作成したwork poolとPrefect Cloudの設定をPrefect Workerに反映させるために、 values.yaml
を作成します。
worker: cloudApiConfig: accountId: <target account ID> workspaceId: <target workspace ID> config: workPool: <target work pool name>
gke
を設定します。
自分のPrefect Cloudアカウントの account ID, workspace IDは、Prefect CloudのURLから確認できます。以下のフォーマットとなっているため、対応する値を設定します。
https://app.prefect.cloud/account/<target account ID>/workspaces/<target workspace ID>
values.yaml
を使用してHelm releaseを行います
$ helm install prefect-worker prefect/prefect-worker --namespace=prefect -f values.yaml
GKE上にworkerがデプロイされていることを確認します。
Flowを作成
以下のようなETL処理に見立てた etl.py
を用意しました。
from prefect import flow, get_run_logger, task @task def extract(): logger = get_run_logger() logger.info("Extract Task") @task def transform(): logger = get_run_logger() logger.info("Transform Task") @task def load(): logger = get_run_logger() logger.info("Load Task") @flow def etl(name: str = "nsakki55"): logger = get_run_logger() logger.info(f"ETL Start by {name}") extract() transform() load() logger.info(f"ETL Finish") if __name__ == "__main__": etl()
Kubernetes JobでFlowを実行
FlowをKuberntes Jobとして実行する3つのパターンを取り上げます。
FlowをPrefect Cloudから実行するにはDeploymentの登録が必要になります。
自動生成 image
Prefectが自動生成したDockerfileでbuildしたimageを元に、FlowのKubernetes Jobを実行します。
コード: code-for-blogpost/prefect_gke/auto_image at main · nsakki55/code-for-blogpost · GitHub
実行に必要なパッケージをまとめたrequirementes.txt
を用意します。
prefect>=2.13.8 prefect-docker>=0.4.0 prefect-kubernetes>=0.3.1
Flowの実行環境設定であるdeploymentを prefect.yaml
に記述します。
Deploying Flows to Work pools and Workers - Prefect Docs
buildセクションにFlowの実行に必要なdocker imageのbuild方法を記載します。
dockerfile: auto
を指定すると、自動的にDockerfileが作成されimageがbuildされます。
work poolには作成した gke
poolを設定します。
name: flows prefect-version: 2.13.8 build: - prefect_docker.deployments.steps.build_docker_image: id: build-image requires: prefect-docker>=0.4.0 image_name: "asia-docker.pkg.dev/{{ $GCP_PROJECT_ID }}/prefect-repository/auto-image" tag: latest dockerfile: auto platform: "linux/amd64" push: - prefect_docker.deployments.steps.push_docker_image: requires: prefect-docker>=0.4.0 image_name: "{{ build-image.image_name }}" tag: "{{ build-image.tag }}" pull: - prefect.deployments.steps.set_working_directory: directory: /opt/prefect/auto_image definitions: tags: &common_tags - "gke" work_pool: &**common_work_pool** name: "gke" job_variables: image: "{{ build-image.image }}" deployments: - name: "auto_image" tags: *common_tags schedule: null entrypoint: "flows/etl.py:etl" work_pool: *common_work_pool
環境変数に GCP_PROJECT_ID
を設定します。
$ export GCP_PROJECT_ID=your-gcp-project-id
以下のようなファイル構成となっています。
auto_image/ │ ├── flows/ │ ├── requirements.txt │ └── etl.py │ └── prefect.yaml
prefect.yaml
があるパス上で以下のコマンドでdeploymentをPrefect Cloudに登録します。
Deploying Flows to Work pools and Workers - Prefect Docs
$ prefect deploy -n etl/auto_image
上のコマンドを実行するとDockerfileが自動生成され、image build、Artifact Registoryへのimage pushが行われます。
以下のDockerfileが自動生成されます。
FROM prefecthq/prefect:2.13.7-python3.8 COPY requirements.txt /opt/prefect/requirements.txt RUN python -m pip install -r /opt/prefect/requirements.txt COPY . /opt/prefect/flows/ COPY . /opt/prefect/ WORKDIR /opt/prefect/
Prefect Cloudから auto_image という名前で登録したdeploymentを実行すると、Flowの実行が成功します。
GKEのワークロードから実行されたFlowのKubernetes Jobを確認できます。自動生成されたimageを起動していることがわかります。
自作 image
自分で作成したimageを元に、FlowのKubernetes Jobを実行します。
実行に必要なパッケージをまとめた環境を柔軟に用意できるため、こちらの方法を好む人が多いかと思います。
コード: code-for-blogpost/prefect_gke/custom_image at main · nsakki55/code-for-blogpost · GitHub
以下のような Dockerfile.custom
を作成します。
FROM prefecthq/prefect:2.13.7-python3.8 COPY requirements.txt /opt/prefect/requirements.txt RUN python -m pip install -r /opt/prefect/requirements.txt COPY . /opt/prefect/custom_image/flows/ COPY . /opt/prefect/custom_image/ WORKDIR /opt/prefect/custom_image
以下のようなファイル構成となっています。requirements.txt、flowファイルは上述の自動生成imageで実行する場合と同様のものを使用します。
custom_image/ │ ├── requirements.txt ├── etl.py ├── Dockerfile.custom └── prefect.yaml
Dockerfile.custom
を元にimage buildを行い、Artifact Registoryへpushを行います。
$ docker build -t asia-docker.pkg.dev/${GCP_PROJECT_ID}/prefect-repository/custom-image:latest -f Dockerfile.custom . $ docker push asia-docker.pkg.dev/${GCP_PROJECT_ID}/prefect-repository/custom-image:latest
自作imageを元にKubernetes Jobを実行するdeployment設定のprefect.yaml
を作成します。
自動生成したimageを使用しないため、buid, pushセクションはnullを設定して起きます。
definitionsセクションの起動imageに、先ほどArtifact Regsitoryに保存した自作imageを指定します。
name: flows prefect-version: 2.13.8 build: null push: null pull: - prefect.deployments.steps.set_working_directory: directory: /opt/prefect/custom_image definitions: tags: &common_tags - "gke" work_pool: &common_work_pool name: "gke" job_variables: image: "asia-docker.pkg.dev/{{ $GCP_PROJECT_ID }}/prefect-repository/custom-image" deployments: - name: "custom_image" tags: *common_tags schedule: null entrypoint: "etl.py:etl" work_pool: *common_work_pool
deploymentをPrefect Cloudに登録します。
prefect deploy
コマンドを実行すると、以下のように自動生成したimageをbuildをするか聞かれます。今回は自動生成したimageは使用しないため No を選択します。
$ prefect deploy -n etl/custom_image ? Would you like to build a custom Docker image for this deployment? [y/n] (n):
Prefect Cloudから custom_image deploymentを実行するとワークフローの実行が成功しています。
FlowのKubernetes Jobを確認すると、自作imageを起動していることがわかります。
Prefect 公式 image + GCS
Prefect公式imageを起動し、GCSからFlowファイルをダウンロードして実行します。
FlowファイルをDocker imageではなく、GCSで管理することになります。
コード: code-for-blogpost/prefect_gke/base_image_gcs at main · nsakki55/code-for-blogpost · GitHub
以下のようなフォルダ構成をとります。docker imageをbuildしないためDockerfileは必要ありません。
base_image_gcs/ │ ├── flows/ │ └── etl.py │ └── prefect.yaml
GCSにFlowファイルを管理してKubernetes Jobを実行するdeployment設定のprefect.yaml
を作成します。
push, pullセクションにGCSの設定を記述します。
bucketに作成した prefect-gcs-sample
bucketを指定します。
deploymentの起動imageを指定しない場合、work poolで設定したデフォルトの起動imageで実行されます。
name: flows prefect-version: 2.13.8 build: null push: - prefect_gcp.deployments.steps.push_to_gcs: id: push_code requires: prefect-gcp>=0.4.3 bucket: prefect-gcs-sample folder: flows credentials: null pull: - prefect_gcp.deployments.steps.pull_from_gcs: id: pull_code requires: prefect-gcp>=0.4.3 bucket: '{{ push_code.bucket }}' folder: flows credentials: null definitions: tags: &common_tags - "gke" work_pool: &common_work_pool name: "gke" deployments: - name: "base_image_gcs" tags: *common_tags schedule: null entrypoint: "flows/etl.py:etl" work_pool: *common_work_pool
deploymentをPrefect Cloudに登録します。
prefect deploy
を実行すると GCS へのFlowファイルのpushが自動的に行われます。
$ prefect deploy -n etl/base_image_gcs ? Would you like to build a custom Docker image for this deployment? [y/n] (n): n Running deployment push steps... > Running push_to_gcs step... ╭────────────────────────────────────────────────────────────────────────────╮ │ Deployment 'etl/base_image_gcs' successfully created with id │ │ '06f17afc-f4a4-4907-9749-813c8599a620'. │ ╰────────────────────────────────────────────────────────────────────────────╯
Prefect Cloudから base_image_gcs deploymentを実行するとワークフローの実行が成功しています。
FlowのKubernetes Jobを確認すると、Prefect公式image prefecthq/prefect:2.13.7-python3.11 を起動していることがわかります。
まとめ
PrefectでワークフローをKubernetes Jobとして実行する方法を紹介しました。
Prefect 2.10以降に出たworkerの登場で、実行インフラの設定が簡易化されたため、PrefectでKubernetesリソースを使用するハードルが下がったなと感じます。
Prefectの仕様変更が多いため、世に出ているドキュメントの情報が古くなってきています。
本記事は2023年12月に書かれたPrefect 2.14.6での情報です。今後仕様変更が起き本記事の内容が古くなる可能性があるのでご注意ください。