肉球でキーボード

MLエンジニアの技術ブログです

PrefectワークフローをKubernetes Jobで実行する

この記事は MLOps Advent Calendar 202320日目の投稿記事です。

PrefectワークフローをKubernetes Jobで実行する方法を紹介します。
本文中コード: code-for-blogpost/prefect_gke at main · nsakki55/code-for-blogpost · GitHub

Prefect × Kubernetes

PrefectではワークフローをKubernetes Jobとして実行することができます。

kubernetesを使用することで計算リソースを柔軟に拡張したり、ワークフローをコンテナ化して管理するメリットがあります。

以下の手順で、PrefectでワークフローをKubernetes Jobとして実行できます。

  • Kubernetes work poolの作成
  • workerをKubernetes上にデプロイ
  • deploymentに起動imageを設定

今回はGKE上でPrefectのワークフローを実行する方法を紹介します。

Kubernetes Bobとして実行するにはいくつかの方法がありますが、今回は3つのパターンを取り上げます。

  • 自動生成 image
  • 自作 image
  • Prefect 公式 image + GCS

GCPリソースの作成

Kubernetes Engine

GKE上にクラスターを作成します。 prefect-cluster という名前のクラスターを作成します。

$ gcloud container clusters create prefect-cluster --num-nodes=1 --machine-type=n1-standard-1

gcloudコマンドのデフォルトクラスターを設定します。

$ gcloud config set container/cluster prefect-cluster

Artifact Registory

PrefectのFlowのimageを保存するために、Artifact Registoryを作成します。

$ gcloud artifacts repositories create prefect-repository --repository-format=docker --location=asia

Artifact Registoryの認証を行います。

$ gcloud auth configure-docker asia-docker.pkg.dev

Cloud Storage

Flowのコードを保存するために、GCSに prefect-gcs-sample という名前のバケットを作成します。

$ gcloud storage buckets create gs://prefect-gcs-sample --location ASIA-NORTHEAST1

Work Poolの作成

PrefectではFlowの実行インフラ環境をwork poolで管理します。
Work Pools & Workers - Prefect Docs

Kubernetes JobとしてFlowを実行する場合、Kubernetesタイプのwork poolをPrefect Cloud上に作成します。

Prefect CloudのWork Poolsページで、Kubernetes pool typeを選択します。

work pool

gke という名前のwork poolを作成します。

gke work pool

Flow実行詳細を設定できます。

Image Pull Policyを Always に設定します。

今回は変更しませんがImage項目を変更することでデフォルトの起動imageを変更できます。

何も設定しない場合はPrefectの公式imageが起動されます。

kubernetes work pool setting

Workerの作成

Worker概要

workerはwork poolに登録されたFlowをポーリングし、指定されたインフラ上でFlowを実行制御します。

workerについての詳しい説明をこちらの記事で取り上げています。
Prefect WorkerをKubernetesにデプロイする - 肉球でキーボード

workerはFlowを実行する環境にデプロイする必要があるため、Kubernetes Clusterでホスティングします。
Work Pools & Workers - Prefect Docs

You must start a worker within an environment that can access or create the infrastructure needed to execute flow runs. The worker will deploy flow runs to the infrastructure corresponding to the worker type. For example, if you start a worker with type kubernetes, the worker will deploy flow runs to a Kubernetes cluster.

workerとpoolがPrefectを運用する全体の中での立ち位置は、以下の図がわかりやすいです。(引用: Introducing Prefect Workers and Projects)

Prefect worker projects

※ workerはPrefect 2.10でリリースされた機能です。Flowの実行制御を行なっていたAgentとInfrastructure・Storage Blockを用いた実行方法は非推奨となっています。
prefect/RELEASE-NOTES.md at main · PrefectHQ/prefect · GitHub

Prefect Cloud API Keyの作成

WorkerがPrefect Cloudの認証に使用するために、Prefect Cloudの API Keys ページでAPI Keyを作成します。

API Keyを作成すると pnu_******* という文字列が一度だけ表示されるので、コピーしておきます。

API Key

HelmでWorkerをデプロイする

Helmを利用することでKubernetesのアプリケーションデプロイを簡単に行えます。

kubernetes workerをデプロイするためのHelm Chartが公式から用意されています。

prefect-helm/charts/prefect-worker at main · PrefectHQ/prefect-helm · GitHub

Helm clientにPrefect Helm repositoryを追加します。

$ helm repo add prefect https://prefecthq.github.io/prefect-helm
$ helm repo update

Kubernetes Clusterに prefect というnamespaceを作成します。

$ kubectl create namespace prefect

先ほど作成したPrefectのAPI Keyを Kubernetes secretで管理します。

$ kubectl create secret generic prefect-api-key --namespace=prefect --from-literal=key=pnu_******

作成したwork poolとPrefect Cloudの設定をPrefect Workerに反映させるために、 values.yaml を作成します。

worker:
  cloudApiConfig:
    accountId: <target account ID>
    workspaceId: <target workspace ID>
  config:
    workPool: <target work pool name>

には先ほど作成したwork poolの gke を設定します。

自分のPrefect Cloudアカウントの account ID, workspace IDは、Prefect CloudのURLから確認できます。以下のフォーマットとなっているため、対応する値を設定します。

https://app.prefect.cloud/account/<target account ID>/workspaces/<target workspace ID>

values.yaml を使用してHelm releaseを行います

$ helm install prefect-worker prefect/prefect-worker --namespace=prefect -f values.yaml

GKE上にworkerがデプロイされていることを確認します。

Prefect Worker

Flowを作成

以下のようなETL処理に見立てた etl.py を用意しました。

from prefect import flow, get_run_logger, task

@task
def extract():
    logger = get_run_logger()
    logger.info("Extract Task")

@task
def transform():
    logger = get_run_logger()
    logger.info("Transform Task")

@task
def load():
    logger = get_run_logger()
    logger.info("Load Task")

@flow
def etl(name: str = "nsakki55"):
    logger = get_run_logger()
    logger.info(f"ETL Start by {name}")
    extract()
    transform()
    load()
    logger.info(f"ETL Finish")

if __name__ == "__main__":
    etl()

Kubernetes JobでFlowを実行

FlowをKuberntes Jobとして実行する3つのパターンを取り上げます。

FlowをPrefect Cloudから実行するにはDeploymentの登録が必要になります。

自動生成 image

Prefectが自動生成したDockerfileでbuildしたimageを元に、FlowのKubernetes Jobを実行します。

コード: code-for-blogpost/prefect_gke/auto_image at main · nsakki55/code-for-blogpost · GitHub

実行に必要なパッケージをまとめたrequirementes.txt を用意します。

prefect>=2.13.8
prefect-docker>=0.4.0
prefect-kubernetes>=0.3.1

Flowの実行環境設定であるdeploymentを prefect.yaml に記述します。

Deploying Flows to Work pools and Workers - Prefect Docs

buildセクションにFlowの実行に必要なdocker imageのbuild方法を記載します。

dockerfile: auto を指定すると、自動的にDockerfileが作成されimageがbuildされます。

work poolには作成した gke poolを設定します。

name: flows
prefect-version: 2.13.8

build:
- prefect_docker.deployments.steps.build_docker_image:
    id: build-image
    requires: prefect-docker>=0.4.0
    image_name: "asia-docker.pkg.dev/{{ $GCP_PROJECT_ID }}/prefect-repository/auto-image"
    tag: latest
    dockerfile: auto
    platform: "linux/amd64"

push:
- prefect_docker.deployments.steps.push_docker_image:
    requires: prefect-docker>=0.4.0
    image_name: "{{ build-image.image_name }}"
    tag: "{{ build-image.tag }}"

pull:
- prefect.deployments.steps.set_working_directory:
    directory: /opt/prefect/auto_image

definitions:
  tags: &common_tags
    - "gke"
  work_pool: &**common_work_pool**
    name: "gke"
    job_variables:
      image: "{{ build-image.image }}"

deployments:
- name: "auto_image"
  tags: *common_tags
  schedule: null
  entrypoint: "flows/etl.py:etl"
  work_pool: *common_work_pool

環境変数GCP_PROJECT_ID を設定します。

$ export GCP_PROJECT_ID=your-gcp-project-id

以下のようなファイル構成となっています。

auto_image/
 │
 ├── flows/
 │     ├── requirements.txt
 │     └── etl.py
 │
 └── prefect.yaml

prefect.yaml があるパス上で以下のコマンドでdeploymentをPrefect Cloudに登録します。

Deploying Flows to Work pools and Workers - Prefect Docs

$ prefect deploy -n etl/auto_image

上のコマンドを実行するとDockerfileが自動生成され、image build、Artifact Registoryへのimage pushが行われます。

以下のDockerfileが自動生成されます。

FROM prefecthq/prefect:2.13.7-python3.8
COPY requirements.txt /opt/prefect/requirements.txt
RUN python -m pip install -r /opt/prefect/requirements.txt
COPY . /opt/prefect/flows/
COPY . /opt/prefect/
WORKDIR /opt/prefect/

Prefect Cloudから auto_image という名前で登録したdeploymentを実行すると、Flowの実行が成功します。

auto image deployment

GKEのワークロードから実行されたFlowのKubernetes Jobを確認できます。自動生成されたimageを起動していることがわかります。

auto image kubernetes job

自作 image

自分で作成したimageを元に、FlowのKubernetes Jobを実行します。

実行に必要なパッケージをまとめた環境を柔軟に用意できるため、こちらの方法を好む人が多いかと思います。

コード: code-for-blogpost/prefect_gke/custom_image at main · nsakki55/code-for-blogpost · GitHub

以下のような Dockerfile.custom を作成します。

FROM prefecthq/prefect:2.13.7-python3.8
COPY requirements.txt /opt/prefect/requirements.txt
RUN python -m pip install -r /opt/prefect/requirements.txt
COPY . /opt/prefect/custom_image/flows/
COPY . /opt/prefect/custom_image/
WORKDIR /opt/prefect/custom_image

以下のようなファイル構成となっています。requirements.txt、flowファイルは上述の自動生成imageで実行する場合と同様のものを使用します。

custom_image/
 │
 ├── requirements.txt
 ├── etl.py
 ├── Dockerfile.custom
 └── prefect.yaml

Dockerfile.custom を元にimage buildを行い、Artifact Registoryへpushを行います。

$ docker build -t asia-docker.pkg.dev/${GCP_PROJECT_ID}/prefect-repository/custom-image:latest -f Dockerfile.custom .
$ docker push asia-docker.pkg.dev/${GCP_PROJECT_ID}/prefect-repository/custom-image:latest

自作imageを元にKubernetes Jobを実行するdeployment設定のprefect.yaml を作成します。

自動生成したimageを使用しないため、buid, pushセクションはnullを設定して起きます。

definitionsセクションの起動imageに、先ほどArtifact Regsitoryに保存した自作imageを指定します。

name: flows
prefect-version: 2.13.8

build: null

push: null

pull:
- prefect.deployments.steps.set_working_directory:
    directory: /opt/prefect/custom_image

definitions:
  tags: &common_tags
    - "gke"
  work_pool: &common_work_pool
    name: "gke"
    job_variables:
      image: "asia-docker.pkg.dev/{{ $GCP_PROJECT_ID }}/prefect-repository/custom-image"

deployments:
- name: "custom_image"
  tags: *common_tags
  schedule: null
  entrypoint: "etl.py:etl"
  work_pool: *common_work_pool

deploymentをPrefect Cloudに登録します。

prefect deploy コマンドを実行すると、以下のように自動生成したimageをbuildをするか聞かれます。今回は自動生成したimageは使用しないため No を選択します。

$ prefect deploy -n etl/custom_image
? Would you like to build a custom Docker image for this deployment? [y/n]
(n):

Prefect Cloudから custom_image deploymentを実行するとワークフローの実行が成功しています。

auto image deployment

FlowのKubernetes Jobを確認すると、自作imageを起動していることがわかります。

custom image kubernetes job

Prefect 公式 image + GCS

Prefect公式imageを起動し、GCSからFlowファイルをダウンロードして実行します。

FlowファイルをDocker imageではなく、GCSで管理することになります。

コード: code-for-blogpost/prefect_gke/base_image_gcs at main · nsakki55/code-for-blogpost · GitHub

以下のようなフォルダ構成をとります。docker imageをbuildしないためDockerfileは必要ありません。

base_image_gcs/
 │
 ├── flows/
 │     └── etl.py
 │
 └── prefect.yaml

GCSにFlowファイルを管理してKubernetes Jobを実行するdeployment設定のprefect.yaml を作成します。

push, pullセクションにGCSの設定を記述します。

bucketに作成した prefect-gcs-sample bucketを指定します。

deploymentの起動imageを指定しない場合、work poolで設定したデフォルトの起動imageで実行されます。

name: flows
prefect-version: 2.13.8

build: null

push:
- prefect_gcp.deployments.steps.push_to_gcs:
    id: push_code
    requires: prefect-gcp>=0.4.3
    bucket: prefect-gcs-sample
    folder: flows 
    credentials: null

pull:
- prefect_gcp.deployments.steps.pull_from_gcs:
    id: pull_code
    requires: prefect-gcp>=0.4.3
    bucket: '{{ push_code.bucket }}'
    folder: flows 
    credentials: null

definitions:
  tags: &common_tags
    - "gke"
  work_pool: &common_work_pool
    name: "gke"

deployments:
- name: "base_image_gcs"
  tags: *common_tags
  schedule: null
  entrypoint: "flows/etl.py:etl"
  work_pool: *common_work_pool

deploymentをPrefect Cloudに登録します。

prefect deploy を実行すると GCS へのFlowファイルのpushが自動的に行われます。

$ prefect deploy -n etl/base_image_gcs

? Would you like to build a custom Docker image for this deployment? [y/n]
(n): n
Running deployment push steps...
> Running push_to_gcs step...
╭────────────────────────────────────────────────────────────────────────────╮
│ Deployment 'etl/base_image_gcs' successfully created with id               │
│ '06f17afc-f4a4-4907-9749-813c8599a620'.                                    │
╰────────────────────────────────────────────────────────────────────────────╯

Prefect Cloudから base_image_gcs deploymentを実行するとワークフローの実行が成功しています。

base image gcs deployment

FlowのKubernetes Jobを確認すると、Prefect公式image prefecthq/prefect:2.13.7-python3.11 を起動していることがわかります。

base image gcs kubernetes job

まとめ

PrefectでワークフローをKubernetes Jobとして実行する方法を紹介しました。

Prefect 2.10以降に出たworkerの登場で、実行インフラの設定が簡易化されたため、PrefectでKubernetesリソースを使用するハードルが下がったなと感じます。

Prefectの仕様変更が多いため、世に出ているドキュメントの情報が古くなってきています。

本記事は2023年12月に書かれたPrefect 2.14.6での情報です。今後仕様変更が起き本記事の内容が古くなる可能性があるのでご注意ください。