肉球でキーボード

MLエンジニアの技術ブログです

データエンジニアリングの基礎を読みました


データエンジニアリングの基礎」を読んだので、感想・各章の内容についてまとめます

www.oreilly.co.jp

全体を通しての感想

原本は Fundamentals of Data Engineering で本書は日本語訳となります。
筆者のJoe Reis氏とMatt Housley氏はデータエンジニアリングのコンサルタントを行っていて、業界経験が長いお二人です。
データエンジニア界隈は急速に変化する業界と本文中で書かれています。
業界変化の中で「変わらないもの」を選択し、今後数年間役に立つコンセプトをまとめたものと本書を説明しています。

上記の狙い通り、本書はツールや特定技術ソリューションの話題は避け、データエンジニアリングの背後にある普遍的な技術概念の説明に徹しています。
SQL実行の内部の処理や、磁気ディスクドライブの物理挙動にまで踏み込んでいて、データエンジニアリングの基礎というタイトルにふさわしい内容でした。
データエンジニアリング領域に関わらず、エンジニアリング一般論として興味深いテーマについてベテランエンジニアの見解を知れる贅沢な内容でした。

日本語訳ですが、英語原本での独特の言い回しを日本語でも伝わるよう丁寧に訳されてるのが伝わってきて、非常に読みやすかったです。
今後、データエンジニアの必読書となるだろう一冊でした。

Ⅰ部データエンジニアリングの基礎と構成要素

1章 データエンジニアリング概説

1.1 データエンジニアリングとは何か
    1.1.1 データエンジニアリングの定義
    1.1.2 データエンジニアリングライフサイクル
    1.1.3 データエンジニアの発展
    1.1.4 データエンジニアリングとデータサイエンス
1.2 データエンジニアリングのスキルと活動
    1.2.1 データ成熟度とデータエンジニア
    1.2.2 データエンジニアに求められる背景知識とスキル
    1.2.3 ビジネス上の責務
    1.2.4 技術的責務
    1.2.5 データエンジニアの役割のスペクトラム:タイプAからタイプBまで
1.3 組織内でのデータエンジニアリング
    1.3.1 内向きデータエンジニアと外向きデータエンジニア
    1.3.2 データエンジニアと他の技術職
    1.3.3 データエンジニアとビジネスリーダーシップ
1.4 結論
1.5 参考資料

データエンジニアリングとデータエンジニアとは?について整理しています。データエンジニアリングの歴史と、データエンジニアと類似職種との関係性の話は個人的に面白かったです。

組織におけるデータエンジニアの非技術的な役割が書かれています。多くの利害関係者をつなぐ役割が求められると述べられていて、非技術部分の業務のウェイトが大きい印象を受けました。

2章 データエンジニアリングライフサイクル

2.1 データエンジニアリングライフサイクルとは何か?
    2.1.1 データライフサイクルとデータエンジニアリングライフサイクル
    2.1.2 生成:ソースシステム
    2.1.3 保存(ストレージ)
    2.1.4 取り込み
    2.1.5 変換
    2.1.6 データの提供
2.2 データエンジニアリングにおける主要な底流
    2.2.1 セキュリティ
    2.2.2 データ管理
    2.2.3 DataOps
    2.2.4 データアーキテクチャ
    2.2.5 オーケストレーション
    2.2.6 ソフトウェアエンジニアリング
2.3 結論
2.4 参考資料

本書ではデータをプロダクト価値に変えていく一連の流れをデータエンジニアリングライフサイクルという言葉で表現しています。

データエンジニアリングライフサイクルは、データの生成・保存・取り込み・変換・提供で構成されます。これらの土台となる要素がセキュリティ・データ管理・DataOps・データアーキテクチャオーケストレーション・ソフトウェアエンジニアとしています。

データエンジニアリングの技術範囲が自分は曖昧だったので、体系的に整理してる内容は貴重でした。データ運用のためのOpsに着目したDataOpsという概念を初めて知りました。

3章 適切なデータアーキテクチャの設計

3.1 データアーキテクチャとは何か?
    3.1.1 エンタープライズアーキテクチャとは何か?
    3.1.2 データアーキテクチャの定義
    3.1.3 「良い」データアーキテクチャ
3.2 良いデータアーキテクチャの原則
    原則1:共通コンポーネントを賢く選択する
    原則2:障害に備える
    原則3:スケーラビリティ設計
    原則4:アーキテクチャはリーダーシップだ
    原則5:常に設計し続ける
    原則6:疎結合システムを構築する
    原則7:可逆な決定をする
    原則8:セキュリティを優先する
    原則9:FinOpsを活用する
3.3 主要なアーキテクチャの概念
    3.3.1 ドメインとサービス
    3.3.2 分散システム、スケーラビリティ、障害に備えた設計
    3.3.3 密結合と疎結合:ティア、モノリス、マイクロサービス
    3.3.4 ユーザアクセス:シングルテナントとマルチテナント
    3.3.5 イベント駆動アーキテクチャ
    3.3.6 ブラウンフィールドプロジェクトとグリーンフィールドプロジェクト
3.4 データアーキテクチャの例と種類
    3.4.1 データウェアハウス
    3.4.2 データレイク
    3.4.3 次世代データレイクとデータプラットフォームの収斂
    3.4.4 モダンデータスタック
    3.4.5 Lambdaアーキテクチャ
    3.4.6 Kappaアーキテクチャ
    3.4.7 Dataflowモデル、バッチ、ストリームの統合
    3.4.8 IoTのためのアーキテクチャ
    3.4.9 データメッシュ
    3.4.10 その他のデータアーキテクチャ
3.5 データアーキテクチャの設計にかかわるのは誰か
3.6 結論
3.7 参考資料

本書ではデータアーキテクチャを「データアーキテクチャは、企業の進化するデータへの要求をサポートするシステム設計であり、トレードオフを慎重に評価した上での柔軟で可逆な決定を通じて実現される」と定義してます。

データアーキテクチャの例としてデータウェアハウス、データレイクを取り上げています。

アーキテクチャが抽象度が高い概念で、様々な定義がされてる言葉なので、世の中のアーキテクチャに対する共通見解を丁寧に整理してくれてます。

4章 データエンジニアリングライフサイクルにおけるテクノロジの選択

4.1 チームのサイズと容量
4.2 市場投入までのスピード
4.3 相互運用性
4.4 コスト最適化とビジネス価値
    4.4.1 総所有コスト(TCO)
    4.4.2 所有の総機会費用
    4.4.3 FinOps
4.5 現在vs.未来:不変テクノロジvs.一過性テクノロジ
    4.5.1 アドバイス
4.6 設置場所
    4.6.1 オンプレミス
    4.6.2 クラウド
    4.6.3 ハイブリッドクラウド
    4.6.4 マルチクラウド
    4.6.5 非中央集権型計算:ブロックチェーンとエッジ
    4.6.6 アドバイス
    4.6.7 クラウドからオンプレミスへの本国回帰
4.7 構築vs.購入
    4.7.1 オープンソースソフトウェア
    4.7.2 プロプライエタリなウォールドガーデン
    4.7.3 アドバイス
4.8 モノリスvs.モジュール
    4.8.1 モノリス
    4.8.2 モジュール性
    4.8.3 分散モノリスパターン
    4.8.4 アドバイス
4.9 サーバレスvs.サーバ
    4.9.1 サーバレス
    4.9.2 コンテナ
    4.9.3 サーバとサーバレスの評価方法
    4.9.4 アドバイス
4.10 最適化、性能、ベンチマーク戦争
    4.10.1 1990年代の「ビッグデータ」
    4.10.2 無意味なコスト比較
    4.10.3 非対称な最適化
    4.10.4 購入者責任
4.11 底流とテクノロジ選択への影響
    4.11.1 データ管理
    4.11.2 DataOps
    4.11.3 データ管理
    4.11.4 オーケストレーションの例:Airflow
    4.11.5 ソフトウェアエンジニアリング
4.12 結論
4.13 参考資料

前章のデータアーキテクチャの設計ができたのち、どのテクノロジーで実現するかの章になります。

オンプレvsクラウドや、モノリスvsモジュールといった、データエンジニアにとらわれず一般的にエンジニア界隈で議論に上がるテーマを取り扱っています。

これらのテーマに対するシニアエンジニアからの見解やアドバイスが書かれていて、この章を読むためだけに本書を買う価値があると思えるほどでした。

Ⅱ部 データエンジニアリングライフサイクルの詳細

5章 ソースシステムにおけるデータ生成

5.1 データソース:データはどのように生成されるのか?
5.2 ソースシステム:主要な概念
    5.2.1 ファイルと非構造化データ
    5.2.2 API
    5.2.3 アプリケーションデータベース(OLTPシステム)
    5.2.4 OLAP:オンラインアナリティクス処理システム
    5.2.5 変更データキャプチャ
    5.2.6 ログ
    5.2.7 データベースログ
    5.2.8 CRUD
    5.2.9 インサートオンリー
    5.2.10 メッセージとストリーム
    5.2.11 時間と時刻の種類
5.3 ソースシステムの実践的な詳細
    5.3.1 データベース
    5.3.2 API
    5.3.3 データ共有
    5.3.4 サードパーティデータソース
    5.3.5 メッセージキューとイベントストリーミングパイプライン
5.4 一緒に仕事する人
5.5 底流とそのソースシステムへの影響
    5.5.1 セキュリティ
    5.5.2 データ管理
    5.5.3 DataOps
    5.5.4 データアーキテクチャ
    5.5.5 オーケストレーション
    5.5.6 ソフトウェアエンジニアリング
5.6 結論
5.7 参考資料

データ生成に着目した章です。

データ形式ごとの各データベースの特徴が網羅されていて、贅沢な内容になっています。

ストリームデータの取り扱いについても取り上げられているのはありがたいです。

データ生成元はデータエンジニアの制御の外にあることが多いですが、データエンジニアライフサイクルにおいて重要な部分と指摘してます。

そのため、データエンジニアがアプリケーションチームとうまくコラボレーションできるよう、利害関係を構築する話がなされています。

6章 ストレージへの保存

6.1 データストレージの原材料
    6.1.1 磁気ディスクドライブ
    6.1.2 SSD(ソリッドステートドライブ)
    6.1.3 RAM(ランダムアクセスメモリ)
    6.1.4 ネットワークとCPU
    6.1.5 シリアライズ
    6.1.6 圧縮
    6.1.7 キャッシュ
6.2 データストレージシステム
    6.2.1 単体サーバvs.分散ストレージ
    6.2.2 結果整合性と強い一貫性
    6.2.3 ファイルストレージ
    6.2.4 ブロックストレージ
    6.2.5 オブジェクトストレージ
    6.2.6 キャッシュとメモリベースのストレージシステム
    6.2.7 HDFS(Hadoop分散ファイルシステム)
    6.2.8 ストリーミングストレージ
    6.2.9 インデックス、パーティション分割、クラスタリング
6.3 データエンジニアリングにおけるストレージ抽象
    6.3.1 データウェアハウス
    6.3.2 データレイク
    6.3.3 データレイクハウス
    6.3.4 データプラットフォーム
    6.3.5 ストリーム・トゥ・バッチストレージアーキテクチャ
6.4 ストレージの要点とトレンド
    6.4.1 データカタログ
    6.4.2 データ共有
    6.4.3 スキーマ
    6.4.4 コンピュートとストレージの分離
    6.4.5 データストレージのライフサイクルとデータ保持
    6.4.6 シングルテナントvs.マルチテナント
6.5 一緒に仕事する人
6.6 底流
    6.6.1 セキュリティ
    6.6.2 データ管理
    6.6.3 DataOps
    6.6.4 データアーキテクチャ
    6.6.5 オーケストレーション
    6.6.6 ソフトウェアエンジニアリング
6.7 結論
6.8 参考資料

データの保存に着目した章です。

ストレージがあまりにも一般的で当たり前のものと思いがちなため、ストレージメディアに内在するトレードオフを知らないデータエンジニアが多いと指摘しています。

そのため、磁気ディスクドライブの物理的な挙動にまで踏み込んでいて、ここまで解説するのかと筆者らへ尊敬の念を抱きました。

理解が曖昧だったデータレイクハウスについて具体的な説明があったのはありがたかったです。

7章 データ取り込み

7.1 データ取り込みとは
7.2 取り込みフェーズにおけるエンジニアリング上の重要な検討事項
    7.2.1 区切りありデータvs.区切りなしデータ
    7.2.2 頻度
    7.2.3 同期vs.非同期
    7.2.4 シリアライズとデシリアライズ
    7.2.5 スループットとスケーラビリティ
    7.2.6 信頼性と耐久性
    7.2.7 ペイロード
    7.2.8 プッシュvs.プルvs.ポーリング
7.3 バッチ取り込みに関する検討事項
    7.3.1 スナップショットまたは差分抽出
    7.3.2 ファイルのエクスポートと取り込み
    7.3.3 ETL vs. ELT
    7.3.4 挿入、更新とバッチサイズ
    7.3.5 データの移行
7.4 メッセージ取り込みとストリーム取り込みの検討事項
    7.4.1 スキーマ進化
    7.4.2 遅延到着データ
    7.4.3 順序と多重配送
    7.4.4 リプレイ
    7.4.5 TTL(Time to Live)
    7.4.6 メッセージサイズ
    7.4.7 エラー処理とデッドレターキュー
    7.4.8 消費者によるプルとプッシュ
    7.4.9 場所
7.5 データ取り込みの方法
    7.5.1 直接データベース接続
    7.5.2 CDC:変更データキャプチャ
    7.5.3 API
    7.5.4  メッセージキューおよびイベントストリーミングプラットフォーム
    7.5.5 マネージドデータコネクタ
    7.5.6 オブジェクトストレージを用いたデータの移動
    7.5.7 EDI
    7.5.8 データベースとファイルのエクスポート
    7.5.9 一般的なファイルフォーマットに関する現実的な問題
    7.5.10 シェル
    7.5.11 SSH
    7.5.12 SFTPとSCP
    7.5.13 Webhook
    7.5.14 Webインタフェース
    7.5.15 Webスクレイピング
    7.5.16 データ移行のための転送アプライアンス
    7.5.17 データ共有
7.6 一緒に仕事する人
    7.6.1 上流の利害関係者
    7.6.2 下流の利害関係者
7.7 底流
    7.7.1 セキュリティ
    7.7.2 データ管理
    7.7.3 DataOps
    7.7.4 オーケストレーション
    7.7.5 ソフトウェアエンジニアリング
7.8 結論
7.9 参考資料

前章のデータ生成とストレージを結びつける、データ取り込みについての章です。

バッチやストリーム取り込みのパターンや、データ取り込みに関する技術テーマを幅広く取り扱っています。

データ品質テストについても触れられています。データソースに暗黙的な変化があることで、データ利用者が気付かぬうちに被害を受ける例をあげています。

データスキーマやニーズは変化するもので、変化する前提でデータアーキテクチャを設計するのが本書の基本姿勢です。

8章 クエリ、データモデリング、変換

8.1 クエリ
    8.1.1 クエリとは何か?
    8.1.2 クエリのライフサイクル
    8.1.3 クエリオプティマイザ
    8.1.4 クエリ性能の向上
    8.1.5 ストリームデータに対するクエリ
8.2 データモデリング
    8.2.1 データモデルとは何か?
    8.2.2 概念データモデル、論理データモデル、物理データモデル
    8.2.3 正規化
    8.2.4 バッチアナリティクスデータのモデリング手法
    8.2.5 ストリームデータモデリング
8.3 変換
    8.3.1 バッチ変換
    8.3.2 マテリアライズドビュー、フェデレーテッドクエリ、データ仮想化
    8.3.3 ストリーミング変換と処理
8.4 一緒に仕事する人
    8.4.1 上流の利害関係者
    8.4.2 下流の利害関係者
8.5 底流
    8.5.1 セキュリティ
    8.5.2 データ管理
    8.5.3 DataOps
    8.5.4 データアーキテクチャ
    8.5.5 オーケストレーション
    8.5.6 ソフトウェアエンジニアリング
8.6 結論
8.7 参考資料

データ変換に着目した章です。

SQLが内部でどのような処理が実行されているかを説明しています。クエリ性能向上のための方法が理屈と共に紹介されているのがありがたいです。

データモデリングの手法としてKimball, Inmon, データボルトを取り上げています。

UDFを使用する危うさを指摘してます。コードがバージョン管理から外れ、組み込みのSQLコマンドより性能が悪化する可能性があると述べられています。

UDFは便利でつい使ってしまいがちだったので反省しました。

9章 アナリティクス、機械学習、リバースETL へのデータの提供

9.1 データ提供に関する一般的な考慮事項
    9.1.1 信頼
    9.1.2 ユースケースは何か? ユーザは誰か?
    9.1.3 データプロダクト
    9.1.4 セルフサービスにするべきか?
    9.1.5 データ定義とロジック
    9.1.6 データメッシュ
9.2 アナリティクス
    9.2.1 ビジネスアナリティクス
    9.2.2 オペレーショナルアナリティクス
    9.2.3 組み込みアナリティクス
9.3 機械学習
9.4 データエンジニアがMLについて知っておくべきこと
9.5 アナリティクスやMLに対してデータを提供する方法
    9.5.1 ファイル交換
    9.5.2 データベース
    9.5.3 ストリーミングシステム
    9.5.4 クエリフェデレーション
    9.5.5 データ共有
    9.5.6 セマンティックレイヤとメトリクスレイヤ
    9.5.7 ノートブックによるデータの提供
9.6 リバースETL
9.7 一緒に仕事する人
9.8 底流
    9.8.1 セキュリティ
    9.8.2 データ管理
    9.8.3 DataOps
    9.8.4 データアーキテクチャ
    9.8.5 オーケストレーション
    9.8.6 ソフトウェアエンジニアリング
9.9 結論
9.10 参考資料

データの提供に着目した章です。

データのユースケースとユーザーから考え、データプロジェクトを始める重要性を説いています。ユースケースとして分析やMLをあげ、ユーザーにアナリストやDS、MLエンジニア、そしてビジネス職を取り上げています。

ユーザーが自分でデータプロダクトを構築するセルフサービスにすべきかの議論が興味深かったです。

エンドユーザーを理解してないと、セルフサービス化は難しいと指摘しています。

Ⅲ部 セキュリティとプライバシー、およびデータエンジニアリングの未来

10章 セキュリティとプライバシー

10.1 人材
    10.1.1 ネガティブ思考の力
    10.1.2 常に心配性でいる
10.2 プロセス
    10.2.1 劇場型セキュリティvs.習慣としてのセキュリティ
    10.2.2 アクティブセキュリティ
    10.2.3 最小権限の原則
    10.2.4 クラウドでの責任共有
    10.2.5 常にデータのバックアップを取る
    10.2.6 セキュリティポリシーの例
10.3 テクノロジ
    10.3.1 パッチとシステムアップデート
    10.3.2 暗号化
    10.3.3 ロギング、監視、アラート
    10.3.4 ネットワークアクセス
    10.3.5 低レイヤデータエンジニアリングにおけるセキュリティ
10.4 結論
10.5 参考資料

セキュリティについてデータエンジニアリングライフサイクルの全てのステージで最初に考える必要があると指摘しています。

データエンジニア領域に関わるセキュリティ項目の各概要を取り上げています。

顧客データの漏洩ニュースをよく見かけますが、データエンジニアを務める以上は対岸の火事では済まされないなと感じる内容でした。

11章 データエンジニアリングの未来

11.1 データエンジニアリングライフサイクルは消えない
11.2 複雑さの衰退と使いやすいデータツールの興隆
11.3 クラウドスケールデータOSと相互運用性の改善
11.4 「大企業的」データエンジニアリング
11.5 職種名と担当範囲は変化する
11.6  モダンデータスタックからの脱却とライブデータスタックへの移行
    11.6.1 ライブデータスタック
    11.6.2  ストリーミングパイプラインとリアルタイムアナリティクスデータベース
    11.6.3 データとアプリケーションの融合
    11.6.4 アプリケーションとML間での緊密なフィードバック
    11.6.5 ダークマターデータとスプレッドシートの興隆?
11.7 結論

データエンジニアの今後の未来予想の章です。

データエンジニアリングライフサイクルはすぐに消えることはないが、ソフトウェアエンジニア、データエンジニア、データサイエンティスト、MLエンジニアの境界がより曖昧になっていくと指摘しています。

データ界隈の職種で生きるなら、技術領域は広めに持っておいた方がいいと自分は考えていたので、共感できる部分が多かったです。

ワークフローオーケストレーションの歴史

Data Engineering Study #23 Data orchestration 特集の発表「ワークフローオーケストレーション入門」から、ワークフローオーケストレーションの歴史について記事にまとめました。

概要

近年データエンジニアリングの周辺技術が話題に上がるようになり、ワークフローオーケストレーションが注目を集めています。

workflow orchestration関連語のGoogle Trend

上図はワークフローオーケストレーションの関連ワードのGoogle Trendです。

データオーケストレーションを中心として、ワークフローオーケストレーション関連語の検索数が上昇傾向にあります。

本記事では最新のワークフローオーケストレーションの動向を知るために、ワークフローオーケストレーションの歴史を深ぼります。

Prefect社のブログ記事 A Brief History of Workflow Orchestration をもとに、ワークフローオーケストレーションの歴史を5つの時代に区分しました。

  • CRON時代
  • リレーショナルデータベース時代
  • データウェアハウス・データ統合時代
  • ビッグデータ時代
  • モダンデータスタック時代

各時代の主要なツールをまとめると以下のように変遷してきています。

ワークフローオーケストレーションツールの変遷

CRON時代

ワークフローオーケストレーションの歴史はCRONから始まりました。

1974年にUNIXにCRONが導入されました。

機能は指定された時刻にコマンドを実行するというものでした。

以下はCRONコマンドを用いて、一連の処理を記述したシェルスクリプトを定期実行する例です。

0 0 0 1 1 * ./workflow.sh

CRONは特定の処理をスケジュール実行したい場合の手段として使い勝手が良く、現在も幅広く使用されている機能です。

リレーショナルデータベース時代

1979年に商用リレーショナルデータベースOracle v2がリリースされました。

その後、1995年にOracleジョブキュー(DBMS_JOB)を導入します。

機能はデータベース用コードの定期的な実行をスケジュールするものでした。

以下は新しくDBMS_JOBジョブを作成するPL/SQLコード例です。

PL/SQL(Procedural Language/Structured Query Language)はOracleデータベースで使用されるプログラミング言語です。

DECLARE
  job_number NUMBER;
BEGIN
  DBMS_JOB.SUBMIT(
    job       => job_number,
    what      => 'BEGIN YOUR_PROCEDURE_NAME; END;',
    next_date => SYSDATE,
    interval  => 'SYSDATE + 1'
  );
  COMMIT;
END;

DBMS_JOBのようにリレーショナルデータベースの機能の一部として、ジョブのスケジュール実行を行えるようになりました。

データウェアハウス・データ統合時代

データウェアハウスが登場し、データ統合が行われる以前、複数のデータソースごとデータを処理していました。

例えば、アプリケーションデータはCRONで処理し、リレーショナルデータベースは付属機能のジョブを実行するといった状況です。

データウェアハウス・データ統合以前の状況

複数のデータソースからデータを収集・変換し、データウェアハウスに格納するETL処理が主流となってきました。

データウェアハウス・データ統合

データをソースからターゲットに変換する機能を提供するツールが登場します。

代表的なのは1998年にInformatica社がリリースしたPowerCenterです。

PowerCenterはスケジュールされたジョブの実行・管理を中心としたツールでした。

データ処理のソースとターゲット、2つを繋ぐワークフローの概念が初めて導入されました。

以下はPowerCenterのデータマッピングの設定画面の例です。

PowerCenter

ビッグデータ時代

2006年にGoogleHadoopOSS化し、2011年にデータレイクが提唱・流行します。

Hadoopにより分散処理が可能となったことで、大量のデータをデータレイクに格納できるようになりました。

格納したデータを活用方法に応じて処理するELT処理が主流となります。

データ基盤構築はオンプレ環境のHadoopエコシステムで作られるケースが増えてきました。

Hadoopエコシステムのデータ基盤

Hadoopエコシステムを活用するためのワークフローオーケストレーションツールが登場します。

これらのツールはワークフローツール第一世代と呼ばれることが多いです。

代表的なツールだとOozie, Luigi, Azkabanがあります。いずれのツールもHadoopジョブを管理する機能がGitHubのREADMEに記述されています。

現在も使用例が多いAirflowはワークフロー管理のプラットフォームの立ち位置ですが、時代背景的にはHadoopジョブの管理の課題感から生まれたツールになっています。

ワークフローツール第1世代

モダンデータスタック時代

2011年 BigQuery(GCP)がGA、2013年 Redshift(AWS)がGA, Sparkを事業化したDatabricks社が設立、2014年 SnowflakeがGA、Kafkaを事業化したConfluent社が設立されます。

新しい技術の登場により、データ基盤に変化が起きました

  • オンプレから安価で高速なクラウドにデータ基盤がシフト
  • ストリームデータ処理技術が発達

このような新しいデータ基盤の課題を解決するツールに対し、Modern Data Stackという言葉を当てるようになりました。

Data Stackはデータ基盤を構成する製品群を意味します。Modern Data Stackは従来のData Stackの課題を解決しようとする技術トレンドの総称で、特定のアーキテクチャ・技術・ソリューションを指す言葉ではありません。
データ活用領域のトレンド「Modern Data Stack」に関するホワイトペーパーを公開 | NTTデータグループ - NTT DATA GROUP

Emerging Architectures for Modern Data Infrastructure: 2020 | Andreessen Horowitz の記事でデータ基盤技術の変化がまとめられています。技術トレンドの変化に、ワークフローオーケストレーションが含まれています。

Architecture Shifts in Data Infrastructure (Emerging Architectures for Modern Data Infrastructure: 2020より引用)

Modern Data Stackと呼ばれるような新たなワークフローオーケストレーションツールが登場し始めました。

2016年 DigdagOSS化、StepFunctions(AWS)がGA、2017年 Prefectがリリース、ArgoOSS化、2018年 Dagsterがリリース、2019年 CloudComposer(GCP)がGA

これらはワークフローツール第二世代と呼ばれ、次世代のワークフローツールとして近年注目を集めています。

まとめ

ワークフローオーケストレーションの歴史を5つの時代に区分し、各時代の主要なツールを以下のようにまとめました。

ワークフローオーケストレーションツールの変遷

データ基盤がオンプレからクラウドへ移り変わり、Modern Data Stackと呼ばれる新しいツールを組み合わせてデータ基盤を構築するのが現代の主流となりました。

この技術トレンドの変化からワークフローオーケストレーションの注目が集まってきていると言えます。

参考

入門 考える技術・書く技術を読みました


入門 考える技術・書く技術」の読書メモ
www.diamond.co.jp

全体を通しての感想

バーバラ・ミント氏著のピラミッド原則をまとめた「考える技術・書く技術」に対する、「日本人による日本人のための実践ガイド」とまえがきで述べられています。
全168Pと文量が少ないのと、本書の題材の通り簡潔な文章でまとめられているため、非常に読みやすい内容でした。

自分は業務でデータ分析の結果をまとめたドキュメントを書いています。
同僚のドキュメントを真似て自分なりのルールでドキュメントを作成していました。
質の高い技術文書を書く方法 - As a Futurist...
の記事を読み、文章を書く技術を体系的に学ぼうと思ったのが本書を読み始めた動機です。

本書はビジネス職向けの内容ですが技術文章にも使える汎用的な内容となっていました。
本文が4章のうち3章が考えるプロセスについての内容となっているため、論理的に考える技術がそのまま書く技術に繋がっているのが分かります。

序章. 誤解だらけのライティング

- 誰も教えてくれなかったレポート・ライティング
  - 誤解1 書きたいことを書きなさい
  - 誤解2 起承転結で書きなさい
- グローバル・スタンダードを学ぶ
  - レポートを受ける立場になって読んでみる
  - 考えるプロセスと書くプロセスを分ける

日本の教育がライティングを学ぶ機会がないことを指摘しています。

  • 作文や感想文のような自分の書きたいことを書く教育が行われる
    • 読み手のために書くライティングの基本が欠如してる
  • 起承転結で書く教育から、結論を最後に書く習慣が染み付いてる
    • 結論を冒頭に書くビジネス文書の原則に反する

本文に入る前のウォーミングアップとして、読み手の立場を考えたライティングの流れを紹介しています。
結論が最後に書かれた場合と、結論を冒頭に書き判断根拠を箇条書きで分かりやすく表現した場合を比較しています。
両者の違いは「書くプロセス」ではなく、前段階の「考えるプロセス」にあると指摘しています。
考えるプロセスを行うためにピラミッド原則を取り上げています。
以降の本文でピラミッド原則に則ったライティング方法が書かれています。

1章. 読み手の関心・疑問に向かって書く

- 読み手は何に関心を持ち、どんな疑問を抱くのか
- 読み手の関心を呼び起こすには
- 読み手の疑問を明らかにする「OPQ分析」
- OPQ分析のコツ

ビジネス文書は読み手の疑問に対する答えが伝えるべきメッセージであり、それ以外の関心のないテーマは読まれないと言い切っています。
なので、読み手の理解がビジネス文書を書くための最重要ポイントとしています。

読み手の疑問と伝えるべきメッセージを、問題報告書・販売企画書・セミナーへの案内状・履歴書に添えるレター・クライアントへの提案書を例に書かれています。

読み手を分析する手法としてOPQ分析を取り上げています。

  • Objective (望ましい状況)
    • 読み手が目指している望ましい状況
  • Problem (問題. 現状とObjectiveとのギャップ)
    • 解決すべき問題
  • Question (読み手の疑問)
    • 問題に直面した読み手が、その解決に向けて自然に抱くだろう疑問
  • Answer (答え. 文書の主メッセージ)
    • 読み手の疑問に対する答えが、そのまま文書の主メッセージになる. 疑問(Q)に忠実に答えることが大切
  • レール(トピック)
    • OPQAの内容を同じモノサシで比べる

OPQ分析のコツを3つあげています

  • すべて読み手の視点で表現する
  • 比較のレール(トピック)を外さない
  • 文書の主メッセージはQに直接答える

OPQ分析の練習問題として、売り上げ目標の達成・不良資産の発覚・在庫削減か売り上げ増かを扱う際のOPQAを例示しています。

2章. 考えを形にする

- メッセージの構造を明らかにする
  - 一度に覚えられる数には限界がある
  - メッセージ構造をそのまま文書へ
- グループ化と要約メッセージ
  - メッセージが一般論にならないようにする
- 要約メッセージを文章にするときの「4つの鉄則」
  - 鉄則(1)名詞表現、体言止めは使用禁止とする
  - 鉄則(2)「あいまい言葉」は使用禁止とする
  - 鉄則(3)メッセージはただ1つの文章で表現する
  - 鉄則(4)「しりてが」接続詞は使用禁止とする
- 「So What?」を繰り返す

読み手は内容を理解するために受けとった情報をグループ化し、理解可能な考えの数に収めようとします。そのため書き手が情報をグループ化することで、読み手の負担を減らせると指摘しています。
ピラミッド型に組み立てたメッセージ構造がそのまま文章構造となります。

考えを組み立てるプロセスは2種類の作業があるとし、2つの作業で相互チェックしながら考えを組み立てていきます。

  • グループ化し、要約メッセージを探す
  • メッセージに従って、グループを作る

「グループ化」と「要約メッセージ」がピラミッドの要となる。

要約メッセージ作成の注意点として、一般論にならないようにすると述べられています。
多くの人が全てをカバーできそうな包括的・抽象的な言葉を選んでしまいがちであると指摘しています。
決定的な言い回しを避けて、曖昧さを残した言葉を選ぶ経験が自分にもあったので反省しました。

要約メッセージを作成する際の4つの鉄則が述べられています

  • 名詞表現、体言止めは使用禁止とする
  • 「あいまい言葉」は使用禁止とする
  • メッセージはただ1つの文章で表現する
  • 「しりてが」接続詞は使用禁止とする

1つ目の鉄則の例に

  • 過去5年、ベトナム市場は年率19%で拡大している
  • 過去5年、インドネシア市場は年率18%で拡大している
  • 過去5年、タイ市場は年率18%で拡大している

のメッセージから考えられる要約メッセージの悪い例として「東南アジア市場の推移」という言葉が挙げられていました。
文章ではありませんが、自分がよく行なっている分析データの見出しにも通じるものがあるなと思いました。

4つ目の鉄則の、論理的な関係が明確でない「しりてが」接続詞は、「...し、…」「…であり、…」「…して、…」「…だが、…」「…せず、…」「…なく、…」という言葉です。

3章. ピラミッドを作る

- 帰納法でロジックを展開する
  - 帰納法の仕組み
  - 「同じ種類の考え」を前提とする
  - 帰納法は「つなぎ言葉」でチェックする
  - 結論を先に述べる
- 演繹法でロジックを展開する
  - ビジネスで演繹法を使う際の注意点
  - 演繹法は「前提」をチェックする
- ピラミッド作成のコツ
  - コツ(1) 1つの考えを短く明快に
  - コツ(2) 縦と横の「二次元」を意識する
  - 1対1の関係に要注意
  - 1対1の番外編「イメージによる説得」

ロジックの基本である帰納法と演算法を紹介しています。
帰納法とは複数の特定事象(前提)から、要約(結論)を導くロジック展開です。
帰納法の表現スタイルは「私の言いたいことは、…です。理由は3つあります。第一の理由は…、第二の理由は…、第三の理由は…」となります。

演算法とは絶対に正しいことや、一般的に正しいと判断されること(前提)から、妥当と思われる結論を導くものです。
演算法は一見すると流れがスムーズなので、それが帰ってロジックの誤謬を気付きにくくさせると注意しています。

ピラミッド作成のコツを2つ取り上げています

  • 1つの考えを短く明快に
    • 主メッセージとキーラインを早めに決める
    • ピラミッド内で文章を書こうとしない
  • 縦と横の「二次元」を意識する
    • 縦の関係
      • 上が結論で下が根拠・説明の関係
    • 横の関係
      • 結論を導くロジックの流れ
    • ピラミッドの縦横を無視しない

4章. 文章で表現する

- 文書全体の構造はピラミッドに同じ
  - ケース「X事業投資」
  - 主メッセージの位置
  - 目次のつけ方
- 段落表現のビジネス・スタンダード
  - 段落は「改行+大きめの行間」で
- 文章のわかりやすさは「接続詞」次第
  - ロジカル接続詞
  - 「しりてが」接続詞の使用ルール
  - 曖昧な接続詞は誤訳のモト
- 読み手を引きつける「導入部」
  - OPQ分析を使って導入部を作る
- 「結び」で今後のステップを示唆する

考えるプロセスで作ったピラミッド型のメッセージ構成を崩さず、そのまま構造が見えるような文章で表現するのが書くプロセスでは大切と述べています。
文書表現の基本は以下を挙げています

  • メッセージごとの固まりが一目でわかるようにする
  • 各メッセージ文を固まりの冒頭に配置する
  • ピラミッドのメッセージをそのまま形にする

段落表現の基本は以下を挙げています

  • メッセージごとに段落を作る(1段落1メッセージ)
  • 段落の違い(メッセージの固まり)を明確に表現する
  • 段落のメッセージ文を段落の冒頭に置く(主メッセージ同様、例外的に段落の最後に置くこともある)

ビジネス・ライティングでの段落の作り方を「改行に加えて、通常より大きめの行間を設ける」ことで段落の違いを目立たせると説明しています。
この段落の作り方はあまり意識したことがなかったので、実践してみようと思いました。

文章のわかりやすさは「接続詞」で決まると説明されています。
しりてが接続詞の「and接続詞」は使わず、因果関係を表すロジカル接続詞だけを使うよう推奨しています。

導入部はメッセージを何人もの読み手を対象とする正式な文章やプレゼン資料では不可欠なものと述べています。
導入部はOPQ分析を使って、読み手の関心をひく方法を紹介しています。

終章. メール劇的向上術

- メールが見違えるように変わる「感謝の言葉にPDF」
- 「1日1回ピラミッド」×4カ月

メールの書き方の方法に「感謝の言葉にPDF」という合言葉を使った方法を紹介しています。

  • 感謝の言葉
    • いきなり本文に入らず、簡潔に1, 2行を使って感謝の言葉を述べるようにする
    • 「感謝の言葉」が自分勝手なライティングのブレーキになる
  • P(主メッセージ部分)
    • Purpose Statement (目的文)
  • D(詳細)
    • Detail (詳細)
    • 主メッセージの理由・判断根拠・内容説明・具体案
    • PとDがメール本文になる
  • F(今後のアクション)
    • Follow-Through(フォロースルー)
    • 読み手に求めるアクション・自分のアクションの説明

pyenv-virtualenv initでプロンプト表示速度が低下する問題

問題

~/.zshrcに記述した eval "$(pyenv virtualenv-init -)" によって、zshのプロンプト表示が遅くなってることに気づきました。

原因と解決方法を調べたのでまとめました。

結論

~/.zshrcに記述するのを

eval "$(pyenv virtualenv-init -)"

eval "$(pyenv virtualenv-init - | sed s/precmd/chpwd/g)"

に変更する

pyenv-virtualenv initとは?

GitHub - pyenv/pyenv-virtualenv: a pyenv plugin to manage virtualenv (a.k.a. python-virtualenv).

カレントディレクトリにある.python-versionの設定に基づいて、python仮想環境を自動的にactivate/deactivateします。

.python-version ファイルは、pyenv でローカルのPython バージョンを設定するために、pyenv local コマンドで作成・削除できます。

原因

展開されるシェルスクリプトの内容を確認します。

$ echo $(pyenv virtualenv-init -)

以下のシェルスクリプトが実行されています。処理内容を見ていきます。

export PATH="/Users/satsuki/.pyenv/plugins/pyenv-virtualenv/shims:${PATH}"
export PYENV_VIRTUALENV_INIT=1

_pyenv_virtualenv_hook() {
    local ret=$?
    if [ -n "${VIRTUAL_ENV-}" ]; then
        eval "$(pyenv sh-activate --quiet || pyenv sh-deactivate --quiet || true)" || true
    else
        eval "$(pyenv sh-activate --quiet || true)" || true
    fi
    return $ret
}

typeset -g -a precmd_functions

if [[ -z $precmd_functions[(r)_pyenv_virtualenv_hook] ]]; then
    precmd_functions=(_pyenv_virtualenv_hook $precmd_functions)
fi

_pyenv_virtualenv_hook という関数が定義されています。この関数では以下の処理が実行されます。

  • 仮想環境が存在する場合、環境をactivateし、失敗した場合deactiveを行う
  • 仮想環境が存在しない場合、環境をactivateする

zshでは precmd_functions というグローバルな配列で、プロンプトが変更された時に実行する関数を管理しています。

zsh: 9 Functions

作成した_pyenv_virtualenv_hook 関数を precmd_functions 配列に追加しています。

これにより、プロンプトが変更されるたびにvirtualenvの仮想環境のactivateが実行され、zshの動作が重くなります。

解決方法

pyenv-virtualenvのGitHub Issueでシェル動作が遅くなる現象のDiscussionがあり、回避方法が提案されています。

Slow shell performance after running pyenv virtualenv-init · Issue #259 · pyenv/pyenv-virtualenv · GitHub

eval "$(pyenv virtualenv-init)” の代わりに以下を ~/.zshrcに書き込みます

eval "$(pyenv virtualenv-init - | sed s/precmd/chpwd/g)"

上記のコマンドでは、先ほどのpyenv virtualenv-init で展開されるシェルスクリプト内のprecmdの文字列をchpwdに書き換えています。

chpwd_functions というグローバルな配列で、カレントディレクトリが変更された時に実行する関数を管理しています。

よって、プロンプトを変更する度に仮想環境をactivateしていた処理を、カレントディレクトリを変更する度に実行するよう変更しています。

プロンプト表示速度の変化

zsh-prompt-benchmarkzshのプロンプトレンダリング速度を測ってみました。

修正前

********************************************************************
                      Prompt Benchmark Results
********************************************************************
Warmup duration      8s
Benchmark duration   2.050s
Benchmarked prompts  7
Time per prompt      292.84ms  <-- prompt latency (lower is better)
********************************************************************

修正前

修正後

********************************************************************
                      Prompt Benchmark Results
********************************************************************
Warmup duration      8s
Benchmark duration   2.015s
Benchmarked prompts  78
Time per prompt      25.84ms  <-- prompt latency (lower is better)
********************************************************************

修正後

プロンプト表示速度が大きく改善しました。

参考

ターミナルをiTerm2からAlacrittyに変更した

ターミナルをiTerm2からAlacrittyに変えたので、Alacrittyの設定方法について紹介します。

Alacrittyとは

Rust製のターミナルエミュレーターです。

公式READMEでは以下のように特徴を説明しています。

  • 高速に動作
  • configファイルで柔軟にカスタマイズ可能
  • BDS, Linux, macOS, Windowsのマルチプラトフォームに対応

他のターミナルエミュレーターの機能にある

  • タブ
  • ウィンドウ分割
  • GUIでの設定

といった機能は意図的に実装されていないようです。

タブやウィンドウ分割などの機能は、ウィンドウマネージャーやtmuxに任せる方が適切と作者は指摘していて、ターミナルとして必要最低限の機能に絞っています。

GUIでの設定がないのも、configファイルに管理をまとめられるメリットがあります。

オリジナル特徴

公式READMEにAlacrittyの機能が書かれています。
alacritty/docs/features.md at master · alacritty/alacritty · GitHub

Viモード

ターミナル内をVimのキーバーインドで移動することができます。

デフォルトでは Ctrl + Shift + SpaceVimモードを起動します。

範囲選択をVimと同じ操作で行えます。

検索

ターミナル内の文字を検索できます。

前方検索を Ctrl + Shift + f  (Command + f  on MacOS)

後方検索を Ctrl + Shift + b  (Command + b  on MacOS)

で起動します。

Hints

ターミナル内のハイパーリングやテキストを見つけ、他のアプリケーションに渡すことできます。

Ctrl + Shift + U でターミナル内のURLを見つけ、ブラウザで開くことできます。

選択範囲の拡大

マウス操作で選択範囲を拡張できます。

ダブルクリック: 意味的に一つの言葉を選択

トリプルクリック: 行選択

Ctrl + マウス操作: ブロック選択

クリックでURLを開く

ターミナル内のURLをクリックで開くことができます。

マルチウィンドウ

複数ウィンドウでターミナルを実行することができます。

インストール

複数のインストール方法が提供されていますが、自分はbrewでインストールしました。

$ brew install --cask alacritty

設定

~/.config/alacritty/alacritty.toml でAlacrittyの設定を管理します。

バージョン0.13.0以降、設定ファイルの形式がyamlからtoml形式に変更がありました。

alacritty.toml は自動で作成されないので、自身で作成する必要があります。

設定パラメーターはこちらの公式ドキュメントに記載されています。
Alacritty

各種設定のデフォルト値のalacritty.tomlは以下のようになります。

gist31550242085f9233088be8a49241a7ed

カラーテーマの設定

カラーテーマの設定を公式が用意しているので、importすることですぐに使うことができます。
GitHub - alacritty/alacritty-theme: Collection of Alacritty color schemes

以下のコマンドで ~/.config/alacritty/themes以下にレポジトリをダウンロードしておきます。

$ mkdir -p ~/.config/alacritty/themes
$ git clone https://github.com/alacritty/alacritty-theme ~/.config/alacritty/themes

alacritty.tomlの上部で使用したカラーテーマのtomlファイルをimportします。

import = ["~/.config/alacritty/themes/themes/ubuntu.toml"]

[shell]
program = "/bin/zsh"
args = ['--login']

# 以下設定....

例えばubuntuテーマを読み込むと以下のようなカラーテーマが適応されます。

color theme

起動時にtmuxを実行

Alacritty起動時にtmuxコマンドを実行する設定です。

セッションがあればアタッチし、なければ新しくセッションを作成します。

[shell]
program = "/bin/zsh"
args = ["-l", "-c", "tmux a -t 0 || tmux"]

まとめ

iTerm2からAlacrittyに移行して、ターミナルを操作してる時のもっさり感がなくなったなと感じてます。しばらくはターミナルはAlacrittyを使い続けると思います。

Alacrittyの設定のみを紹介しましたが、プロンプトやzshの設定、tmuxの設定をカスタマイズしていくことになると思います。

参考

PrefectワークフローをKubernetes Jobで実行する

この記事は MLOps Advent Calendar 202320日目の投稿記事です。

PrefectワークフローをKubernetes Jobで実行する方法を紹介します。
本文中コード: code-for-blogpost/prefect_gke at main · nsakki55/code-for-blogpost · GitHub

Prefect × Kubernetes

PrefectではワークフローをKubernetes Jobとして実行することができます。

kubernetesを使用することで計算リソースを柔軟に拡張したり、ワークフローをコンテナ化して管理するメリットがあります。

以下の手順で、PrefectでワークフローをKubernetes Jobとして実行できます。

  • Kubernetes work poolの作成
  • workerをKubernetes上にデプロイ
  • deploymentに起動imageを設定

今回はGKE上でPrefectのワークフローを実行する方法を紹介します。

Kubernetes Bobとして実行するにはいくつかの方法がありますが、今回は3つのパターンを取り上げます。

  • 自動生成 image
  • 自作 image
  • Prefect 公式 image + GCS

GCPリソースの作成

Kubernetes Engine

GKE上にクラスターを作成します。 prefect-cluster という名前のクラスターを作成します。

$ gcloud container clusters create prefect-cluster --num-nodes=1 --machine-type=n1-standard-1

gcloudコマンドのデフォルトクラスターを設定します。

$ gcloud config set container/cluster prefect-cluster

Artifact Registory

PrefectのFlowのimageを保存するために、Artifact Registoryを作成します。

$ gcloud artifacts repositories create prefect-repository --repository-format=docker --location=asia

Artifact Registoryの認証を行います。

$ gcloud auth configure-docker asia-docker.pkg.dev

Cloud Storage

Flowのコードを保存するために、GCSに prefect-gcs-sample という名前のバケットを作成します。

$ gcloud storage buckets create gs://prefect-gcs-sample --location ASIA-NORTHEAST1

Work Poolの作成

PrefectではFlowの実行インフラ環境をwork poolで管理します。
Work Pools & Workers - Prefect Docs

Kubernetes JobとしてFlowを実行する場合、Kubernetesタイプのwork poolをPrefect Cloud上に作成します。

Prefect CloudのWork Poolsページで、Kubernetes pool typeを選択します。

work pool

gke という名前のwork poolを作成します。

gke work pool

Flow実行詳細を設定できます。

Image Pull Policyを Always に設定します。

今回は変更しませんがImage項目を変更することでデフォルトの起動imageを変更できます。

何も設定しない場合はPrefectの公式imageが起動されます。

kubernetes work pool setting

Workerの作成

Worker概要

workerはwork poolに登録されたFlowをポーリングし、指定されたインフラ上でFlowを実行制御します。

workerについての詳しい説明をこちらの記事で取り上げています。
Prefect WorkerをKubernetesにデプロイする - 肉球でキーボード

workerはFlowを実行する環境にデプロイする必要があるため、Kubernetes Clusterでホスティングします。
Work Pools & Workers - Prefect Docs

You must start a worker within an environment that can access or create the infrastructure needed to execute flow runs. The worker will deploy flow runs to the infrastructure corresponding to the worker type. For example, if you start a worker with type kubernetes, the worker will deploy flow runs to a Kubernetes cluster.

workerとpoolがPrefectを運用する全体の中での立ち位置は、以下の図がわかりやすいです。(引用: Introducing Prefect Workers and Projects)

Prefect worker projects

※ workerはPrefect 2.10でリリースされた機能です。Flowの実行制御を行なっていたAgentとInfrastructure・Storage Blockを用いた実行方法は非推奨となっています。
prefect/RELEASE-NOTES.md at main · PrefectHQ/prefect · GitHub

Prefect Cloud API Keyの作成

WorkerがPrefect Cloudの認証に使用するために、Prefect Cloudの API Keys ページでAPI Keyを作成します。

API Keyを作成すると pnu_******* という文字列が一度だけ表示されるので、コピーしておきます。

API Key

HelmでWorkerをデプロイする

Helmを利用することでKubernetesのアプリケーションデプロイを簡単に行えます。

kubernetes workerをデプロイするためのHelm Chartが公式から用意されています。

prefect-helm/charts/prefect-worker at main · PrefectHQ/prefect-helm · GitHub

Helm clientにPrefect Helm repositoryを追加します。

$ helm repo add prefect https://prefecthq.github.io/prefect-helm
$ helm repo update

Kubernetes Clusterに prefect というnamespaceを作成します。

$ kubectl create namespace prefect

先ほど作成したPrefectのAPI Keyを Kubernetes secretで管理します。

$ kubectl create secret generic prefect-api-key --namespace=prefect --from-literal=key=pnu_******

作成したwork poolとPrefect Cloudの設定をPrefect Workerに反映させるために、 values.yaml を作成します。

worker:
  cloudApiConfig:
    accountId: <target account ID>
    workspaceId: <target workspace ID>
  config:
    workPool: <target work pool name>

には先ほど作成したwork poolの gke を設定します。

自分のPrefect Cloudアカウントの account ID, workspace IDは、Prefect CloudのURLから確認できます。以下のフォーマットとなっているため、対応する値を設定します。

https://app.prefect.cloud/account/<target account ID>/workspaces/<target workspace ID>

values.yaml を使用してHelm releaseを行います

$ helm install prefect-worker prefect/prefect-worker --namespace=prefect -f values.yaml

GKE上にworkerがデプロイされていることを確認します。

Prefect Worker

Flowを作成

以下のようなETL処理に見立てた etl.py を用意しました。

from prefect import flow, get_run_logger, task

@task
def extract():
    logger = get_run_logger()
    logger.info("Extract Task")

@task
def transform():
    logger = get_run_logger()
    logger.info("Transform Task")

@task
def load():
    logger = get_run_logger()
    logger.info("Load Task")

@flow
def etl(name: str = "nsakki55"):
    logger = get_run_logger()
    logger.info(f"ETL Start by {name}")
    extract()
    transform()
    load()
    logger.info(f"ETL Finish")

if __name__ == "__main__":
    etl()

Kubernetes JobでFlowを実行

FlowをKuberntes Jobとして実行する3つのパターンを取り上げます。

FlowをPrefect Cloudから実行するにはDeploymentの登録が必要になります。

自動生成 image

Prefectが自動生成したDockerfileでbuildしたimageを元に、FlowのKubernetes Jobを実行します。

コード: code-for-blogpost/prefect_gke/auto_image at main · nsakki55/code-for-blogpost · GitHub

実行に必要なパッケージをまとめたrequirementes.txt を用意します。

prefect>=2.13.8
prefect-docker>=0.4.0
prefect-kubernetes>=0.3.1

Flowの実行環境設定であるdeploymentを prefect.yaml に記述します。

Deploying Flows to Work pools and Workers - Prefect Docs

buildセクションにFlowの実行に必要なdocker imageのbuild方法を記載します。

dockerfile: auto を指定すると、自動的にDockerfileが作成されimageがbuildされます。

work poolには作成した gke poolを設定します。

name: flows
prefect-version: 2.13.8

build:
- prefect_docker.deployments.steps.build_docker_image:
    id: build-image
    requires: prefect-docker>=0.4.0
    image_name: "asia-docker.pkg.dev/{{ $GCP_PROJECT_ID }}/prefect-repository/auto-image"
    tag: latest
    dockerfile: auto
    platform: "linux/amd64"

push:
- prefect_docker.deployments.steps.push_docker_image:
    requires: prefect-docker>=0.4.0
    image_name: "{{ build-image.image_name }}"
    tag: "{{ build-image.tag }}"

pull:
- prefect.deployments.steps.set_working_directory:
    directory: /opt/prefect/auto_image

definitions:
  tags: &common_tags
    - "gke"
  work_pool: &**common_work_pool**
    name: "gke"
    job_variables:
      image: "{{ build-image.image }}"

deployments:
- name: "auto_image"
  tags: *common_tags
  schedule: null
  entrypoint: "flows/etl.py:etl"
  work_pool: *common_work_pool

環境変数GCP_PROJECT_ID を設定します。

$ export GCP_PROJECT_ID=your-gcp-project-id

以下のようなファイル構成となっています。

auto_image/
 │
 ├── flows/
 │     ├── requirements.txt
 │     └── etl.py
 │
 └── prefect.yaml

prefect.yaml があるパス上で以下のコマンドでdeploymentをPrefect Cloudに登録します。

Deploying Flows to Work pools and Workers - Prefect Docs

$ prefect deploy -n etl/auto_image

上のコマンドを実行するとDockerfileが自動生成され、image build、Artifact Registoryへのimage pushが行われます。

以下のDockerfileが自動生成されます。

FROM prefecthq/prefect:2.13.7-python3.8
COPY requirements.txt /opt/prefect/requirements.txt
RUN python -m pip install -r /opt/prefect/requirements.txt
COPY . /opt/prefect/flows/
COPY . /opt/prefect/
WORKDIR /opt/prefect/

Prefect Cloudから auto_image という名前で登録したdeploymentを実行すると、Flowの実行が成功します。

auto image deployment

GKEのワークロードから実行されたFlowのKubernetes Jobを確認できます。自動生成されたimageを起動していることがわかります。

auto image kubernetes job

自作 image

自分で作成したimageを元に、FlowのKubernetes Jobを実行します。

実行に必要なパッケージをまとめた環境を柔軟に用意できるため、こちらの方法を好む人が多いかと思います。

コード: code-for-blogpost/prefect_gke/custom_image at main · nsakki55/code-for-blogpost · GitHub

以下のような Dockerfile.custom を作成します。

FROM prefecthq/prefect:2.13.7-python3.8
COPY requirements.txt /opt/prefect/requirements.txt
RUN python -m pip install -r /opt/prefect/requirements.txt
COPY . /opt/prefect/custom_image/flows/
COPY . /opt/prefect/custom_image/
WORKDIR /opt/prefect/custom_image

以下のようなファイル構成となっています。requirements.txt、flowファイルは上述の自動生成imageで実行する場合と同様のものを使用します。

custom_image/
 │
 ├── requirements.txt
 ├── etl.py
 ├── Dockerfile.custom
 └── prefect.yaml

Dockerfile.custom を元にimage buildを行い、Artifact Registoryへpushを行います。

$ docker build -t asia-docker.pkg.dev/${GCP_PROJECT_ID}/prefect-repository/custom-image:latest -f Dockerfile.custom .
$ docker push asia-docker.pkg.dev/${GCP_PROJECT_ID}/prefect-repository/custom-image:latest

自作imageを元にKubernetes Jobを実行するdeployment設定のprefect.yaml を作成します。

自動生成したimageを使用しないため、buid, pushセクションはnullを設定して起きます。

definitionsセクションの起動imageに、先ほどArtifact Regsitoryに保存した自作imageを指定します。

name: flows
prefect-version: 2.13.8

build: null

push: null

pull:
- prefect.deployments.steps.set_working_directory:
    directory: /opt/prefect/custom_image

definitions:
  tags: &common_tags
    - "gke"
  work_pool: &common_work_pool
    name: "gke"
    job_variables:
      image: "asia-docker.pkg.dev/{{ $GCP_PROJECT_ID }}/prefect-repository/custom-image"

deployments:
- name: "custom_image"
  tags: *common_tags
  schedule: null
  entrypoint: "etl.py:etl"
  work_pool: *common_work_pool

deploymentをPrefect Cloudに登録します。

prefect deploy コマンドを実行すると、以下のように自動生成したimageをbuildをするか聞かれます。今回は自動生成したimageは使用しないため No を選択します。

$ prefect deploy -n etl/custom_image
? Would you like to build a custom Docker image for this deployment? [y/n]
(n):

Prefect Cloudから custom_image deploymentを実行するとワークフローの実行が成功しています。

auto image deployment

FlowのKubernetes Jobを確認すると、自作imageを起動していることがわかります。

custom image kubernetes job

Prefect 公式 image + GCS

Prefect公式imageを起動し、GCSからFlowファイルをダウンロードして実行します。

FlowファイルをDocker imageではなく、GCSで管理することになります。

コード: code-for-blogpost/prefect_gke/base_image_gcs at main · nsakki55/code-for-blogpost · GitHub

以下のようなフォルダ構成をとります。docker imageをbuildしないためDockerfileは必要ありません。

base_image_gcs/
 │
 ├── flows/
 │     └── etl.py
 │
 └── prefect.yaml

GCSにFlowファイルを管理してKubernetes Jobを実行するdeployment設定のprefect.yaml を作成します。

push, pullセクションにGCSの設定を記述します。

bucketに作成した prefect-gcs-sample bucketを指定します。

deploymentの起動imageを指定しない場合、work poolで設定したデフォルトの起動imageで実行されます。

name: flows
prefect-version: 2.13.8

build: null

push:
- prefect_gcp.deployments.steps.push_to_gcs:
    id: push_code
    requires: prefect-gcp>=0.4.3
    bucket: prefect-gcs-sample
    folder: flows 
    credentials: null

pull:
- prefect_gcp.deployments.steps.pull_from_gcs:
    id: pull_code
    requires: prefect-gcp>=0.4.3
    bucket: '{{ push_code.bucket }}'
    folder: flows 
    credentials: null

definitions:
  tags: &common_tags
    - "gke"
  work_pool: &common_work_pool
    name: "gke"

deployments:
- name: "base_image_gcs"
  tags: *common_tags
  schedule: null
  entrypoint: "flows/etl.py:etl"
  work_pool: *common_work_pool

deploymentをPrefect Cloudに登録します。

prefect deploy を実行すると GCS へのFlowファイルのpushが自動的に行われます。

$ prefect deploy -n etl/base_image_gcs

? Would you like to build a custom Docker image for this deployment? [y/n]
(n): n
Running deployment push steps...
> Running push_to_gcs step...
╭────────────────────────────────────────────────────────────────────────────╮
│ Deployment 'etl/base_image_gcs' successfully created with id               │
│ '06f17afc-f4a4-4907-9749-813c8599a620'.                                    │
╰────────────────────────────────────────────────────────────────────────────╯

Prefect Cloudから base_image_gcs deploymentを実行するとワークフローの実行が成功しています。

base image gcs deployment

FlowのKubernetes Jobを確認すると、Prefect公式image prefecthq/prefect:2.13.7-python3.11 を起動していることがわかります。

base image gcs kubernetes job

まとめ

PrefectでワークフローをKubernetes Jobとして実行する方法を紹介しました。

Prefect 2.10以降に出たworkerの登場で、実行インフラの設定が簡易化されたため、PrefectでKubernetesリソースを使用するハードルが下がったなと感じます。

Prefectの仕様変更が多いため、世に出ているドキュメントの情報が古くなってきています。

本記事は2023年12月に書かれたPrefect 2.14.6での情報です。今後仕様変更が起き本記事の内容が古くなる可能性があるのでご注意ください。

Prefect WorkerをKubernetesにデプロイする

Prefect 2.10以降、インフラ管理の方法はAgentが非推奨となりWorkerが推奨となりました。

PrefectのWorkerについてのまとめ、KubernetesにWorkerをデプロイする場合のmanifestを紹介します。
本文中コード: code-for-blogpost/prefect_kubernetes_worker at main · nsakki55/code-for-blogpost · GitHub

Workerとは

PrefectのWorkerは Work Poolとセットで使用し、ワークフローの実行インフラ環境を管理する機能です。

Work Poolはワークフローの実行管理を行うPub/Subのような機能です。
Work Pools & Workers - Prefect Docs
利用するインフラ環境に対応するタイプのWork Poolを作成する必要があります。
Docker、ECS Task、Kubernetesなどのインフラ環境ごとに Work Pool を作成します。ワークフローの実行設定にインフラ環境に対応した Work Poolを選択することで、ワークフローの実行環境を柔軟に変更できます。

WorkerはWork Poolからワークフローを取得し、指定されたインフラ環境でワークフローを実行・制御する役割を持ちます。
ワークフローを実行するインフラ環境にアクセス、またはリソースの作成ができる環境にWorker をデプロイします。
Work Pools & Workers - Prefect Docs

以下はPrefectでワークフローを実行する際の、WorkerとWork Poolの役割を表した図です。
ワークフローの実行設定を Deployment で管理し、実行したいインフラタイプのWork Poolに割り当てます。WorkerはWork Poolに割り当てられたDeploymentを取得し、指定されたインフラ環境でワークフローの実行を行います。

Workers and Work Pools(引用: Workers & Work Pools - Prefect Docs)

公式でサポートされているWorker Typeは7種類あります。

Worker Type(引用: Work Pools & Workers - Prefect Docs)

WorkerとAgent

WorkerはPrefect 2.10から導入されたました。
prefect/RELEASE-NOTES.md at main · PrefectHQ/prefect · GitHub

Prefect 2.10以前のワークフローのインフラ実行管理は、AgentとBlock機能が担っていました。
Worker導入以降、AgentとBlockの利用は非推奨となっています。
Deployments - Prefect Docs

WorkerとAgentの違いをまとめた公式ドキュメントがあります。
Upgrade from Agents to Workers - Prefect Docs

WorkerはAgentとInfrastructure Blockを融合した機能ですが、単なる代替機能ではありません。Workerを使用することでワークフローのデプロイ方法がAgentとInfrastructure Blockを使用する場合と変わります。

AgentからWorkerへの変更点

デプロイコマンドの変更

Prefectが自動的にレポジトリ内のFlowを検知してデプロイを行うようになりました。

リモートストレージ設定方法の変更

Delploymentのpullセクションにリモートストレージの設定を記述するようになりました。Storage Blockの機能自体は残っていて、pullセクションで使用することが可能となっています。
Where to Store Your Flow Code - Prefect Docs

実行インフラ設定方法の変更

  • Agent: Infrastructure Block
  • Worker: 実行インフラタイプのWork Pool

複数Deploymentの管理
Deploymentごとに1つの設定ファイルを作成する必要がありましたが、Prefect2.10以降は複数のprefect.yamlファイルで管理できるようになりました。

AgentとWorkerの類似点

  • Storage Blockを prefect.yamlのpullセクション中に設定可能
  • Work Poolの詳細設定項目とInfrastructure Blockの設定が似ている
  • Deploymentのインフラ設定の上書きを infra_override から job_variable に変更
  • ユーザーのインフラ環境でのAgentとWorkerの起動方法が実質同じ方法
    • Agent:prefect agent start --pool <work pool name>
    • Worker: prefect worker start --pool <work pool name>

Kubernetes Workerのデプロイ

PrefectでワークフローをKubernetesで実行する場合、WorkerをKubernetesにデプロイする方法があります。
公式ではHelmによるデプロイが推奨されているので、デプロイ方法を紹介します。

Helmによるデプロイ

HelmKubernetesのアプリケーションデプロイを簡単に行えるようにするツールです。
PrefectではKubernetes Workerのデプロイを、Prefect公式が提供してるHelmによるデプロイを推奨しています。
prefect-helm/charts/prefect-worker at main · PrefectHQ/prefect-helm · GitHub

Helmを使用したKubernetesへのWorkerのデプロイ方法が公式ドキュメントに紹介されています。
Kubernetes - Prefect Docs

Helmをローカル環境にインストールしておきます。Macの場合はbrewでインストール可能です。
Helm | Helm のインストール

$ brew install helm

Helm clientにPrefect Helm repositoryを追加します。

$ helm repo add prefect https://prefecthq.github.io/prefect-helm
$ helm repo update

prefectnamespaceにPrefectのAPIキーをKubernetesのSecretに作成します。

$ kubectl create secret generic prefect-api-key --namespace=prefect --from-literal=key=your-prefect-cloud-api-key

kubernetes typeのWork PoolとPrefect Cloudのaccount ID, workspace IDを記述した values.yaml を作成します。

worker:
  cloudApiConfig:
    accountId: <target account ID>
    workspaceId: <target workspace ID>
  config:
    workPool: <target work pool name>

自分のPrefect Cloudアカウントの account ID, workspace IDは、Prefect CloudのURLから確認できます。以下のフォーマットとなっているため、対応する値を設定します。

https://app.prefect.cloud/account/<target account ID>/workspaces/<target workspace ID>

Helm releaseを行いWorkerをkubernetes clusterにデプロイします。

$ helm install prefect-worker prefect/prefect-worker --namespace=prefect -f values.yaml

Helmで生成されるmanifest

helm templateコマンドを使って生成されるmanifestを確認することができます。

$ helm template prefect-worker prefect/prefect-worker --namespace=prefect -f values.yaml >> manifest.yaml

以下のmanifestをもとにworkerが起動されます。

Helmを使用せずWorkerの管理をmanifestで行いたい場合は、以下の内容を元に作成するのがいいと思います。

---
# Source: prefect-worker/templates/serviceaccount.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: prefect-worker
  namespace: "prefect"
  labels:
    app.kubernetes.io/name: prefect-worker
    helm.sh/chart: prefect-worker-2023.11.28
    app.kubernetes.io/instance: prefect-worker
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/version: "2.14.7"
    app.kubernetes.io/component: worker
    prefect-version: 2.14.7
---
# Source: prefect-worker/templates/role.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: prefect-worker
  namespace: "prefect"
  labels:
    app.kubernetes.io/name: prefect-worker
    helm.sh/chart: prefect-worker-2023.11.28
    app.kubernetes.io/instance: prefect-worker
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/version: "2.14.7"
    app.kubernetes.io/component: worker
    prefect-version: 2.14.7
rules:
- apiGroups: [""]
  resources: ["pods", "pods/log", "pods/status"]
  verbs: ["get", "watch", "list"]
- apiGroups: ["batch"]
  resources: ["jobs"]
  verbs: [ "get", "list", "watch", "create", "update", "patch", "delete" ]
---
# Source: prefect-worker/templates/rolebinding.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: prefect-worker
  namespace: "prefect"
  labels:
    app.kubernetes.io/name: prefect-worker
    helm.sh/chart: prefect-worker-2023.11.28
    app.kubernetes.io/instance: prefect-worker
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/version: "2.14.7"
    app.kubernetes.io/component: worker
    prefect-version: 2.14.7
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: prefect-worker
subjects:
  - kind: ServiceAccount
    name: prefect-worker
    namespace: "prefect"
---
# Source: prefect-worker/templates/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prefect-worker
  namespace: "prefect"
  labels:
    app.kubernetes.io/name: prefect-worker
    helm.sh/chart: prefect-worker-2023.11.28
    app.kubernetes.io/instance: prefect-worker
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/version: "2.14.7"
    app.kubernetes.io/component: worker
    prefect-version: 2.14.7
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: prefect-worker
      app.kubernetes.io/instance: prefect-worker
      app.kubernetes.io/component: worker
  template:
    metadata:
      labels:
        app.kubernetes.io/name: prefect-worker
        helm.sh/chart: prefect-worker-2023.11.28
        app.kubernetes.io/instance: prefect-worker
        app.kubernetes.io/managed-by: Helm
        app.kubernetes.io/version: "2.14.7"
        app.kubernetes.io/component: worker
        prefect-version: 2.14.7
    spec:
      serviceAccountName: prefect-worker
      securityContext:
        fsGroup: 1001
        runAsNonRoot: true
        runAsUser: 1001
      containers:
        - name: prefect-worker
          image: "prefecthq/prefect:2.14.7-python3.11-kubernetes"
          imagePullPolicy: IfNotPresent
          command:
            - /usr/bin/tini
            - -g
            - --
            - /opt/prefect/entrypoint.sh
          args:
            - prefect
            - worker
            - start
            - --type
            - "kubernetes"
            - --pool
            - "<target work pool name>"
            - --install-policy
            - "prompt"
          workingDir: /home/prefect
          env:
            - name: HOME
              value: /home/prefect
            - name: PREFECT_WORKER_PREFETCH_SECONDS
              value: "10"
            - name: PREFECT_WORKER_QUERY_SECONDS
              value: "5"
            - name: PREFECT_API_ENABLE_HTTP2
              value: "true"
            - name: PREFECT_API_URL
              value: "https://api.prefect.cloud/api/accounts/<target account ID>/workspaces/<target workspace ID>"
            - name: PREFECT_KUBERNETES_CLUSTER_UID
              value: ""
            - name: PREFECT_API_KEY
              valueFrom:
                secretKeyRef:
                  name: prefect-api-key
                  key:  key
            - name: PREFECT_DEBUG_MODE
              value: "false"
          envFrom:
          resources:
            limits:
              cpu: 1000m
              memory: 1Gi
            requests:
              cpu: 100m
              memory: 256Mi
          securityContext:
            allowPrivilegeEscalation: false
            readOnlyRootFilesystem: true
            runAsNonRoot: true
            runAsUser: 1001
          volumeMounts:
            - mountPath: /home/prefect
              name: scratch
              subPathExpr: home
            - mountPath: /tmp
              name: scratch
              subPathExpr: tmp
      volumes:
        - name: scratch
          emptyDir: {}

参考