肉球でキーボード

MLエンジニアの技術ブログです

PrefectでSub Flowごとに実行環境を分ける方法

概要

PrefectではFlow内部で別のFlowを実行する、Sub Flowという概念があります。
Flows - Prefect Docs

例えばMLの学習パイプラインを

  • 学習データの取得
  • 前処理
  • 学習

のように処理に分ける場合、各処理を1つのFlowとしてグループ化できます。

Sub Flowを呼び出し元Flowから直接実行すると、呼び出し元Flowのインフラ環境で実行されます。

MLの学習パイプラインのように、処理に応じてリソースを分けて実行したいケースが多いと思います。

  • 学習データの取得 (CPU: 1024, MEMORY: 2048)
  • 前処理 (CPU: 512, MEMORY: 1024)
  • 学習 (CPU: 2048, MEMORY: 4096)

Sub Flowごとに実行環境を分ける場合、run_deployment関数を使ってSub Flowを呼び出す方法で実現できます。 Deployments - Prefect Docs

run_deploymentを使用して、Sub Flowごとに異なる実行環境で実行する方法をまとめます。

実行環境
- python 3.8.7
- prefect 2.10.18
- prefect-aws 0.3.5

本文中コード
github.com

前準備

ECS TaskとしてFlowを実行するために、ECS ServiceとしてAgentを起動します。

TerrafromでAgentをホスティングするECS Serviceを立ち上げます。

$ git clone https://github.com/nsakki55/code-for-blogpost
$ cd prefect_subflow_orchestration/src/infra
$ terraform init
$ terraform apply -var-file=my_secret.tfvars

Terrafromの実装はこちらです: code-for-blogpost/prefect_subflow_orchestration/src/infra at main · nsakki55/code-for-blogpost · GitHub

Prefect Cloudにログインを行います。

$ prefect cloud login

ECS Task BlockをPrefect Cloudに登録します。

from prefect_aws.ecs import ECSTask

ecs = ECSTask(
    env={"EXTRA_PIP_PACKAGES": "s3fs"},
    cluster="arn:aws:ecs:ap-northeast-1:*****:cluster/prefect-ecs",
    cpu="256",
    memory="512",
    stream_output=True,
    configure_cloudwatch_logs=True,
    execution_role_arn="arn:aws:iam::*****:role/prefectEcsTaskExecutionRole",
    task_role_arn="arn:aws:iam::*****:role/prefectEcsTaskRole",
    vpc_id="vpc-*****",
    task_customizations=[
        {"op": "replace", "path": "/networkConfiguration/awsvpcConfiguration/assignPublicIp", "value": "DISABLED"},
        {"op": "add", "path": "/networkConfiguration/awsvpcConfiguration/subnets", "value": ["subnet-******"]},
    ],
)
ecs.save("ecs-task-block", overwrite=True)
$ python ecs_task_block.py

今回はFlowのコードをS3で保持するため、S3 BlockをPrefect Cloudに登録します。

import os

from prefect.filesystems import S3

block = S3(
    bucket_path="prefect-subflow",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)
block.save("s3-block", overwrite=True)
$ python s3_block.py

Sub Flowを作成

別のFlowから呼び出すSub Flowを作成します。

以下のように、3つのFlowをSub Flowとして呼び出してみようと思います。

import time

from prefect import flow, task

@task()
def a():
    print("task A")
    time.sleep(5)

@task()
def b():
    print("task B")
    time.sleep(5)

@task()
def c():
    print("task C")
    time.sleep(5)

@flow(name="Sub Flow1")
def sub_flow1():
    print("sub flow1")
    a()
    b()

@flow(name="Sub Flow2")
def sub_flow2():
    print("sub flow2")
    b()
    c()

@flow(name="Sub Flow3")
def sub_flow3():
    print("sub flow3")
    a()
    c()

Sub Flowを直接実行する

公式ドキュメントで紹介されてる、sub flowを直接呼び出す方法で実行してみます。
Flows - Prefect Docs

以下のように、3つのsub flowを、main flowから呼び出します。

from prefect import flow

from sub_flow import sub_flow1, sub_flow2, sub_flow3

@flow(name="Main Flow")
def main_flow():
    print("main flow")
    sub_flow1()
    sub_flow2()
    sub_flow3()

if __name__ == "__main__":
    main_flow()

main flowのdeploymentをPrefect Cloudに登録します。

作成済みのECS Task Blockと、S3 Blockを使用します。

from main_flow import main_flow
from prefect.deployments import Deployment
from prefect.filesystems import S3
from prefect_aws.ecs import ECSTask

ecs_task_block = ECSTask.load("ecs-task-block")
s3_block = S3.load("s3-block")

deployment = Deployment.build_from_flow(
    flow=main_flow,
    name="main-flow1-deployment",
    work_pool_name="ecs",
    infrastructure=ecs_task_block,
    storage=s3_block,
)

deployment.apply()
$ python main_flow1_deployment.py

Prefect Cloudからmainのflowを実行します。

実行画面を見ると、Sub Flowの詳細をmainのflow画面から確認することができます。

実行されるECS Taskは、main flowが実行されるECS Taskのみで、sub flowがmain flowのECS Taskの環境で実行されています。

Sub Flowごとに実行環境を分ける方法

run_deploymentを用いて、mainのflowからsub flowのdeploymentを実行します。

sub flowのdeploymentをPrefect Cloudに登録します。

作成したECS Task Blockを使用しますが、infra_overrides に値を指定することで、使用するCPU, MEMORYを変更できます。

3つのsub flowごとに異なるリソースを設定します。

from prefect.deployments import Deployment
from prefect.filesystems import S3
from prefect_aws.ecs import ECSTask

from sub_flow import sub_flow1, sub_flow2, sub_flow3

ecs_task_block = ECSTask.load("ecs-task-block")
s3_block = S3.load("s3-block")

deployment_sub_flow1 = Deployment.build_from_flow(
    flow=sub_flow1,
    name="sub-flow1-deployment",
    work_pool_name="ecs",
    infrastructure=ecs_task_block,
    storage=s3_block,
    infra_overrides={"cpu": 512, "memory": 1024},
)

deployment_sub_flow1.apply()

deployment_sub_flow2 = Deployment.build_from_flow(
    flow=sub_flow2,
    name="sub-flow2-deployment",
    work_pool_name="ecs",
    infrastructure=ecs_task_block,
    storage=s3_block,
    infra_overrides={"cpu": 1024, "memory": 2048},
)

deployment_sub_flow2.apply()

deployment_sub_flow3 = Deployment.build_from_flow(
    flow=sub_flow3,
    name="sub-flow3-deployment",
    work_pool_name="ecs",
    infrastructure=ecs_task_block,
    storage=s3_block,
    infra_overrides={"cpu": 2048, "memory": 4096},
)

deployment_sub_flow3.apply()
$ python sub_flow_deployment.py

mainのflowから、登録したsub flowのdeploymentを呼び出します。

run_deployment 関数を使用することで、指定したdeploymentをflowから実行することができます。実行パラメータ、tag、実行スケジュールを引数から設定することができます。
prefect/src/prefect/deployments/deployments.py at 4c8d8e4fe055cec7fa07da80cc21ff08e32f03f6 · PrefectHQ/prefect · GitHub

from prefect import flow
from prefect.deployments import run_deployment

@flow(name="Main Flow")
def main_flow():
    print("main flow")
    run_deployment(name="Sub Flow1/sub-flow1-deployment")
    run_deployment(name="Sub Flow2/sub-flow2-deployment")
    run_deployment(name="Sub Flow3/sub-flow3-deployment")

if __name__ == "__main__":
    main_flow()

mainのflowのdeploymentをPrefect Cloudに登録します。

from prefect.deployments import Deployment
from prefect.filesystems import S3
from prefect_aws.ecs import ECSTask

from main_flow2 import main_flow

ecs_task_block = ECSTask.load("ecs-task-block")
s3_block = S3.load("s3-block")

deployment = Deployment.build_from_flow(
    flow=main_flow,
    name="main-flow2-deployment",
    work_pool_name="ecs",
    infrastructure=ecs_task_block,
    storage=s3_block,
)

deployment.apply()
$ python main_flow2_deployment.py

Prefect Cloudからmainのflowを実行します。

直接sub flowを呼び出す場合と同様、sub flowの詳細をmainのflow画面から確認することができます。

sub flowごとにECS Taskを起動するため、Sub Flowの実行にダウンタイムがあります。

ECS Taskを確認すると、mainのflowのECS Taskとは別に、sub flowのECS Taskが実行されています。

使用されるCPU・MEMORYは、sub flowごとのdeploymentで設定した値が使用されます。

まとめ

run_deploymentを使用して、sub flowを実行することで、sub flowごとに異なる計算リソースで実行する方法を紹介しました。

MLの学習パイプラインのように、処理単位で必要な計算リソースが異なるワークフローでは、各処理をsub flowにまとめて、別々のリソースを割り当てるのが便利です。

参考