肉球でキーボード

MLエンジニアの技術ブログです

Prefect FlowをECS Taskで実行する方法

概要

PrefectではInfrastructure blockにECS Task Blockを指定することで、FlowをECS Taskで実行することができます。

本記事では、FlowをECS Taskで実行する2パターンの手順を紹介します。

本文中コード: https://github.com/nsakki55/prefect-ecs-task-flow

FlowをECS Taskで実行する方法

FlowをECS Taskとして実行する方法は2パターンあります

  1. BuiltIn Container+Storage
  2. Custom Container

1つ目の方法では、Prefect公式 Docker imageを利用し、Flow実行時に必要なファイルをダウンロードします。

ECS Task Blockでは、実行imageが指定されない場合、AgentのPrefect・Pythonバージョンの公式Docker Imageが使用されます

実行するFlowコードがImage中に含まれないため、Storage Blockを設定し、Image起動時にダウンロードします。

2つ目の方法では、依存関係とFlowコードを含んだ自作imageを利用します。

必要なファイルを含んだDockerfileを作成し、ECS Task Blockの実行imageに自作imageを指定します。

前準備

AgentをECS Serviceで起動する

Flowの実行制御を行うAgentを起動します。

AWSでAgentをホスティングする場合、ECS Serviceを利用する方法があります。

ECS ServiceでAgentを起動する、ECS Task定義は以下のように書けます。

ECS Taskの起動imageに、Prefect公式image prefecthq/prefect:2.8.7-python3.8 を指定します。

実行command prefect agent start -p ecs-pool で、Agentを起動します。

Prefect Cloudを認識するために、 PREFECT_API_URLPREFECT_API_KEY を設定します。

[
  {
    "name": "prefect-ecs-agent",
    "image": "prefecthq/prefect:2.8.7-python3.8",
    "memory": 2048,
    "cpu": 1024,
    "environment": [ 
      {
        "name": "PREFECT_LOGGING_LEVEL",
        "value": "INFO"
      },
      {
        "name": "PREFECT_API_URL",
        "value": "${prefect_api_url}"
      },
      {
        "name": "PREFECT_API_KEY",
        "value": "${prefect_api_key}"
      }
    ],
    "command": [
      "prefect",
      "agent",
      "start",
      "-p",
      "ecs-pool"
    ]
  }
]

ECS ServiceでECS Agentを起動するTerraformを作成しました。

https://github.com/nsakki55/prefect-ecs-task-flow/tree/master/infra

infra/secret.tf の変数を、ご自身の設定に変更してください。

access_key      = "*****" # AWS Access Key
secret_key      = "*****" # AWS Secret Key
region          = "*****" # AWS Region
prefect_api_key = "*****" # Prefect Cloud API Key

terraformコマンドでECS Service実行に必要なリソースを作成します。

$ terraform init
$ terraform apply -var-file=secret.tfvars

AgentのECS Serviceが起動できていることを確認できます。

Flowの作成

実行するFlowを用意します。

以下のタスクを組み合わせた、シンプルなETLワークフローを実行します。

  • extract: S3からファイルをダウンロード
  • transform: ダウンロードしたファイルを集計
  • load: S3に集計ファイルをアップロード

このFlowを実行するにはpandas をインストールする必要があります。

from prefect import flow, task
import pandas as pd
import boto3

s3_client = boto3.Session().client("s3")
S3_BUCKET = "prefect-etl-tutorial"

@task(name="extract task")
def extract(file_name: str) -> pd.DataFrame:
    s3_client.download_file(S3_BUCKET, file_name, file_name)
    df = pd.read_csv(file_name)
    return df

@task(name="transform task")
def transform(df: pd.DataFrame) -> pd.DataFrame:
    df_transformed = df.groupby(["type1"])["weight_kg"].mean().reset_index()
    return df_transformed

@task(name="load task")
def load(df: pd.DataFrame, file_name: str) -> None:
    df.to_csv(file_name)
    s3_client.upload_file(file_name, S3_BUCKET, file_name)

@flow(name="ETL Flow")
def etl_flow(download_file_name: str, upload_file_name: str):
    extracted_df = extract(file_name=download_file_name)
    transformed_df = transform(extracted_df)
    load(transformed_df, file_name=upload_file_name)

BuiltIn Container + Storage

Storage Blockに S3 Blockを使用し、Prefect公式imageを使用する方法を紹介します。

図のように、Prefect公式のDocker ImageをECS Taskとして起動し、FlowコードをS3からダウンロードする構成を作成します。

S3 Block作成

FlowコードをS3で管理するために、S3 Blockを作成します。

DeploymentをPrefect Cloudに登録する際に、Flowコードが自動的にS3にアップロードされます。

今回は prefect-etl というバケットを指定し、 etl-s3-block という名前でS3 Blockを作成します。

from prefect.filesystems import S3
import os

block = S3(bucket_path="prefect-etl",
           aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
           aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY']
)
block.save("etl-s3-block", overwrite=True)

ECS Task Block作成

FlowをECS Taskとして実行するために、ECS Task Blockを作成します。

起動imageを指定しない場合、ECS AgentのPrefect・Pythonバージョンの公式imageが使用されます。

Flow実行に必要な依存関係を、image起動時にインストールするために、 EXTRA_PIP_PACKAGES環境変数に設定します。

ネットワークの設定は、Agentを起動してるECS Serviceと同じ値を設定しています。

from prefect_aws.ecs import ECSTask

ecs = ECSTask(
    env={"EXTRA_PIP_PACKAGES": "s3fs pandas boto3"},
    cluster="arn:aws:ecs:ap-northeast-1:*****:cluster/prefect-ecs",
    cpu="256",
    memory="512",
    stream_output=True,
    configure_cloudwatch_logs=True,
    execution_role_arn="arn:aws:iam::*****:role/prefectEcsTaskExecutionRole",
    task_role_arn="arn:aws:iam::*****:role/prefectEcsTaskRole",
    vpc_id="vpc-*****",
    task_customizations=[
  {
    "op": "replace",
    "path": "/networkConfiguration/awsvpcConfiguration/assignPublicIp",
    "value": "DISABLED"
  },
  {
    "op": "add",
    "path": "/networkConfiguration/awsvpcConfiguration/subnets",
    "value": [
      "subnet-*****"
    ]
  }
]
)
ecs.save("builtin-image-ecs-task-block", overwrite=True)

Deployment登録

StorageにS3 Block、InfrastructureにECS Task Blockを指定したDeploymentを作成します。

先ほど作成したS3 Block, ECS Task Blockを指定します。

from my_flow import etl_flow
from prefect.deployments import Deployment
from prefect_aws.ecs import ECSTask
from prefect.filesystems import S3

s3_block = S3.load("etl-s3-block")
ecs_task_block = ECSTask.load("base-image-ecs-task-block")

deployment = Deployment.build_from_flow(
    flow=etl_flow,
    name="base-image-s3-deployment",
    work_pool_name="ecs-pool",
    storage=s3_block,
    infrastructure=ecs_task_block,
)

deployment.apply()

Flow実行

Prefect CloudにDeploymentが登録されているか確認します。

作成したS3 Block, ECS Task Blockが設定できてることを確認できます。

Flowを実行すると、FlowがECS Taskとして実行されます。

実行されるECS Taskのimageを確認すると、Agentと同じPrefect・Python バージョンである、

prefecthq/prefect:2.8.7-python3.8 となっています。

Custom Container

依存関係とFlowコードを含んだ自作imageをECS Taskで実行します。

下図のように、ECRから自作imageを取得し、ECS Taskとして実行する構成を作成します。

自作imageの作成

依存関係と、Flowファイルを含めたDockerfileを作成します。

flowファイルは opt/prefect/flows 以下に配置します。

(Prefect CloudからのFlow実行時、 /opt/prefect/flows 以下のFlowファイルを実行するためです。)

FROM prefecthq/prefect:2.8.7-python3.8

COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

COPY my_flow.py /opt/prefect/flows/my_flow.py

imageをbuildして、ECRにpushします。

prefect-custom-container というimage名で保存します。

$ docker build -t prefect-custom-container .
$ docker tag prefect-custom-container:latest *****.dkr.ecr.ap-northeast-1.amazonaws.com/prefect-custom-container:latest
$ docker push *****.dkr.ecr.ap-northeast-1.amazonaws.com/prefect-custom-container:latest

ECS Task Block作成

自作imageを指定した、ECS Task Blockを作成します。

起動imageにECRにpushしたimage名を指定します。

from prefect_aws.ecs import ECSTask

ecs = ECSTask(
    image="*****.dkr.ecr.ap-northeast-1.amazonaws.com/prefect-custom-container:latest",
    cluster="arn:aws:ecs:ap-northeast-1:*****:cluster/prefect-ecs",
    cpu="256",
    memory="512",
    stream_output=True,
    configure_cloudwatch_logs=True,
    execution_role_arn="arn:aws:iam::*****:role/prefectEcsTaskExecutionRole",
    task_role_arn="arn:aws:iam::*****:role/prefectEcsTaskRole",
    vpc_id="vpc-******",
    task_customizations=[
  {
    "op": "replace",
    "path": "/networkConfiguration/awsvpcConfiguration/assignPublicIp",
    "value": "DISABLED"
  },
  {
    "op": "add",
    "path": "/networkConfiguration/awsvpcConfiguration/subnets",
    "value": [
      "subnet-*****"
    ]
  }
]
)
ecs.save("custom-container-ecs-task-block", overwrite=True)

Deployment登録

Infrastructure Blockに、作成したECS Task Blockを設定したDeploymentを登録します。

Storage Blockは明示的に指定しません。

from my_flow import etl_flow
from prefect.deployments import Deployment
from prefect_aws.ecs import ECSTask

ecs_task_block = ECSTask.load("custom-container-ecs-task-block")

deployment = Deployment.build_from_flow(
    flow=etl_flow,
    name="custom-container-deployment",
    work_pool_name="ecs-pool",
    infrastructure=ecs_task_block,
)

deployment.apply()

Flow実行

Prefect CloudにDeploymentが登録されているか確認します。

作成したECS Task Blockが設定されています。

Flowを実行すると、FlowがECS Taskとして実行されます。

実行されたECS Task定義を確認すると、起動imageに自作imageが指定されています。

まとめ

PrefectのFlowをECS Taskで実行する2つの方法を紹介しました。

  1. BuiltIn Container+Storage
  2. Custom Container

ECS Task Blockの設定はハマりやすいポイントなので、Prefect導入時の参考になれば幸いです。
本文中コード: https://github.com/nsakki55/prefect-ecs-task-flow

参考