本文中のコード github.com
Prefect 2.0
2022年7月にPrefect 2.0が正式リリースされました。
Prefect1.0のサポートが2023年5月1日に終了するため、Prefect 2.0への移行が必須となっています。
Prefect 1.0から2.0に移行するメリットは次のmediumの記事が参考になります。
Reasons to Transition from Prefect 1.0 to 2.0 | by Madison Schott | Medium
Prefect 1.0と2.0の違い
prefect 2.0の公式ドキュメント「Migrating from Prefect 1 to Prefect 2」で Prefect 1.0と2.0の機能の違いが説明されているので、変化があった主要機能を取り上げます。
共通機能
Task
とFlow
がワークフローの基礎要素- ワークフローの実行管理・タスクの状態追跡を行う
- ローカルPCでワークフローを実行・検証を行える
- ネガティブエンジニアリングを解決する思想に則って、データフローの骨組みを提供する
- ハイブリッドモデルを採用し、ワークフローのコード・計算リソースはユーザーが管理
なくなった機能
Parameter
Flowの実行パラメータを渡すためのParameterオブジェクトが廃止されました。
Prefect 1.0
Parameterオブジェクトを作成し、1つのタスクとしていました。
from prefect import task, Flow, Parameter @task def print_parameter(x: int): print(x) with Flow("param-flow") as flow: x = Parameter('x', default = 1) print_parameter(x=x)
Prefect 2.0
flowの関数に引数を渡すだけで、実行パラメータとすることができます。
typing, pydanticによる型ヒントを利用することができ、通常のPythonコードらしい書き方でFlowを記述できるようになりました。
from prefect import task, flow @task def print_parameter(x: int): print(x) @flow(name="param-flow") def my_flow(x: int): print_parameter(x=x)
state_handlers
タスクの状態に基づいた処理を制御する、state_hadlerが廃止されました。
Prefect 1.0
Taskデコレータに、タスク状態に応じた動作を記述したhandlerを渡していました。
from prefect import task, Flow def message_on_failure(task, old_state, new_state): if new_state.is_failed(): print('task failed') return new_state @task(state_handlers=[message_on_failure]) def fail_successfully(x): return 1 / x with Flow(name="fail") as flow: result = fail_successfully(x=0)
Prefect 2.0
タスクは PrefectFuture
オブジェクトを返し、 PrefectFuture
でタスクの状態を判定することで、明示的にエラー処理を記述することとなりました。
from prefect import task, flow from prefect.futures import PrefectFuture def message_on_failure(task_future: PrefectFuture): task_future.wait() if task_future.get_state().is_failed(): print('task failed') @task def fail_successfully(x): return 1 / x @flow def my_flow(nr: int): future_obj = fail_successfully.submit(nr) message_on_failure(future_obj)
signal
タスク状態を任意に設定する signalの利用が廃止されました
Prefect 1.0
事前に定義されたsignalsオブジェクトでタスク状態を指定していました。
from prefect.engine.signals import FAIL, ENDRUN, SUCCESS, SKIP, PAUSE, RETRY from prefect import task @task def signal_task(message): if message == 'stop_immediately!': raise FAIL(message='Got a signal to end the task run!')
Prefect 2.0
独自エラー型を使用することができるようになりました。
from prefect import task @task def signal_task(message): if message == 'stop_immediately!': raise RuntimeError(message='Got a signal to end the task run!')
case
条件判定 case が廃止されました。
Prefect 1.0
条件に応じて処理を分岐させる場合、case
を使用した比較が必要でした。
from prefect import task, Flow, case @task def check_condition(): return random() < 0.5 with Flow("conditional-branches") as flow: cond = check_condition() with case(cond, True): task_true(val) with case(cond, False): task_false(val)
Prefect 2.0
if elseを使って、より通常のPythonコードに近い記述方法で条件分岐を書けるようになりました。
from prefect import task, Flow, case @task def check_condition(): return random() < 0.5 flow(name="conditional-flow") def my_flow() cond = check_condition() if cond: task_true() else: task_false()
変更があった機能
flowの定義方法
Prefect 1.0
コンテクストマネージャーを使用し、DAG(有効循環グラフ)を作成しFlowを定義していました。
from prefect import Flow with Flow("flow name") as flow:
Prefect 2.0
DAGを作成しFlowを構築する方法が廃止されました。
モダンで動的なデータフローを構築するのに、DAGを用いるのは過度な制約をもたらすと考えられているためです。
通常のPython関数にflowデコレータを加えることで、PrefectがFlowとして認識することができるようになりました。
from prefect import flow @flow(name="flow name") def my_flow_function():
taskの実行設定
タスクを逐次実行・並列実行・分散実行を制御する方法が変わりました。
prefect 1.0
Executorを用いた制御。
from prefect.executors import LocalExecutor with Flow("flow name", executor=LocalExecutor()) as flow:
Prefect 2.0
TaskRunnerを用いた制御。
from prefect.task_runners import SequentialTaskRunner @flow(task_runner=SequentialTaskRunner()) def my_flow_function():
flowの実行設定
Flowの実行環境(Docker, ECS Task, Kubernetes Job, etc..)・管理環境(GitHub, S3, GCS, Docker)を指定する方法が変わりました。
Prefect 1.0
Flowに対して run_config
を与えて、実行環境の設定をFlowオブジェクト自身に持たせていました。
from prefect.run_configs import DockerRun from prefect.storage import S3 from prefect import Flow with Flow( "flow_name", run_config=DockerRun(image="prefecthq/prefect:1.0"), storage=S3(bucket="bucket_name"), ) as flow: ...
Prefect 2.0
Flowの実行設定は、Flowオブジェクトとは分離された Deployment
で管理することになりました。
Flowとは切り離されたため、実行環境を柔軟に変更することができるようになりました。
prefect deployment build
コマンドを実行することで、Flowの実行設定である deployment.yaml
が生成されます。buildコマンドの引数にFlowの管理場所、実行環境を指定します。
prefect deployment build flows/my_flow.py:flow_name --name docker-custom --tag dev -sb s3/dev -ib docker-container/docker-custom-image
prefect deployment apply
コマンドでPrefect UIにFlowを登録します。
prefect deployment apply deployment.yaml
実行スケジュール設定
Prefect 1.0
Flowの実行スケジュールをFlowオブジェクト自身に持たせていました。
from prefect import Flow from prefect.schedules import IntervalSchedule schedule = IntervalSchedule(interval=timedelta(minutes=15)) with Flow("scheduled_flow", schedule=schedule) as flow:
Prefect 2.0
Deployment
で実行スケジュールを管理することになり、Flowオブジェクトから分離されました。
prefect deployment build schedule_flow.py:my_flow --cron "0 0 * * *"
ロギング
Flow実行時のロギングの仕様が変わりました。
Prefect 1.0
Task内でのみロギングが可能。loggerはprefectのcontextから取得していました。
import prefect @task def my_task(): logger = prefect.context.get("logger") logger.info("An info message.") logger.warning("A warning message.")
Prefect 2.0
Taskだけでなく、Flow内でもロギングが可能。loggerの取得方法が変わりました。
from prefect import flow, task, get_run_logger @task def my_task(): logger = get_run_logger() logger.info("Hello from task") @flow def flow_with_logging(): logger = get_run_logger() logger.info("Hello from flow")
ETLワークフローを組んでみる
S3から生データをダウンロードし、集計データをS3にアップロードするETLワークフローをPrefectで組んでみます。
データセットには、ポケモンの個体情報データセットを使用し、属性タイプごとの統計量を集計します。
The Complete Pokemon Dataset | Kaggle
今回組んだETLのコードはGitHubにまとめています。 github.com
install
Prefect 2.0を使う場合、Python 3.7以上の環境を用意します。
Installation - Prefect 2 - Coordinating the world's dataflows
pipでprefectをインストールすると、自動的にPrefect 2.0がインストールされます。
pip install -U prefect
Flowの作成
3つのタスクから構成されるETLワークフローを実装します。
- extract: S3から生データをダウンロード
- transform: 生データを受け取り、属性タイプの統計量を集計
- extract: 集計データをS3にアップロード
通常のPythonコードを書く感覚でFlowを記述できます。
from prefect import flow, task, get_run_logger import pandas as pd from prefect_aws import AwsCredentials # AWS Credentials blockからAWS認証情報を使用してs3 clientを作成 aws_credentials_block = AwsCredentials.load("default") s3_client = aws_credentials_block.get_boto3_session().client("s3") S3_BUCKET = "prefect-etl-tutorial" @task(name="extract task") def extract(file_name: str) -> pd.DataFrame: """ Download an object from S3 bucket. Args: file_name: Name of bucket download file name. Returns: download file with dataframe format. """ logger = get_run_logger() logger.info(f"download {file_name} from s3") s3_client.download_file(S3_BUCKET, file_name, file_name) df = pd.read_csv(file_name) return df @task(name="transform task") def transform(df: pd.DataFrame) -> pd.DataFrame: """ Transform extract dataframe. Args: df: dataframe downloaded from s3 Returns: transformed dataset """ logger = get_run_logger() logger.info("transform extract dataset") df_transformed = df.groupby(["type1"]).mean().reset_index() return df_transformed @task(name="load task") def load(df: pd.DataFrame, file_name: str) -> None: """ Upload an object to S3 bucket. Args: df: upload dataframe object. file_name: Name of bucket download file name. """ logger = get_run_logger() logger.info(f"upload {file_name} to s3") df.to_csv(file_name) s3_client.upload_file(file_name, S3_BUCKET, file_name) @flow(name="ETL Flow") def etl_flow(download_file_name: str, upload_file_name: str): logger = get_run_logger() logger.info("ETL flow start") extracted_df = extract(file_name=download_file_name) transformed_df = transform(extracted_df) load(transformed_df, file_name=upload_file_name) logger.info("ETL flow finished") if __name__ == "__main__": etl_flow(download_file_name="pokemon.csv", upload_file_name="pokemon_transformed.csv")
上記のコードを実行すると、コンソールにログが出力されます。
Taskの状態をPrefectが追跡できています。
(prefect2) etl $ python etl_flow.py 23:15:59.581 | INFO | prefect.engine - Created flow run 'stalwart-zebu' for flow 'ETL Flow' 23:16:01.691 | INFO | Flow run 'stalwart-zebu' - ETL flow start 23:16:01.947 | INFO | Flow run 'stalwart-zebu' - Created task run 'extract task-0' for task 'extract task' 23:16:01.948 | INFO | Flow run 'stalwart-zebu' - Executing 'extract task-0' immediately... 23:16:02.682 | INFO | Task run 'extract task-0' - download prefect-etl-tutorial/pokemon.csv from s3 23:16:03.194 | INFO | Task run 'extract task-0' - Finished in state Completed() 23:16:03.727 | INFO | Flow run 'stalwart-zebu' - Created task run 'transform task-0' for task 'transform task' 23:16:03.728 | INFO | Flow run 'stalwart-zebu' - Executing 'transform task-0' immediately... 23:16:04.424 | INFO | Task run 'transform task-0' - transform extract dataset 23:16:04.704 | INFO | Task run 'transform task-0' - Finished in state Completed() 23:16:05.043 | INFO | Flow run 'stalwart-zebu' - Created task run 'load task-0' for task 'load task' 23:16:05.044 | INFO | Flow run 'stalwart-zebu' - Executing 'load task-0' immediately... 23:16:05.757 | INFO | Task run 'load task-0' - upload prefect-etl-tutorial/pokemon_transformed.csv to s3 23:16:06.422 | INFO | Task run 'load task-0' - Finished in state Completed() 23:16:06.427 | INFO | Flow run 'stalwart-zebu' - ETL flow finished 23:16:06.722 | INFO | Flow run 'stalwart-zebu' - Finished in state Completed('All states completed.')
Prefect Cloudでワークフローを実行する
上記のFlowをPrefect Cloudに登録し、UIからFlowを実行してみます。
FlowをPrefect Cloudに登録するには、Storage(Flowの管理場所)・Infrastracture(Flowの実行環境)を設定したDeploymentを作成します。
今回は次の構成でDeploymentを作成します。
- Storage: S3
- Infrastracture: Docker Container
Prefect Cloudのアカウントを作成し、ターミナルからログインを行います。
$ prefect cloud login
Blockの作成
Prefet 2.0では外部システムとの連携するためにの設定を Block
という機能で管理します。
Storage and Infrastructure - Prefect 2 - Coordinating the world's dataflows
今回はS3・Docker ContainerのBlockを作成します。
Blockの作成により、FlowのコードをS3にアップロードし、ローカルPC上でDocker ContainerとしてFlowを実行できるようになります。
S3 Blockは、バケット名・認証情報を設定します。
Docker Container Blockは、追加でインストールするライブラリを記述します。今回はpandasと、S3とのやりとりを行うために s3fs, prefect-awsを追加インストールします。
Deploymentの登録
作成したBlockを指定して、Flowの実行設定を記述したDeploymentを作成します。
prefect deployment build ./etl_flow.py:etl_flow -n etl-flow -sb s3/s3-block -ib docker-container/docker-block -q dev -o deployment.yaml
コマンドの要素の役割は以下のようになります
prefect deployment build
: deploymentを作成するCLIコマンド./etl_flow.py:etl_flow
: Flowを記述したPythonファイルと、Flow関数名を指定-n etl-flow
: deploymentの名前-sb s3/s3-block
: Storageの指定. 作成したS3 Blockを利用します-ib docker-container/docker-block
: Infrastructureの指定. 作成したDocker Container Blockを利用します。-q dev
:ワークキューの指定-o deployment.yaml
: 作成するdeploymentのファイル名
作成したDeploymentをPrefect Cloudに登録します
prefect deployment apply deployment.yaml
実行
ローカルPC上にAgentを起動させておきます。
prefect agent start -q dev
Prefect Cloud画面のDeploymentページから登録したETL Flowを実行します。
Flowの実行画面には、出力ログや実行タスクの状態が出力されています。
Prefet 1.0の時に比べて、情報量が減りスッキリしましたね。
まとめ
Prefect 2.0でETLのFlowを組んで、Prefect Cloudから実行するところまでやってみました。
基本的な要素は1.0の時と変わりませんが、parameterや条件分技などの書き方が代わり、よりネイティブのPythonコードらしい書き方でFlowを構築できるようになりました。
参考
- Welcome to Prefect 2 - Prefect 2 - Coordinating the world's dataflows
- (Re)Introducing Prefect: The Global Coordination Plane - Prefect
- Orchestrate Your Data Science Project with Prefect 2.0 | by Khuyen Tran | The Prefect Blog | Medium
- Reasons to Transition from Prefect 1.0 to 2.0 | by Madison Schott | Medium
- Serverless Real-Time Data Pipelines on AWS with Prefect, ECS and GitHub Actions | by Anna Geller | The Prefect Blog | Medium