概要
PrefectではInfrastructure blockにECS Task Blockを指定することで、FlowをECS Taskで実行することができます。
本記事では、FlowをECS Taskで実行する2パターンの手順を紹介します。
本文中コード: https://github.com/nsakki55/prefect-ecs-task-flow
FlowをECS Taskで実行する方法
FlowをECS Taskとして実行する方法は2パターンあります
- BuiltIn Container+Storage
- Custom Container
1つ目の方法では、Prefect公式 Docker imageを利用し、Flow実行時に必要なファイルをダウンロードします。
ECS Task Blockでは、実行imageが指定されない場合、AgentのPrefect・Pythonバージョンの公式Docker Imageが使用されます。
実行するFlowコードがImage中に含まれないため、Storage Blockを設定し、Image起動時にダウンロードします。
2つ目の方法では、依存関係とFlowコードを含んだ自作imageを利用します。
必要なファイルを含んだDockerfileを作成し、ECS Task Blockの実行imageに自作imageを指定します。
前準備
AgentをECS Serviceで起動する
Flowの実行制御を行うAgentを起動します。
AWSでAgentをホスティングする場合、ECS Serviceを利用する方法があります。
ECS ServiceでAgentを起動する、ECS Task定義は以下のように書けます。
ECS Taskの起動imageに、Prefect公式image prefecthq/prefect:2.8.7-python3.8
を指定します。
実行command prefect agent start -p ecs-pool
で、Agentを起動します。
Prefect Cloudを認識するために、 PREFECT_API_URL
、 PREFECT_API_KEY
を設定します。
[ { "name": "prefect-ecs-agent", "image": "prefecthq/prefect:2.8.7-python3.8", "memory": 2048, "cpu": 1024, "environment": [ { "name": "PREFECT_LOGGING_LEVEL", "value": "INFO" }, { "name": "PREFECT_API_URL", "value": "${prefect_api_url}" }, { "name": "PREFECT_API_KEY", "value": "${prefect_api_key}" } ], "command": [ "prefect", "agent", "start", "-p", "ecs-pool" ] } ]
ECS ServiceでECS Agentを起動するTerraformを作成しました。
https://github.com/nsakki55/prefect-ecs-task-flow/tree/master/infra
infra/secret.tf
の変数を、ご自身の設定に変更してください。
access_key = "*****" # AWS Access Key secret_key = "*****" # AWS Secret Key region = "*****" # AWS Region prefect_api_key = "*****" # Prefect Cloud API Key
terraformコマンドでECS Service実行に必要なリソースを作成します。
$ terraform init $ terraform apply -var-file=secret.tfvars
AgentのECS Serviceが起動できていることを確認できます。
Flowの作成
実行するFlowを用意します。
以下のタスクを組み合わせた、シンプルなETLワークフローを実行します。
- extract: S3からファイルをダウンロード
- transform: ダウンロードしたファイルを集計
- load: S3に集計ファイルをアップロード
このFlowを実行するにはpandas
をインストールする必要があります。
from prefect import flow, task import pandas as pd import boto3 s3_client = boto3.Session().client("s3") S3_BUCKET = "prefect-etl-tutorial" @task(name="extract task") def extract(file_name: str) -> pd.DataFrame: s3_client.download_file(S3_BUCKET, file_name, file_name) df = pd.read_csv(file_name) return df @task(name="transform task") def transform(df: pd.DataFrame) -> pd.DataFrame: df_transformed = df.groupby(["type1"])["weight_kg"].mean().reset_index() return df_transformed @task(name="load task") def load(df: pd.DataFrame, file_name: str) -> None: df.to_csv(file_name) s3_client.upload_file(file_name, S3_BUCKET, file_name) @flow(name="ETL Flow") def etl_flow(download_file_name: str, upload_file_name: str): extracted_df = extract(file_name=download_file_name) transformed_df = transform(extracted_df) load(transformed_df, file_name=upload_file_name)
BuiltIn Container + Storage
Storage Blockに S3 Blockを使用し、Prefect公式imageを使用する方法を紹介します。
図のように、Prefect公式のDocker ImageをECS Taskとして起動し、FlowコードをS3からダウンロードする構成を作成します。
S3 Block作成
FlowコードをS3で管理するために、S3 Blockを作成します。
DeploymentをPrefect Cloudに登録する際に、Flowコードが自動的にS3にアップロードされます。
今回は prefect-etl
というバケットを指定し、 etl-s3-block
という名前でS3 Blockを作成します。
from prefect.filesystems import S3 import os block = S3(bucket_path="prefect-etl", aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'], aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'] ) block.save("etl-s3-block", overwrite=True)
ECS Task Block作成
FlowをECS Taskとして実行するために、ECS Task Blockを作成します。
起動imageを指定しない場合、ECS AgentのPrefect・Pythonバージョンの公式imageが使用されます。
Flow実行に必要な依存関係を、image起動時にインストールするために、 EXTRA_PIP_PACKAGES
を環境変数に設定します。
ネットワークの設定は、Agentを起動してるECS Serviceと同じ値を設定しています。
from prefect_aws.ecs import ECSTask ecs = ECSTask( env={"EXTRA_PIP_PACKAGES": "s3fs pandas boto3"}, cluster="arn:aws:ecs:ap-northeast-1:*****:cluster/prefect-ecs", cpu="256", memory="512", stream_output=True, configure_cloudwatch_logs=True, execution_role_arn="arn:aws:iam::*****:role/prefectEcsTaskExecutionRole", task_role_arn="arn:aws:iam::*****:role/prefectEcsTaskRole", vpc_id="vpc-*****", task_customizations=[ { "op": "replace", "path": "/networkConfiguration/awsvpcConfiguration/assignPublicIp", "value": "DISABLED" }, { "op": "add", "path": "/networkConfiguration/awsvpcConfiguration/subnets", "value": [ "subnet-*****" ] } ] ) ecs.save("builtin-image-ecs-task-block", overwrite=True)
Deployment登録
StorageにS3 Block、InfrastructureにECS Task Blockを指定したDeploymentを作成します。
先ほど作成したS3 Block, ECS Task Blockを指定します。
from my_flow import etl_flow from prefect.deployments import Deployment from prefect_aws.ecs import ECSTask from prefect.filesystems import S3 s3_block = S3.load("etl-s3-block") ecs_task_block = ECSTask.load("base-image-ecs-task-block") deployment = Deployment.build_from_flow( flow=etl_flow, name="base-image-s3-deployment", work_pool_name="ecs-pool", storage=s3_block, infrastructure=ecs_task_block, ) deployment.apply()
Flow実行
Prefect CloudにDeploymentが登録されているか確認します。
作成したS3 Block, ECS Task Blockが設定できてることを確認できます。
Flowを実行すると、FlowがECS Taskとして実行されます。
実行されるECS Taskのimageを確認すると、Agentと同じPrefect・Python バージョンである、
prefecthq/prefect:2.8.7-python3.8
となっています。
Custom Container
依存関係とFlowコードを含んだ自作imageをECS Taskで実行します。
下図のように、ECRから自作imageを取得し、ECS Taskとして実行する構成を作成します。
自作imageの作成
依存関係と、Flowファイルを含めたDockerfileを作成します。
flowファイルは opt/prefect/flows
以下に配置します。
(Prefect CloudからのFlow実行時、 /opt/prefect/flows
以下のFlowファイルを実行するためです。)
FROM prefecthq/prefect:2.8.7-python3.8 COPY requirements.txt requirements.txt RUN pip install -r requirements.txt COPY my_flow.py /opt/prefect/flows/my_flow.py
imageをbuildして、ECRにpushします。
prefect-custom-container
というimage名で保存します。
$ docker build -t prefect-custom-container . $ docker tag prefect-custom-container:latest *****.dkr.ecr.ap-northeast-1.amazonaws.com/prefect-custom-container:latest $ docker push *****.dkr.ecr.ap-northeast-1.amazonaws.com/prefect-custom-container:latest
ECS Task Block作成
自作imageを指定した、ECS Task Blockを作成します。
起動imageにECRにpushしたimage名を指定します。
from prefect_aws.ecs import ECSTask ecs = ECSTask( image="*****.dkr.ecr.ap-northeast-1.amazonaws.com/prefect-custom-container:latest", cluster="arn:aws:ecs:ap-northeast-1:*****:cluster/prefect-ecs", cpu="256", memory="512", stream_output=True, configure_cloudwatch_logs=True, execution_role_arn="arn:aws:iam::*****:role/prefectEcsTaskExecutionRole", task_role_arn="arn:aws:iam::*****:role/prefectEcsTaskRole", vpc_id="vpc-******", task_customizations=[ { "op": "replace", "path": "/networkConfiguration/awsvpcConfiguration/assignPublicIp", "value": "DISABLED" }, { "op": "add", "path": "/networkConfiguration/awsvpcConfiguration/subnets", "value": [ "subnet-*****" ] } ] ) ecs.save("custom-container-ecs-task-block", overwrite=True)
Deployment登録
Infrastructure Blockに、作成したECS Task Blockを設定したDeploymentを登録します。
Storage Blockは明示的に指定しません。
from my_flow import etl_flow from prefect.deployments import Deployment from prefect_aws.ecs import ECSTask ecs_task_block = ECSTask.load("custom-container-ecs-task-block") deployment = Deployment.build_from_flow( flow=etl_flow, name="custom-container-deployment", work_pool_name="ecs-pool", infrastructure=ecs_task_block, ) deployment.apply()
Flow実行
Prefect CloudにDeploymentが登録されているか確認します。
作成したECS Task Blockが設定されています。
Flowを実行すると、FlowがECS Taskとして実行されます。
実行されたECS Task定義を確認すると、起動imageに自作imageが指定されています。
まとめ
PrefectのFlowをECS Taskで実行する2つの方法を紹介しました。
- BuiltIn Container+Storage
- Custom Container
ECS Task Blockの設定はハマりやすいポイントなので、Prefect導入時の参考になれば幸いです。
本文中コード: https://github.com/nsakki55/prefect-ecs-task-flow