肉球でキーボード

MLエンジニアの技術ブログです

Prefect 2.0に入門する

本文中のコード github.com

Prefect 2.0

2022年7月にPrefect 2.0が正式リリースされました。
Prefect1.0のサポートが2023年5月1日に終了するため、Prefect 2.0への移行が必須となっています。
Prefect 1.0から2.0に移行するメリットは次のmediumの記事が参考になります。
Reasons to Transition from Prefect 1.0 to 2.0 | by Madison Schott | Medium

Prefect 1.0と2.0の違い

prefect 2.0の公式ドキュメント「Migrating from Prefect 1 to Prefect 2」で Prefect 1.0と2.0の機能の違いが説明されているので、変化があった主要機能を取り上げます。

共通機能

  • TaskFlow がワークフローの基礎要素
  • ワークフローの実行管理・タスクの状態追跡を行う
  • ローカルPCでワークフローを実行・検証を行える
  • ネガティブエンジニアリングを解決する思想に則って、データフローの骨組みを提供する
  • ハイブリッドモデルを採用し、ワークフローのコード・計算リソースはユーザーが管理

なくなった機能

Parameter

Flowの実行パラメータを渡すためのParameterオブジェクトが廃止されました。

Prefect 1.0

Parameterオブジェクトを作成し、1つのタスクとしていました。

from prefect import task, Flow, Parameter

@task
def print_parameter(x: int):
    print(x)

with Flow("param-flow") as flow:
    x = Parameter('x', default = 1)
    print_parameter(x=x)

Prefect 2.0

flowの関数に引数を渡すだけで、実行パラメータとすることができます。

typing, pydanticによる型ヒントを利用することができ、通常のPythonコードらしい書き方でFlowを記述できるようになりました。

from prefect import task, flow

@task
def print_parameter(x: int):
    print(x)

@flow(name="param-flow")
def my_flow(x: int):
    print_parameter(x=x)

state_handlers

タスクの状態に基づいた処理を制御する、state_hadlerが廃止されました。

Prefect 1.0

Taskデコレータに、タスク状態に応じた動作を記述したhandlerを渡していました。

from prefect import task, Flow

def message_on_failure(task, old_state, new_state):
    if new_state.is_failed():
            print('task failed')
    return new_state

@task(state_handlers=[message_on_failure])
def fail_successfully(x):
    return 1 / x

with Flow(name="fail") as flow:
    result = fail_successfully(x=0)

Prefect 2.0

タスクは PrefectFuture オブジェクトを返し、 PrefectFuture でタスクの状態を判定することで、明示的にエラー処理を記述することとなりました。

from prefect import task, flow
from prefect.futures import PrefectFuture

def message_on_failure(task_future: PrefectFuture):
    task_future.wait()
    if task_future.get_state().is_failed():
        print('task failed')

@task
def fail_successfully(x):
    return 1 / x

@flow
def my_flow(nr: int):
    future_obj = fail_successfully.submit(nr)
    message_on_failure(future_obj)

signal

タスク状態を任意に設定する signalの利用が廃止されました

Prefect 1.0

事前に定義されたsignalsオブジェクトでタスク状態を指定していました。

from prefect.engine.signals import FAIL, ENDRUN, SUCCESS, SKIP, PAUSE, RETRY
from prefect import task

@task
def signal_task(message):
    if message == 'stop_immediately!':
        raise FAIL(message='Got a signal to end the task run!')

Prefect 2.0

独自エラー型を使用することができるようになりました。

from prefect import task

@task
def signal_task(message):
    if message == 'stop_immediately!':
        raise RuntimeError(message='Got a signal to end the task run!')

case

条件判定 case が廃止されました。

Prefect 1.0

条件に応じて処理を分岐させる場合、case を使用した比較が必要でした。

from prefect import task, Flow, case

@task
def check_condition():
    return random() < 0.5

with Flow("conditional-branches") as flow:
    cond = check_condition()

    with case(cond, True):
        task_true(val)

    with case(cond, False):
        task_false(val)

Prefect 2.0

if elseを使って、より通常のPythonコードに近い記述方法で条件分岐を書けるようになりました。

from prefect import task, Flow, case

@task
def check_condition():
    return random() < 0.5

flow(name="conditional-flow")
def my_flow()
    cond = check_condition()

    if cond:
        task_true()
    else:
        task_false()

変更があった機能

flowの定義方法

Prefect 1.0

コンテクストマネージャーを使用し、DAG(有効循環グラフ)を作成しFlowを定義していました。

from prefect import Flow

with Flow("flow name") as flow:

Prefect 2.0

DAGを作成しFlowを構築する方法が廃止されました。

モダンで動的なデータフローを構築するのに、DAGを用いるのは過度な制約をもたらすと考えられているためです。

通常のPython関数にflowデコレータを加えることで、PrefectがFlowとして認識することができるようになりました。

from prefect import flow

@flow(name="flow name")
def my_flow_function():

taskの実行設定

タスクを逐次実行・並列実行・分散実行を制御する方法が変わりました。

prefect 1.0

Executorを用いた制御。

from prefect.executors import LocalExecutor

with Flow("flow name", executor=LocalExecutor()) as flow:

Prefect 2.0

TaskRunnerを用いた制御。

from prefect.task_runners import SequentialTaskRunner

@flow(task_runner=SequentialTaskRunner())
def my_flow_function():

flowの実行設定

Flowの実行環境(Docker, ECS Task, Kubernetes Job, etc..)・管理環境(GitHub, S3, GCS, Docker)を指定する方法が変わりました。

Prefect 1.0

Flowに対して run_config を与えて、実行環境の設定をFlowオブジェクト自身に持たせていました。

from prefect.run_configs import DockerRun
from prefect.storage import S3
from prefect import Flow

with Flow(
    "flow_name",
    run_config=DockerRun(image="prefecthq/prefect:1.0"),
    storage=S3(bucket="bucket_name"),
) as flow:
    ...

Prefect 2.0

Flowの実行設定は、Flowオブジェクトとは分離された Deployment で管理することになりました。

Flowとは切り離されたため、実行環境を柔軟に変更することができるようになりました。

prefect deployment build コマンドを実行することで、Flowの実行設定である deployment.yaml が生成されます。buildコマンドの引数にFlowの管理場所、実行環境を指定します。

prefect deployment build flows/my_flow.py:flow_name --name docker-custom --tag dev -sb s3/dev -ib docker-container/docker-custom-image

prefect deployment apply コマンドでPrefect UIにFlowを登録します。

prefect deployment apply deployment.yaml

実行スケジュール設定

Prefect 1.0

Flowの実行スケジュールをFlowオブジェクト自身に持たせていました。

from prefect import Flow
from prefect.schedules import IntervalSchedule

schedule = IntervalSchedule(interval=timedelta(minutes=15))

with Flow("scheduled_flow", schedule=schedule) as flow:

Prefect 2.0

Deployment で実行スケジュールを管理することになり、Flowオブジェクトから分離されました。

prefect deployment build schedule_flow.py:my_flow --cron "0 0 * * *"

ロギング

Flow実行時のロギングの仕様が変わりました。

Prefect 1.0

Task内でのみロギングが可能。loggerはprefectのcontextから取得していました。

import prefect

@task
def my_task():
    logger = prefect.context.get("logger")
    logger.info("An info message.")
    logger.warning("A warning message.")

Prefect 2.0

Taskだけでなく、Flow内でもロギングが可能。loggerの取得方法が変わりました。

from prefect import flow, task, get_run_logger

@task
def my_task():
    logger = get_run_logger()
    logger.info("Hello from task")

@flow
def flow_with_logging():
    logger = get_run_logger()
    logger.info("Hello from flow")   

ETLワークフローを組んでみる

S3から生データをダウンロードし、集計データをS3にアップロードするETLワークフローをPrefectで組んでみます。

ETL

データセットには、ポケモンの個体情報データセットを使用し、属性タイプごとの統計量を集計します。

The Complete Pokemon Dataset | Kaggle

今回組んだETLのコードはGitHubにまとめています。 github.com

install

Prefect 2.0を使う場合、Python 3.7以上の環境を用意します。
Installation - Prefect 2 - Coordinating the world's dataflows

pipでprefectをインストールすると、自動的にPrefect 2.0がインストールされます。

pip install -U prefect

Flowの作成

3つのタスクから構成されるETLワークフローを実装します。

  • extract: S3から生データをダウンロード
  • transform: 生データを受け取り、属性タイプの統計量を集計
  • extract: 集計データをS3にアップロード

通常のPythonコードを書く感覚でFlowを記述できます。

from prefect import flow, task, get_run_logger
import pandas as pd
from prefect_aws import AwsCredentials

# AWS Credentials blockからAWS認証情報を使用してs3 clientを作成
aws_credentials_block = AwsCredentials.load("default")
s3_client = aws_credentials_block.get_boto3_session().client("s3")

S3_BUCKET = "prefect-etl-tutorial"

@task(name="extract task")
def extract(file_name: str) -> pd.DataFrame:
    """
    Download an object from S3 bucket.

    Args:
        file_name: Name of bucket download file name.
    Returns:
        download file with dataframe format.
    """
    logger = get_run_logger()
    logger.info(f"download {file_name} from s3")

    s3_client.download_file(S3_BUCKET, file_name, file_name)
    df = pd.read_csv(file_name)
    return df

@task(name="transform task")
def transform(df: pd.DataFrame) -> pd.DataFrame:
    """
    Transform extract dataframe.

    Args:
        df: dataframe downloaded from s3
    Returns:
        transformed dataset
    """
    logger = get_run_logger()
    logger.info("transform extract dataset")

    df_transformed = df.groupby(["type1"]).mean().reset_index()
    return df_transformed

@task(name="load task")
def load(df: pd.DataFrame, file_name: str) -> None:
    """
    Upload an object to S3 bucket.

    Args:
        df: upload dataframe object.
        file_name: Name of bucket download file name.
    """

    logger = get_run_logger()
    logger.info(f"upload {file_name} to s3")

    df.to_csv(file_name)
    s3_client.upload_file(file_name, S3_BUCKET, file_name)

@flow(name="ETL Flow")
def etl_flow(download_file_name: str, upload_file_name: str):
    logger = get_run_logger()
    logger.info("ETL flow start")

    extracted_df = extract(file_name=download_file_name)
    transformed_df = transform(extracted_df)
    load(transformed_df, file_name=upload_file_name)

    logger.info("ETL flow finished")

if __name__ == "__main__":
    etl_flow(download_file_name="pokemon.csv", upload_file_name="pokemon_transformed.csv")

上記のコードを実行すると、コンソールにログが出力されます。

Taskの状態をPrefectが追跡できています。

(prefect2) etl $ python etl_flow.py
23:15:59.581 | INFO    | prefect.engine - Created flow run 'stalwart-zebu' for flow 'ETL Flow'
23:16:01.691 | INFO    | Flow run 'stalwart-zebu' - ETL flow start
23:16:01.947 | INFO    | Flow run 'stalwart-zebu' - Created task run 'extract task-0' for task 'extract task'
23:16:01.948 | INFO    | Flow run 'stalwart-zebu' - Executing 'extract task-0' immediately...
23:16:02.682 | INFO    | Task run 'extract task-0' - download prefect-etl-tutorial/pokemon.csv from s3
23:16:03.194 | INFO    | Task run 'extract task-0' - Finished in state Completed()
23:16:03.727 | INFO    | Flow run 'stalwart-zebu' - Created task run 'transform task-0' for task 'transform task'
23:16:03.728 | INFO    | Flow run 'stalwart-zebu' - Executing 'transform task-0' immediately...
23:16:04.424 | INFO    | Task run 'transform task-0' - transform extract dataset
23:16:04.704 | INFO    | Task run 'transform task-0' - Finished in state Completed()
23:16:05.043 | INFO    | Flow run 'stalwart-zebu' - Created task run 'load task-0' for task 'load task'
23:16:05.044 | INFO    | Flow run 'stalwart-zebu' - Executing 'load task-0' immediately...
23:16:05.757 | INFO    | Task run 'load task-0' - upload prefect-etl-tutorial/pokemon_transformed.csv to s3
23:16:06.422 | INFO    | Task run 'load task-0' - Finished in state Completed()
23:16:06.427 | INFO    | Flow run 'stalwart-zebu' - ETL flow finished
23:16:06.722 | INFO    | Flow run 'stalwart-zebu' - Finished in state Completed('All states completed.')

Prefect Cloudでワークフローを実行する

上記のFlowをPrefect Cloudに登録し、UIからFlowを実行してみます。

FlowをPrefect Cloudに登録するには、Storage(Flowの管理場所)・Infrastracture(Flowの実行環境)を設定したDeploymentを作成します。

今回は次の構成でDeploymentを作成します。

  • Storage: S3
  • Infrastracture: Docker Container

Prefect Cloudのアカウントを作成し、ターミナルからログインを行います。

https://app.prefect.cloud/

$ prefect cloud login

Blockの作成

Prefet 2.0では外部システムとの連携するためにの設定を Block という機能で管理します。

Storage and Infrastructure - Prefect 2 - Coordinating the world's dataflows

今回はS3・Docker ContainerのBlockを作成します。

Blockの作成により、FlowのコードをS3にアップロードし、ローカルPC上でDocker ContainerとしてFlowを実行できるようになります。

S3 Blockは、バケット名・認証情報を設定します。

S3 Block

Docker Container Blockは、追加でインストールするライブラリを記述します。今回はpandasと、S3とのやりとりを行うために s3fs, prefect-awsを追加インストールします。

Docker Container Block

Deploymentの登録

作成したBlockを指定して、Flowの実行設定を記述したDeploymentを作成します。

prefect deployment build ./etl_flow.py:etl_flow -n etl-flow -sb s3/s3-block -ib docker-container/docker-block -q dev -o deployment.yaml

コマンドの要素の役割は以下のようになります

  • prefect deployment build: deploymentを作成するCLIコマンド
  • ./etl_flow.py:etl_flow: Flowを記述したPythonファイルと、Flow関数名を指定
  • -n etl-flow: deploymentの名前
  • -sb s3/s3-block: Storageの指定. 作成したS3 Blockを利用します
  • -ib docker-container/docker-block: Infrastructureの指定. 作成したDocker Container Blockを利用します。
  • -q dev:ワークキューの指定
  • -o deployment.yaml: 作成するdeploymentのファイル名

作成したDeploymentをPrefect Cloudに登録します

prefect deployment apply deployment.yaml

実行

ローカルPC上にAgentを起動させておきます。

prefect agent start -q dev

Prefect Cloud画面のDeploymentページから登録したETL Flowを実行します。

deployment on prefect cloud

Flowの実行画面には、出力ログや実行タスクの状態が出力されています。

Prefet 1.0の時に比べて、情報量が減りスッキリしましたね。

flow execution

まとめ

Prefect 2.0でETLのFlowを組んで、Prefect Cloudから実行するところまでやってみました。

基本的な要素は1.0の時と変わりませんが、parameterや条件分技などの書き方が代わり、よりネイティブのPythonコードらしい書き方でFlowを構築できるようになりました。

参考