概要
PrefectではFlow内部で別のFlowを実行する、Sub Flowという概念があります。
Flows - Prefect Docs
例えばMLの学習パイプラインを
- 学習データの取得
- 前処理
- 学習
のように処理に分ける場合、各処理を1つのFlowとしてグループ化できます。
Sub Flowを呼び出し元Flowから直接実行すると、呼び出し元Flowのインフラ環境で実行されます。
MLの学習パイプラインのように、処理に応じてリソースを分けて実行したいケースが多いと思います。
- 学習データの取得 (CPU: 1024, MEMORY: 2048)
- 前処理 (CPU: 512, MEMORY: 1024)
- 学習 (CPU: 2048, MEMORY: 4096)
Sub Flowごとに実行環境を分ける場合、run_deployment関数を使ってSub Flowを呼び出す方法で実現できます。 Deployments - Prefect Docs
run_deploymentを使用して、Sub Flowごとに異なる実行環境で実行する方法をまとめます。
実行環境
- python 3.8.7
- prefect 2.10.18
- prefect-aws 0.3.5
本文中コード
github.com
前準備
ECS TaskとしてFlowを実行するために、ECS ServiceとしてAgentを起動します。
TerrafromでAgentをホスティングするECS Serviceを立ち上げます。
$ git clone https://github.com/nsakki55/code-for-blogpost $ cd prefect_subflow_orchestration/src/infra $ terraform init $ terraform apply -var-file=my_secret.tfvars
Terrafromの実装はこちらです: code-for-blogpost/prefect_subflow_orchestration/src/infra at main · nsakki55/code-for-blogpost · GitHub
Prefect Cloudにログインを行います。
$ prefect cloud login
ECS Task BlockをPrefect Cloudに登録します。
from prefect_aws.ecs import ECSTask ecs = ECSTask( env={"EXTRA_PIP_PACKAGES": "s3fs"}, cluster="arn:aws:ecs:ap-northeast-1:*****:cluster/prefect-ecs", cpu="256", memory="512", stream_output=True, configure_cloudwatch_logs=True, execution_role_arn="arn:aws:iam::*****:role/prefectEcsTaskExecutionRole", task_role_arn="arn:aws:iam::*****:role/prefectEcsTaskRole", vpc_id="vpc-*****", task_customizations=[ {"op": "replace", "path": "/networkConfiguration/awsvpcConfiguration/assignPublicIp", "value": "DISABLED"}, {"op": "add", "path": "/networkConfiguration/awsvpcConfiguration/subnets", "value": ["subnet-******"]}, ], ) ecs.save("ecs-task-block", overwrite=True)
$ python ecs_task_block.py
今回はFlowのコードをS3で保持するため、S3 BlockをPrefect Cloudに登録します。
import os from prefect.filesystems import S3 block = S3( bucket_path="prefect-subflow", aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"], aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"], ) block.save("s3-block", overwrite=True)
$ python s3_block.py
Sub Flowを作成
別のFlowから呼び出すSub Flowを作成します。
以下のように、3つのFlowをSub Flowとして呼び出してみようと思います。
import time from prefect import flow, task @task() def a(): print("task A") time.sleep(5) @task() def b(): print("task B") time.sleep(5) @task() def c(): print("task C") time.sleep(5) @flow(name="Sub Flow1") def sub_flow1(): print("sub flow1") a() b() @flow(name="Sub Flow2") def sub_flow2(): print("sub flow2") b() c() @flow(name="Sub Flow3") def sub_flow3(): print("sub flow3") a() c()
Sub Flowを直接実行する
公式ドキュメントで紹介されてる、sub flowを直接呼び出す方法で実行してみます。
Flows - Prefect Docs
以下のように、3つのsub flowを、main flowから呼び出します。
from prefect import flow from sub_flow import sub_flow1, sub_flow2, sub_flow3 @flow(name="Main Flow") def main_flow(): print("main flow") sub_flow1() sub_flow2() sub_flow3() if __name__ == "__main__": main_flow()
main flowのdeploymentをPrefect Cloudに登録します。
作成済みのECS Task Blockと、S3 Blockを使用します。
from main_flow import main_flow from prefect.deployments import Deployment from prefect.filesystems import S3 from prefect_aws.ecs import ECSTask ecs_task_block = ECSTask.load("ecs-task-block") s3_block = S3.load("s3-block") deployment = Deployment.build_from_flow( flow=main_flow, name="main-flow1-deployment", work_pool_name="ecs", infrastructure=ecs_task_block, storage=s3_block, ) deployment.apply()
$ python main_flow1_deployment.py
Prefect Cloudからmainのflowを実行します。
実行画面を見ると、Sub Flowの詳細をmainのflow画面から確認することができます。
実行されるECS Taskは、main flowが実行されるECS Taskのみで、sub flowがmain flowのECS Taskの環境で実行されています。
Sub Flowごとに実行環境を分ける方法
run_deploymentを用いて、mainのflowからsub flowのdeploymentを実行します。
sub flowのdeploymentをPrefect Cloudに登録します。
作成したECS Task Blockを使用しますが、infra_overrides に値を指定することで、使用するCPU, MEMORYを変更できます。
3つのsub flowごとに異なるリソースを設定します。
from prefect.deployments import Deployment from prefect.filesystems import S3 from prefect_aws.ecs import ECSTask from sub_flow import sub_flow1, sub_flow2, sub_flow3 ecs_task_block = ECSTask.load("ecs-task-block") s3_block = S3.load("s3-block") deployment_sub_flow1 = Deployment.build_from_flow( flow=sub_flow1, name="sub-flow1-deployment", work_pool_name="ecs", infrastructure=ecs_task_block, storage=s3_block, infra_overrides={"cpu": 512, "memory": 1024}, ) deployment_sub_flow1.apply() deployment_sub_flow2 = Deployment.build_from_flow( flow=sub_flow2, name="sub-flow2-deployment", work_pool_name="ecs", infrastructure=ecs_task_block, storage=s3_block, infra_overrides={"cpu": 1024, "memory": 2048}, ) deployment_sub_flow2.apply() deployment_sub_flow3 = Deployment.build_from_flow( flow=sub_flow3, name="sub-flow3-deployment", work_pool_name="ecs", infrastructure=ecs_task_block, storage=s3_block, infra_overrides={"cpu": 2048, "memory": 4096}, ) deployment_sub_flow3.apply()
$ python sub_flow_deployment.py
mainのflowから、登録したsub flowのdeploymentを呼び出します。
run_deployment
関数を使用することで、指定したdeploymentをflowから実行することができます。実行パラメータ、tag、実行スケジュールを引数から設定することができます。
prefect/src/prefect/deployments/deployments.py at 4c8d8e4fe055cec7fa07da80cc21ff08e32f03f6 · PrefectHQ/prefect · GitHub
from prefect import flow from prefect.deployments import run_deployment @flow(name="Main Flow") def main_flow(): print("main flow") run_deployment(name="Sub Flow1/sub-flow1-deployment") run_deployment(name="Sub Flow2/sub-flow2-deployment") run_deployment(name="Sub Flow3/sub-flow3-deployment") if __name__ == "__main__": main_flow()
mainのflowのdeploymentをPrefect Cloudに登録します。
from prefect.deployments import Deployment from prefect.filesystems import S3 from prefect_aws.ecs import ECSTask from main_flow2 import main_flow ecs_task_block = ECSTask.load("ecs-task-block") s3_block = S3.load("s3-block") deployment = Deployment.build_from_flow( flow=main_flow, name="main-flow2-deployment", work_pool_name="ecs", infrastructure=ecs_task_block, storage=s3_block, ) deployment.apply()
$ python main_flow2_deployment.py
Prefect Cloudからmainのflowを実行します。
直接sub flowを呼び出す場合と同様、sub flowの詳細をmainのflow画面から確認することができます。
sub flowごとにECS Taskを起動するため、Sub Flowの実行にダウンタイムがあります。
ECS Taskを確認すると、mainのflowのECS Taskとは別に、sub flowのECS Taskが実行されています。
使用されるCPU・MEMORYは、sub flowごとのdeploymentで設定した値が使用されます。
まとめ
run_deploymentを使用して、sub flowを実行することで、sub flowごとに異なる計算リソースで実行する方法を紹介しました。
MLの学習パイプラインのように、処理単位で必要な計算リソースが異なるワークフローでは、各処理をsub flowにまとめて、別々のリソースを割り当てるのが便利です。