肉球でキーボード

MLエンジニアの技術ブログです

プロダクトマネージャーのしごとを読みました


「プロダクトマネージャーのしごと」を読んだので、メモをかねて各章の感想を書きます。

www.oreilly.co.jp

全体を通しての感想

原本は Product Management in Practice, 2nd Edition で本書は日本語訳となります。

本書が出版された時にプロダクトマネージャーの実践的な本が出たと話題になりました。

プロダクトマネージャーの仕事の実態を包み欠かさず明かし、どうプロダクトマネージャーとしての成功率を高めていくかを紹介していいます。

いわゆるベストプラクティスをまとめた内容ではなく、プロダクトマネージャーとして日々の業務にどう向き合っていけば良いかの指針を示してくれる内容です。
あまりに現実的な内容で、自分としては胃が痛くなるような話が多かったです。

機械翻訳っぽさがなく、丁寧に訳された日本語訳となっていて読みやすいです。

文章がウィットに富んでいる言い回しが多く、文体も堅苦しくないため一気に読めてしまいました。

各章の内容

1. プロダクトマネジメントの実践

1.1 プロダクトマネジメントとは?
1.2 プロダクトマネジメントではないこと
1.3 優れたプロダクトマネージャーのプロフィール
1.4 悪いプロダクトマネージャーのプロフィール
1.5 プロダクトマネージャーとして週に60時間も働いてはいけない
1.6 プログラムマネージャーとは?プロダクトオーナーとは?
1.7 まとめ:曖昧さの海を航海する
1.8 チェックリスト

プロダクトマネージャーは一体何をやる人なのか説明しています。

プロダクトマネジメントについて定義するものではなく(というか筆者はプロダクトマネジメントは定義できるものではないと指摘してます)、プロダクトマネージャーと呼ばれてる人たちに共通してる内容を整理しています。

悪いプロダクトマネージャーの紹介に、プロダクト殉教者と呼ばれる言葉が出てきます。冗談っぽく話してますが、事実こういった状況に陥るプロダクトマネージャーがいかに多いか書かれていました。

最も印象的だったのが、プロアクトマネージャーがもたらす実際の価値は簡単に数値化できるものではなく、他職種に比べプロダクマネージャーは実際に何をやったのか分からないという内容でした。

2. プロダクトマネジメントのCOREスキル

2.1 ハイブリッドモデル:UX/テクノロジー/ビジネス
2.2 プロダクトマネジメントのCOREスキル:コミュニケーション、組織化、リサーチ、実行
    2.2.1 コミュニケーション
    2.2.2 組織化
    2.2.3 リサーチ
    2.2.4 実行
2.3 でも……ハードスキルについてはどうなのか?
2.4 まとめ:プロダクトマネジメントに対する会話を変える
2.5 チェックリスト

プロダクトマネジメントに必要なスキルを4つ挙げています

  • Communicate: ステークホルダーとコミュニケーションする
  • Organize: 持続的に成功するチームを組織化する
  • Research: プロダクトのユーザーのニーズとゴールをリサーチする
  • Execute: プロダクトチームがゴールに到達するための日々のタスクを実行する

ソフトスキルを中心に話した後、ハードスキルは必要なのか?について書かれています。

一般的にハードスキルに基づいてプロダクトマネージャーを採用する組織が多いが、プロダクトマネージャーに期待される日々の仕事とほとんど関係ないと指摘しています。

3. 好奇心をあらわにする

3.1 心からの興味を持つ
3.2 しなやかマインドセットを育む
3.3 間違いという贈り物
3.4 守りの姿勢と距離を置く
3.5 「なぜ」を使わずに理由を尋ねる
3.6 好奇心を広げる
3.7 まとめ:好奇心がカギ
3.8 チェックリスト

プロダクトマネージャーとしてのチームメンバーとの関わり方や、マインドについて書かれています。

自分が理解できる領域の職種の人とだけ関わるのではなく、様々な専門分野の人と仕事が出来る環境を作ることの重要性を説いています。

人とのつながりの中で仕事をするため、人間性の良さは重要だなと改めて実感しました。

4. 過剰コミュニケーションの技術

4.1 あたりまえを問う
4.2 遠回しではなく、単刀直入に
4.3 すべてがあなたのせいではない。アウトカムは意図より重要
4.4 プロダクトマネジメントでいちばん危険な言葉:「よさそう」
4.5 「よさそう」からの脱却戦術:Disagree&Commit
4.6 いろいろなコミュニケーションスタイルを意識する
4.7 コミュニケーションはあなたの仕事。仕事をすることで謝罪してはいけない
4.8 過剰コミュニケーションの実践:プロダクトマネージャーの3つのよくあるコミュニケーションシナリオ
    4.8.1 シナリオ1
    4.8.2 シナリオ2
    4.8.3 シナリオ3
4.9 まとめ:迷ったらコミュニケーション
4.10 チェックリスト

コミュニケーション不足から、経営陣が期待していたものと、現場のエンジニアが作り上げたプロダクトが異なるシチュエーションを取り上げて、プロダクトマネージャーのコミュニケーションの重要性を紹介しています。

コミュニケーションを曖昧にすることで起きうる悲劇のエピソードが散りばめられていて、過剰すぎるコミュニケーションの必要性を説いている章になります。

5. シニアステークホルダーと働く(ポーカーゲームをする)

5.1 「影響力」から情報へ
5.2 気に入らない答えでも答えは答え
5.3 「上司は馬鹿だ」、もしくは、おめでとう――あなたはチームを壊した
5.4 警告なしで驚かさない
5.5 社内政治の世界でユーザー中心主義を貫く
5.6 シニアステークホルダーも人間だ
5.7 実践ポーカーゲーム:シニアステークホルダーマネジメントの3つのよくあるシナリオ
    5.7.1 シナリオ1
    5.7.2 シナリオ2
    5.7.3 シナリオ3
5.8 まとめ:これはあなたの仕事の一部であり、障害ではない
5.9 チェックリスト

プロダクトマネージャーは組織的権力があるステークホルダーとチームのメンバーとの間で働くことになります。

本章は社内政治にまつわる内容となっていて、プロダクトマネージャーとして避けては通れない調整の話が出てきます。

ステークホルダーとのやりとり例が3つ取り上げられていますが、どれもリアルな話で学びが大きかったです。

6. ユーザーに話しかける(あるいは「ポーカーって何?」)

6.1 ステークホルダーとユーザーは違う
6.2 そう、ユーザーと話す方法を学ばなければいけない
6.3 ペルソナ・ノン・グラータ
6.4 プロダクトとリサーチ:友だちのふりをした敵から、永遠の大親友へ
6.5 まとめ:いや、真剣な話、ユーザーとの会話を学ぶべき
6.6 チェックリスト

プロダクトを良いものにしていくには、ユーザーの声を聞くことが重要であると指摘しています。
プロダクトマネージャーとしてユーザーの真の課題を知るための方法が書かれています。

7. 「ベストプラクティス」のワーストなところ

7.1 誇張を鵜呑みにしない
7.2 現実と恋に落ちる
7.3 フレームワークやモデルは有用なフィクション
7.4 あなたはここにいる
7.5 何のために問題解決しているのか?
7.6 でも、前のチームではうまくいったのに
7.7 「プロセス嫌い」と働く
7.8 ベストプラクティスのベストなところ
7.9 まとめ:出発点であって保証ではない
7.10 チェックリスト

プロダクトマネジメントのベストプラクティスに固執することの危険性を取り上げています。

プロダクトマネジメントの本当を細かく知りたい場合、業界トップ企業のケーススタディではなく、自分の組織の身近なマネージャーに聞くことを推奨しています。

8. アジャイルについての素晴らしくも残念な真実

8.1 アジャイルにまつわる3つの迷信を論破する
8.2 アジャイルマニフェストに目を向ける
8.3 マニフェストからモンスターへ
8.4 アリスター・コーバーンの「アジャイルのこころ」を再発見する
8.5 アジャイルと「常識の我が物化」
8.6 アジャイルを「正しく」やって悪化する場合
8.7 アジャイルを「正しく」やって改善する場合
8.8 二度としたくないアジャイルについての会話7選
8.9 まとめ:曖昧さはここにも
8.10 チェックリスト

著者のMatt LeMay氏は本書の他にアジャイルについての本も出版しています。(みんなでアジャイル ―変化に対応できる顧客中心組織のつくりかた)

本書では教科書通りのアジャイルの実践にこだわることで、チームが方向性を見失う危険性を指摘しています。

9. ドキュメントは無限に時間を消費する(そう、ロードマップもドキュメント)

9.1 「プロダクトマネージャーがロードマップの持ち主だ!」
9.2 問題はロードマップではなく、それをどう使うか
9.3 ガントチャート上は欲しいものが必ず手に入る
9.4 プロダクトの仕様書はプロダクトではない
9.5 最高のドキュメントは不完全
9.6 最初のドラフトは1ページで、作るのに1時間以上かけない
9.7 テンプレートがある場合
9.8 商用ロードマップツールとナレッジマネジメントツールの簡単なメモ
9.9 まとめ:メニューは食事ではない
9.10 チェックリスト

プロダクトの戦略ロードマップを完璧なドキュメントにまとめたくある衝動を抑えた方が良いと述べています。

ロードマップは約束ではなく、戦略的なコミュニケーション用のドキュメントとしています。

不完全なドキュメントの方がチームメンバーとの議論を進められ、完全なドキュメントを作ることより価値があると指摘しています。

ガントチャートを利用するとチームがアウトプットよりアウトカムを優先する状況を取り上げていました。ガントチャートが導入される妥当な理由としてステークホルダーがスケジュールを正確に知りたいケースががあり、不確実なことや変わりそうなことを率直に伝える必要性を述べています。

10. ビジョン、ミッション、達成目標、戦略を始めたとしたイケてる言葉たち

10.1 アウトカムとアウトプットのシーソー
10.2 SMARTなゴール、CLEARなゴール、OKRなどなど
10.3 優れた戦略と実行は不可分だ
10.4 優れた戦略はシンプルで明快だ
10.5 わからないなら例を求めよう
10.6 まとめ:シンプルに保ち、役立たせよう
10.7 チェックリスト

プロダクトマネジメントにまつわる大袈裟でイケてる言葉の多くは、2つの質問に集約されると指摘しています。

  • 何を達成しようとしているのか
  • どうやって達成するつもりか

現場のメンバーからのフィードバックを得ずに作った「世界一の包括的戦略プレゼンテーション資料」が、まとまりのない、ビジネス用語を寄せ集めた委員会謹製の、ややこしい図表や希望的観測だらけの代物と切り捨てる話が個人的にお気に入りです。

11. 「データ、舵を取れ!」

11.1 「データ」という禁句をめぐるトラブル
11.2 意思決定から始め、それからデータを見つける
11.3 重要な指標に集中する
11.4 明確な期待を定めるためにサバイバル指標を利用する
11.5 実験とその不満
11.6 「説明責任」から行動へ
11.7 まとめ:近道なんてない!
11.8 チェックリスト

プロダクトマネージャが日々の業務でデータとどう付き合っていくべきか書かれています。

意思決定のために十分なデータがないことを嘆くプロダクトマネージャーの例が取り上げられています。したいと思っている意思決定から始めて、「あればなおよい」ものとしてデータを扱うことが書かれています。

データドリブンという言葉に踊らされて、ユーザー課題の解決から遠ざかるケースは容易に起きるなと思います。

12. 優先順位づけ: すべてのよりどころ

12.1 層になったケーキをひと口食べる
12.2 どの意思決定もトレードオフ
12.3 体験全体に留意する
12.4 うわべだけの魅力ではなく本質の理解へ
12.5 でもこれは緊急なんです!
12.6 優先順位づけの実際:項目は同じでも、ゴールと戦略に応じて結果は変わる
12.7 まとめ:志は大きく、スタートは小さく
12.8 チェックリスト

プロダクトマネージャーとしての仕事の大部分は、何をどのようにいつ作るか決めることと述べられています。

大きな計画や意思決定を分割して、フィードバックをもらいながら微調整をしていくことが必要とされています。

13. おうちでやってみよう: リモートワークの試練と困難

13.1 遠くから信頼を築く
13.2 シンプルなコミュニケーションアグリーメントが意味のある信頼を築く
13.3 同期コミュニケーションと非同期コミュニケーション
13.4 分散チームのための同期コミュニケーション:時間と空間を企画する
13.5 分散チームのための非同期コミュニケーション:具体的な期待を設定する
13.6 「同期サンドイッチ」を作る
13.7 非公式なコミュニケーションのためのスペースを作り、それを守る
13.8 ハイブリッドな時代:対面とリモートのバランスを取る
13.9 まとめ:あなたのコミュニケーション能力の強化トレーニング
13.10 チェックリスト

時流に合わせたテーマでリモートワークの中でプロダクトマネージャーとして働く難しさを取り上げてる章です。

非同期メッセージで送信する前に確認するチェックリストが、自分は刺さりました

  • メールを読んだ受信者は、10秒以内に、取ってほしいアクションがわかるか?
  • 期待する結果と納期は明記してあるか?
  • 複数の受信者に送る場合は、それぞれの受信者への依頼事項は明確か?
  • フィードバックを求める場合は、どんな種類のフィードバックを求めているのか、フィードバックを求める理由を明確に示したか?
  • 一般的なフォローアップやチェックの場合は、どんなタイプの反応・アクションが欲しいかを明確にしているか?

同期コミュニケーションと非同期コミュニケーションをうまく使いこなすための実践方法があり、おすすめの章です。

14. プロダクトマネージャーのなかのマネージャー(プロダクトリーダーシップ編)

14.1 出世する
14.2 びっくり!あなたがしていることはすべて間違い
14.3 自分自身に課す基準は自分がチームに課す基準
14.4 自律性の限界
14.5 明確なゴール、明確なガードレール、小さなフィードバックループ
14.6 自分自身を客体化する
14.7 プロダクトリーダーシップの実践
    14.7.1 シナリオ1
    14.7.2 シナリオ2
    14.7.3 シナリオ3
14.8 まとめ:最高の自分へ踏み出す
14.9 チェックリスト

プロダクトマネージャーから上の役職に就いた場合の内容になっています。

基本的には前章までのプロダクトマネージャーについて指摘されていた内容と通ずる内容となっています。

15. 良いときと悪いとき

15.1 自動操縦の組織の心地よい静寂
15.2 良いときは(必ずしも)簡単なときではない
15.3 世界の重荷を背負う
15.4 世界最高の会社で働いていることを想像する
15.5 まとめ:大変な仕事だがその価値はある
15.6 チェックリスト

COREスキルが実践され、プロダクトマネージャーとしてうまく行ってる時が、必ずしも物事が簡単に運んでいる時ではないと指摘しています。

新しい課題を積極的に探し、率直な気持ちで、好奇心を持ち、先入観なく取り組んでいる時がプロダクトマネジメントとしてうまく行ってる時と述べています。

世界の重荷を背負い込もうとする人にとって、プロダクトマネジメントはハマりすぎる仕事と注意しています。

16. どんなことでも

本書の総括的な立ち位置の章です。

プロダクトマネージャーの仕事は、プロダクトのためになるならどんなことでも行うというのが本書のスタンスだったので、それをおさらいしています。

機械学習システムデザインを読みました

機械学習システムデザイン」を読んだので、感想・各章の内容についてまとめます

www.oreilly.co.jp

全体を通しての感想

原本は Designing Machine Learning Systems で本書は日本語訳版となります。

原本の著者である Chip Huyen氏が書籍に関する情報をまとめたGitHubレポジトリを公開しています。https://github.com/chiphuyen/dmls-book

著者が現場の機械学習エンジニアということもあり、書かれている内容は実際の現場に即した内容に徹していました。実環境の機械学習システムで直面する課題やユースケースをまとめていて、機械学習エンジニアが普段感じてる課題をうまく言語化しています。

特に筆者の経験に基づいて現場で直面した問題や手堅い手法が書かれているのは、実務家として目から鱗の内容でした。

本書は概念的な内容が書かれていて、具体的なプログラムやライブラリの使い方を紹介するチュートリアル本ではないです。

MLOpsの背景にある機械学習システムの難しさや、対応方法について体型的に学ぶのにちょうどいい本だと思います。

前提知識として要求されてるソフトウェアエンジニア知識がそこまで多くないので、分析業務がメインのデータサイエンティストの方にも取っ付きやすい内容だと思います。

oreily本は機械翻訳にかけたような日本語訳の本がたまにありますが、本書は機械翻訳っぽさがなく、人の手で丁寧に訳された文章で読みやすい日本語訳となってます。

各章の内容

1. 機械学習システムの概要

1.1 機械学習を使うとき
    1.1.1 機械学習のユースケース
1.2 機械学習システムの理解
    1.2.1 研究分野での機械学習と実現場での機械学習
    1.2.2 機械学習システム vs. 従来のソフトウェア
1.3 まとめ

どういう状況下で機械学習が役に立つのか・役に立たないのかを、実例を交えながら紹介しています。機械学習エンジニアをやっていると、機械学習を使ってサービスに応用できないか?という相談を受ける場面があると思います。ToC, ToBの両方で機械学習ユースケースをまとめているので、実務者が書いてる本で流石だなと感じます。

研究分野と対比して、実際の機械学習システムではさまざまな利害関係者が存在し、ソフトスキルが求められる点だったり、実サービスで使うため推論レイテンシー要件や予測値によってユーザーが不公平を被らないようにする必要がある特徴を話をしてます。

機械学習はソフトウェアエンジニアの一部と思えば、ソフトウェアエンジニアのベストプラクティスを機械学習に適用すればいいのではないか?という疑問に対して、コートとデータが密に関わっている観点から、従来のソフトウェアエンジニアと機械学習システムの違いを論じてます。

2. 機械学習システム設計の概要

2.1 ビジネスと機械学習の目標
2.2 機械学習システムの要件
    2.2.1 信頼性
    2.2.2 拡張性
    2.2.3 保守性
    2.2.4 適応性
2.3 反復型プロセス
2.4 機械学習の問題の組み立て
    2.4.1 機械学習のタスクタイプ
    2.4.2 目的関数
2.5 マインド vs. データ
2.6 まとめ

システム設計を行うために、ビジネス要件とシステム要件の両方を整理してます。ビジネスゴールと機械学習のメトリクスが結びついている必要性を強調しています。システム設計の技術的な部分だけを切り取らずに、あくまで実務者向けの本という気概を感じます。

3.データエンジニアリングの基礎知識

3.1 データソース
3.2 データフォーマット
    3.2.1 JSON
    3.2.2 行優先フォーマット vs. 列優先フォーマット
    3.2.3 テキスト形式 vs. バイナリ形式
3.3 データモデル
    3.3.1 リレーショナルモデル
    3.3.2 NoSQL
    3.3.3 構造化データ vs. 非構造化データ
3.4 データストレージエンジンと処理
    3.4.1 トランザクション型処理と分析型処理
    3.4.2 ETL:抽出(extract)、変換(transform)、格納(load)
3.5 データフローの形態
    3.5.1 データベース経由のデータの受け渡し
    3.5.2 サービス経由のデータの受け渡し
    3.5.3 リアルタイム伝送経由のデータの受け渡し
3.6 バッチ処理 vs. ストリーム処理
3.7 まとめ

こういった機械学習システム本には珍しく、データエンジニアリングを取り上げています。

本書全体を通して、機械学習システムがデータに密接に関わっている点を強調してるため、機械学習システムを開発する者ならデータ基盤にも目を向けないといけないという意図を感じます。

扱っている内容はデータ基盤を作るためのツール説明というよりは、背後にある概念の説明となっています。

4.訓練データ

4.1 サンプリング
    4.1.1 非確率サンプリング
    4.1.2 シンプルなランダムサンプリング
    4.1.3 層化サンプリング
    4.1.4 重み付きサンプリング
    4.1.5 リザーバーサンプリング
    4.1.6 重点サンプリング
4.2 ラベル付け
    4.2.1 手作業によるラベル
    4.2.2 天然のラベル
    4.2.3 欠損ラベルへの対処
4.3 クラスの不均衡
    4.3.1 クラスの不均衡という問題
    4.3.2 クラスの不均衡への対処
4.4 データオーグメンテーション
    4.4.1 ラベルを保ったシンプルな変換
    4.4.2 摂動
    4.4.3 データの合成
4.5 まとめ

現場での訓練データ作成の課題に対するアプローチを扱っています。不均衡データに対してサンプリング、評価指標、データ水増しという異なる視点での取り扱い方法が書かれているのはありがたいです。

アノテーションで起きる課題と対応策について詳細に書かれている文献は珍しいなと思います。

5.特徴量エンジニアリング

5.1 特徴の学習と特徴エンジニアリング
5.2 一般的な特徴エンジニアリングの活動
    5.2.1 欠損値の処理
    5.2.2 スケーリング
    5.2.3 離散化
    5.2.4 カテゴリー特徴のエンコード
    5.2.5 特徴交差
    5.2.6 離散的および連続的な位置埋め込み
5.3 データリーク
    5.3.1 データリークの主な原因
    5.3.2 データリークの検出
5.4 優れた特徴の設計
    5.4.1 特徴重要度
    5.4.2 特徴の汎化
5.5 まとめ

深層学習モデルは特徴を自動的に学習するため、特徴量作成の手間をかける必要がありませんが、実運用されるモデルが深層学習モデルのケースはまだまだ多くないと指摘しています。

カテゴリ特徴の扱いにhasing trickの有効性を推しています。Booking.com のhasing trickに関する論文が取り上げられていたので後で読んでみようと思います。

筆者は特徴の設計についてのキャッチアップはkaggleのコンペ解法を読むことが役立ったと述べています。本書全体を通じてkaggleに関する文献の引用が多いです。

6.モデル開発とオフライン評価

6.1 モデルの開発と訓練
    6.1.1 機械学習モデルの評価
    6.1.2 アンサンブル
    6.1.3 実験管理とバージョン管理
    6.1.4 分散訓練
    6.1.5 AutoML
6.2 モデルのオフライン評価
    6.2.1 ベースライン
    6.2.2 評価手法
6.3 まとめ

実運用されるモデルを作成する際のTips的な内容が紹介されてます。バージョン管理の節で、データのバージョン管理をデンタルフロスに例え、取り組んだ方がいいと多くの人は賛同するが、実際に取り組んでいる出る人はほとんどいないことを指摘してました。身に覚えがあるので、皆んな頭で分かってはいても後回しになってる状態なんだなと同情します。

機械学習モデルのデバッグ」というコラムがあり、MLに問題が起きた時の原因究明の難しさをうまく言語化していて面白かったです。

評価指標にキャリブレーションの重要性を取り上げています。自分がいる広告業界では取り上げられることが多い指標ですが、MLの評価指標として取り上げられる例を見たことがなかったので目新しさがありました。

7.モデルのデプロイと予測サービス

7.1 機械学習におけるデプロイの誤解
    7.1.1 誤解1:一度にデプロイする機械学習モデルはひとつか2つしかない
    7.1.2 誤解2:何もしなければモデルのパフォーマンスは変わらない
    7.1.3 誤解3:モデルを頻繁に更新する必要はない
    7.1.4 誤解4:大抵の機械学習エンジニアはスケールを気にする必要がない
7.2 バッチ予測 vs. オンライン予測
    7.2.1 バッチ予測からオンライン予測へ
    7.2.2 バッチパイプラインとストリームパイプラインの統合
7.3 モデル圧縮
    7.3.1 低ランク分解
    7.3.2 知識蒸留
    7.3.3 枝刈り
    7.3.4 量子化
7.4 クラウドとエッジでの機械学習
    7.4.1 エッジデバイス向けのモデルのコンパイルと最適化
    7.4.2 Webブラウザでの機械学習
7.5 まとめ

本書を通じてこの章がもっともエンジニアリング色が強い内容です。その理由を機械学習モデルのデプロイが機械学習の課題ではなく、エンジニアリングの課題として扱っているからと述べてます。

エッジデバイス上でのモデル最適化につい書かれていて、計算グラフの話が登場します。最適手法の詳細部分までには触れていませんが、どういった手法があるのか全体像を把握するのにちょうどいい内容でした。

筆者は今後ハードウェア性能が向上し、機械学習向けに最適化されればエッジデバイス上でのオンライン予測に移行するとだろうと述べてます。

エッジデバイス上でのMLOpsが今後は注目される、もしくは既にされてるのかもしれません。

8.データ分布のシフトと監視

8.1 機械学習システムの障害の原因
    8.1.1 ソフトウェアシステムの障害
    8.1.2 機械学習特有の障害
8.2 データ分布のシフト
    8.2.1 データ分布のシフトの種類
    8.2.2 一般的なデータ分布シフト
    8.2.3 データ分布シフトの検出
    8.2.4 データ分布シフトへの対処
8.3 監視と可観測性
    8.3.1 機械学習特有の指標
    8.3.2 監視ツールボックス
    8.3.3 可観測性
8.4 まとめ

機械学習システムで起きる障害の原因を、ソフトウェアシステムとしての障害と機械学習特有の障害に分け、機械学習特有の問題について深ぼっています。

MLOpsの文脈で機械学習特有の問題は様々挙げられますが、本章では主にデータに関わる問題を取り上げています。

エッジケースやフィードバックループの問題という、取り上げられることが少ない問題にもスポットライトを当てています。

特にデータドリフトについて非常に詳しく書かれています。データドリフトとして有名どころの共変量シフト、ラベルシフト、コンセプトドリフトや、現実世界でよく起きる特徴の変化、ラベルスキーマの変化について取り上げています。

監視の節では機械学習特有の指標をモデル精度、予測、特徴、生の入力の4項目に分けています。

9.実現場での継続学習とテスト

9.1 継続学習
    9.1.1 ステートレス再学習 vs. ステートフル学習
    9.1.2 継続学習の必要性
    9.1.3 継続学習の課題
    9.1.4 継続学習の4つのステージ
    9.1.5 モデルを更新する頻度
    9.1.6 データの鮮度の価値
9.2 実環境でのテスト
    9.2.1 シャドウデプロイ
    9.2.2 A/Bテスト
    9.2.3 カナリアリリース
    9.2.4 インターリービング試験
    9.2.5 バンディット
9.3 まとめ

データドリフトの検知については前節の監視部分で紹介されました。この節ではデータドリフトにモデルをどう対応させるか?の答えとして継続学習について触れている内容です。

モデルを毎回ゼロから訓練するステートレス再学習に対比して、前回のモデル状態から少量のデータで学習を行う方法をステートフル学習としています。多くの企業ではステートレス再学習を行なっている中、筆者はステートフル学習の有効性を取り上げています。

モデルをどれくらいの頻度で更新すべきか?という機械学習エンジニアなら誰しも思ったことがある疑問に対して、本節ではデータ鮮度の価値を定量化することの重要性を説いてます。

一方、現場でそのような理想論は通じないことも述べていて、「可能な限り頻繁に」と言い切っているのは共感を覚えました。

10.MLOpsにおけるインフラとツール

10.1 ストレージとコンピューティング
    10.1.1 パブリッククラウド vs. プライベートデータセンター
10.2 開発環境
    10.2.1 開発環境の構築
    10.2.2 開発環境の標準化
    10.2.3 コンテナ:開発環境から本番環境へ
10.3 リソース管理
    10.3.1 cron、スケジューラー、オーケストレーター
    10.3.2 データサイエンスのワークフロー管理
10.4 機械学習プラットフォーム
    10.4.1 モデル開発
    10.4.2 モデルストア
    10.4.3 特徴ストア
10.5 構築するか購入するか
10.6 まとめ

本書全体の中で唯一具体的なツールの名前が多く登場する節になっています。

各ツールの詳細というよりは、MLOpsを取り組む中で出てくる考慮すべき点や、それに対して各ツールがどのような機能を備わっているのかがメインに書かれています。

データサイエンティストの開発環境について1節も使って書かれているのは、開発生産性を上げることの意義をデータサイエンティストに認知してもらいたい筆者の熱量を感じました。

自分はなんとなくModel StoreやFeature Storeの機能やメリットを理解していましたが、この節を読んで認識の甘さを反省しました。

自分はModel Storeをモデルの再現に必要な実験条件がわかるメタデータを管理するものとしてふわっと理解してましたが、本書の中では以下の情報を管理すると述べてます。

  • モデル定義
  • モデルのパラメーター
  • 特徴抽出関数と予測関数
  • 依存関係
  • データ
  • モデル生成に関するコード
  • 実験のアーティファクト
  • タグ

Feature Storeについて具体的なツール名と使い勝手と共に書かれているのは目新しさを感じました。

自分がFeature Storeの重要性を理解できていなかったので、これを気にFeastを触ってみようと思います。

11.機械学習の人的側面

11.1 ユーザー体験
    11.1.1 ユーザー体験の一貫性の確保
    11.1.2 「ほぼ正しい」予測との戦い
    11.1.3 スムーズな失敗
11.2 チーム構成
    11.2.1 機能横断的なチームのコラボレーション
    11.2.2 エンド・ツー・エンドなデータサイエンティスト
11.3 責任あるAI
    11.3.1 無責任なAI:ケーススタディ
    11.3.2 責任あるAIのフレームワーク
11.4 まとめ

人的側面という章タイトルですが、社内コミュニケーションなどの政治的な話ではありません。

ユーザー体験とチーム構成の節は、技術的な内容というよりは、ビジネス理解やチームビルディングについての話となっています。

チーム構成の節の中にデータサイエンティストがエンドツーエンドで機械学習システムの面倒を見るべきか論争があります。フルスタックのデータサイエンティストが成功を収めるかは、ツールとインフラに依存すると述べられています。NetflixUberが自前の機械学習プラットフォームを作成してる理由がこういった思想の元にあるそうです。

XAI(Explainable AI)についての節を筆者は本書の最も重要なトピックとしています。XAIは概念的な説明にとどまる書籍が多い中、バイアス発見方法やモデルカードの作成など実践的な内容にまで触れられています。

付録.機械学習システムを外部に提供する

株式会社JDSCの宮川大輔氏が書いた、日本語版オリジナルの内容となっています。

付録として扱われていますが、26ページというボリュームで読み応えのある節です。「AI」を通じて業務改善したい企業の要求に対して、外部委託のデータサイエンティストとして取り組む事例の話です。

書かれているモデルケースが妙にリアルで、現場データサイエンティストの苦労を知ってる人が書いてる文章だなというのがヒシヒシと伝わってきました。

機械学習システムデザインという技術色が強い本の付録がソフトスキルに関わる内容で、データサイエンティストという職業に求められるスキルの幅広さを感じました。

HHKBキーボードの修理サポートが最高だった

自分はキーボードに HHKB を愛用しています。プライベートでも仕事でも HHKB キーボードを使用していて、毎日 HHKB キーボードを使っています。

今回、動作異常が発生したので製造元の PFU に修理対応をしてもらったところ、サポートが最高だったので感謝の気持ちをこめて記事を残します。
問い合わせから修理済みキーボードが届くまで3日という早さで、保証期間外にも関わらず無償で修理・清掃を行ってもらえました。

修理の流れ

製造会社のPFUのFAQに修理について記載があります。

故障したようですが、修理はできますか? | Happy Hacking Keyboard よくあるご質問

問い合わせから修理完了までは以下の流れです

  • PFUサポートフォームに問い合わせ
  • PFUサポートセンターの方からメールが来るので、キーボードを指定住所に配送する(送料はユーザー持ち)
  • PFUサポートセンターでの診断
    • 簡易な調整・清掃で改善する事象

      症状改善後に返却

    • 簡易な調整・清掃で改善しない事象

      製品交換対応

      • 保証期間内(購入から1年間)は無償交換
      • 保証期間外は有償交換
  • PFUサポートセンターからキーボードが配送される(送料はPFU持ち)

問い合わせから修理完了まで

自分の不注意でコーヒーをキーボードの左端にこぼしました。その結果、SHIFTキーが機能しなくなり、caps lockが解除できない症状が起きました。

HHKB ProfessionalHYBRID Type-S を使っていたのですが、2年半前に購入して保証期間外で、自分の責任による故障だったので有償交換になると思ってました。

自分の修理にかかった時間は以下のようになりました

  • 8/28 : サポートフォームから問い合わせ・サポートセンターからの返信メールを確認
  • 8/29 : キーボードをPFUプロダクトサポート部に配送
  • 8/30 : サポートセンターからの診断結果メールを確認
  • 8/31 : 修理済みキーボードが手元に届く

問い合わせしてから修理済みのキーボードが届くまで3日という、驚きのサポートの早さでした。

清掃を行ってくれたので新品のように綺麗になっていました。

診断メールでは、基盤部分のコーヒー痕の写真を送ってくれて、今後起きうる腐食による動作異常の可能性を指摘してくれました。

診断写真

修理にかかったお金はキーボードの配送料だけでした。(PFUからのキーボード配送料はPFU持ち)

所感

保証期間外にも関わらず、無償で修理・清掃をしてくれたサポートの手厚さに感動しました。

何より問い合わせから修理済みキーボードが手元に届くまでが早かったのが素晴らしいです。

HHKB キーボードが使えなかったのは4日ほどで、その間Mac Bookの付属キーボードを使っていたのですが手首が痛くなりました。

ただのいちユーザーの感想ですが、HHKB キーボード最高だと改めて思いました。

パケットをゼロから自作してPythonのsocket通信で送る


ゼロから自作したパケットをpythonのsocket通信を用いてサーバーに送信します。
SYNフラグのパケットをサーバーに送り、SYN, ACKフラグのパケットが返ってくるところまで確認します。
実行環境

$ cat /etc/lsb-release
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=20.04
DISTRIB_CODENAME=focal
DISTRIB_DESCRIPTION="Ubuntu 20.04.6 LTS"

$ uname -srm
Linux 5.4.0-156-generic x86_64

本文中コード: code-for-blogpost/raw_packet at main · nsakki55/code-for-blogpost · GitHub

目次

networkの作成

LinuxのNetwork Namespace機能を使って、図のような仮想ネットワークを作成します。

network構成
コマンドはLinuxで動かしながら学ぶTCP/IPネットワーク入門 を参考にしています。

Network Namespaceを作成します

$ sudo ip netns add ns1
$ sudo ip netns add ns2

2つのNetwork Namespaceを繋ぐvethインターフェイスを作成します

$ sudo ip link add ns1-veth0 type veth peer name ns2-veth0

vethインターフェイスをNetwork Namespaceに所属させます

$ sudo ip link set ns1-veth0 netns ns1
$ sudo ip link set ns2-veth0 netns ns2

ネットワークインターフェイスをupの状態に設定します

$ sudo ip netns exec ns1 ip link set ns1-veth0 up
$ sudo ip netns exec ns2 ip link set ns2-veth0 up

IPアドレスを設定します

$ sudo ip netns exec ns1 ip address add 102.0.2.1/24 dev ns1-veth0
$ sudo ip netns exec ns2 ip address add 102.0.2.2/24 dev ns2-veth0

次にMACアドレスを設定します

$ sudo ip netns exec ns1 ip link set dev ns1-veth0 address 00:00:5E:00:53:01
$ sudo ip netns exec ns2 ip link set dev ns2-veth0 address 00:00:5E:00:53:02

IPアドレス192.0.2.2から192.0.2.1へpingコマンドを実行し、結果が返ってくることを確認できます

$ sudo ip netns exec ns2 ping -c 3 192.0.2.1 -I 192.0.2.2
PING 192.0.2.1 (192.0.2.1) 56(84) bytes of data.
64 bytes from 192.0.2.1: icmp_seq=1 ttl=64 time=1.20 ms
64 bytes from 192.0.2.1: icmp_seq=2 ttl=64 time=0.056 ms
64 bytes from 192.0.2.1: icmp_seq=3 ttl=64 time=0.052 ms

--- 192.0.2.1 ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 2015ms
rtt min/avg/max/mdev = 0.052/0.434/1.196/0.538 ms

serverの作成

作成したPacketを受信するためのサーバーを用意します。

socketライブラリのドキュメントに記載されているecho serverのコードを使用します。

socket --- 低水準ネットワークインターフェース — Python 3.11.5 ドキュメント

network namespace ns1で起動するserver.pyです

import socket

HOST = '0.0.0.0' 
PORT = 54321
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind((HOST, PORT))
    s.listen(1)
    conn, addr = s.accept()
    with conn:
        print('Connected by', addr)
        while True:
            data = conn.recv(1024)
            if not data: break
            conn.sendall(data)

サーバーを起動します

$ sudo ip netns exec ns1 python3 server.py

packetの作成

Ethernet フレーム

作成したethernetフレームをPythonで表すと次のようになります。

ethernet_frame  = b'\x00\x00\x5e\x00\x53\x01' # Destination MAC Address
ethernet_frame += b'\x00\x00\x5e\x00\x53\x02' # Source MAC Address
ethernet_frame += b'\x08\x00'                 # Protocol-Type

IP ヘッダ

IPヘッダ

上図のIPヘッダを作成します。各フィールドの値を設定していきます。

  • Version

    4bit. IPヘッダのバージョン番号

    IPのバージョン番号はIANAのサイトで確認できます。

    IPv4を使用するので 4 となります。

  • IHL

    4bit. Internet Header Lengthの略. IPヘッダ自体の大きさ.単位は4オクテット(32bit)

    オプションを持たないIPパケットの場合は 5 となります。

    5 × 4オクテット = 20オクテットがオプション無しのIPパケットのヘッダ長。

    今回はオプションを持たないので、 5 となります。

  • DSCP, ECN

    8bit. 送信してるIPパケットの優先度・輻輳制御を表す.DSCPが6bit, ECNが2bit使用.

    DSCP, ECN

    DSCPはIPパケットの優先度を表します(RFC2474)。DSCPの値に応じてクラスセレクタ(CS)という名前がつけられています。
    今回はBestEffortの0を用います。


    (引用: https://ja.wikipedia.org/wiki/Type_of_Service)

    ECNは輻輳(ネットワークの混雑)制御の状態を表します(RFC3168)。2bitは2つの要素で構成されます。

    • ECT: 上位層のプロトコルがECNに対応してるか
    • CE: ネットワーク機器が輻輳状態の場合1となる

    今回はECN非対応とします。

    DSCPとECNの値を合わせて、DSCP,ECNフィールドは16進数表記で 00 となります。

  • Total Length

    16bit. IPヘッダとIPデータを加えたパケット全体のオクテット長

    IPヘッダの長さが20 byte, TCPヘッダの長さが20 byteです。今回はペイロードなしで送るので、20+20 = 40byteとなります。16進数表記で 0028 となります。

  • Identification

    16bit. パケットを分割した時、分割パケット(フラグメント)を復元するための識別子

    今回は自分で決めた値 abcd とします。

  • Flags

    3bit. パケットの分割(フラグメント)を制御するフラグ

    3つのbitは次の意味を持ちます

    • 0 bit: 未使用. 0 を指定する必要がある(Reserved)
    • 1 bit: 分割してよいかを指示する (Don’t Fragment)
      • 0: 分割可能
      • 1: 分割不可能
    • 2 bit: 分割されたパケットの場合、最後のパケットかを表す (More Fragment)
      • 0: 最後のフラグメント
      • 1: 途中のフラグメント

    今回は分割可能不可能とします。bitで表現すると 010 となります。

    16進数で表記するため、次のFragment Offsetに合わせて16進数表記に変換します。

  • Fragment Offset

    13bit. パケットが分割された場合、オリジナルデータのどこに位置していたか(オフセット)を表す

    今回はパケット分割を行わないため、全て0とします。

    Flags, Fragment Offset

    16進数に変換するため、FlagsとFragment Offsetのbitを並べます。

    Flagsは 010, Fragment Offsetは 0 0000 0000 0000 です。2つのフィールドを合わせたbitを16進数に変換すると 4000 となります。

  • TTL

    8bit. パケットが通過できるルーターの最大数.ルーターを通過するたびに 1 減少する

    今回は64回とします。16進数表記は 40 です。

  • Protocol

    8bit. IPヘッダの次のヘッダのプロトコルを表す

    Protocolの一覧をIANAのサイトで確認できます。

    今回はTCPを使用するので6となり、16進数表記で 06 となります。

  • Header Checksum

    16bit. IPヘッダにエラーがないかのチェック

    IPヘッダ作成の最後に計算します。

  • Source Address

    32bit. 送信元IPアドレス

    送信元IPアドレス192.0.2.2を16進数に変換すると、 c0 00 02 02 となります。

  • Destination Address

    32bit. 送信先IPアドレス

    送信先IPアドレス192.0.2.1を16進数に変換すると、 c0 00 02 01 となります。

IPヘッダ checksumの計算

マスタリングTCP/IP入門編 第4章でchecksumの計算方法を次のように記しています。

まずチェックサムのフィールドを0にして、16ビット単位で1の補数の和を求めます。
そして、求まった値の1の補数をチェックサムフィールドに入れます。

自分はこの文章から具体的な計算方法を理解できませんでした。

こちらのブログ記事のchecksumの計算方法の説明が分かりやすいです。この記事を参考にしてchecksumの計算を行います。

【TCP/IP】チェックサムを計算していく - helloworlds

checksum計算手順

  1. checksumを0にして、IPヘッダの16進数を16bitずつ足していく
  2. 計算した16進数の和を2進数に変換する
  3. 16bitを超える(桁上がった)bitと、16bitで区切れた値を足す
  4. 1の補数にする

    3で求めた2進数を反転させる。

  5. 反転させた2進数を16進数に変換する

1. checksumを0にして、IPヘッダの16進数を16bitずつ足していく
checksum以外の作成したIPヘッダの値を並べます

4: version
5: IHL
00: DSCP, ECN
0028: Total Length
abcd: Identification
4000: Flags
40: TTL
06: Protocol
0000: checksum (0に設定する)
c000 0202: Source Address
c000 0201: Destination Address

16bit区切りで並べます。

4500 0028 abcd 4000 4006 0000 c000 0202 c000 0201

16bitずつ足し合わせます

4500 + 0028 + abcd + 4000 + 4006 + 0000 + c000 + 0202 + c000 + 0201
= 2f4fe

2. 計算した16進数の和を2進数に変換する

16進数 2f4fe
2進数  10 1111 0100 1111 1110

3. 16bitを超える(桁上がった)bitと、16bitで区切れた値を足す
16bitを超えた部分は 10 となり、16bitで区切れた値 1111 0100 1111 1110 に足し合わせます

10 + 1111 0100 1111 1110
= 1111 0101 0000 0000

4. 1の補数にする
補数については次のような値を意味します

「補数(complement)」とは、「元の数」と「補数」を足した場合に桁上がりが発生する数のうち「最小」の数のことです。

補数表現とは?1の補数と2の補数の違いと計算方法まとめ | サービス | プロエンジニア

2進数の場合、元の数を反転するだけで補数を求められます。

3で計算したbitを反転させます

1111 0101 0000 0000
  ↓    ↓    ↓   ↓
0000 1010 1111 1111

5. 反転させた2進数を16進数に変換する

0000 1010 1111 1111
→ 0aff

IPヘッダのchecksumは 0aff となります。

作成したIPヘッダをPythonで表すと次のようになります。

ip_header  = b'\x45\x00\x00\x28'  # Version, IHL, DSCP, ECN, Total Length
ip_header += b'\xab\xcd\x40\x00'  # Identification, Flags, Fragment Offset
ip_header += b'\x40\x06\x0a\xff'  # TTL, Protocol, Checksum
ip_header += b'\xc0\x00\x02\x02'  # Source Address
ip_header += b'\xc0\x00\x02\x01'  # Destination Address

TCP ヘッダ

TCPヘッダ

上図のようなTCPヘッダを作成します。各フィールドの値を設定します。

  • Source Port

    16bit. 送信元ポート番号

    送信元ポート番号 12345 を16進数に変換すると、 30 39 となります。

  • Destination Port

    16bit. 送信先ポート番号

    送信先ポート番号 54321 を16進数に変換すると、 d4 31 となります。

  • Sequence Number

    32bit. 送信データの順番や欠落を検出するためのシーケンス番号

    • 初期値をランダムに設定
    • 送信したデータのオクテット数だけ値をインクリメントする
    • SYN, FIN パケットはデータを含んでいなくても1オクテット分インクリメントする

    今回は初期値を0とし、16進数表記で 00 00 00 00 となります。

  • Acknowledgement Number

    32bit. 次に受診すべきデータのシーケンス番号

    データ送信側は返された確認応答番号から正常に通信が行われたか確認できます。

    今回は0とし、16進数表記で 00 00 00 00 となります。

  • Data Offset

    4bit. TCPヘッダの長さを表す.単位は4オクテット(32bit)

    オプションを持たないTCPヘッダの場合は 5 となる。

    5 × 4オクテット = 20オクテットがオプション無しのTCPヘッダ長となる。

    今回はオプションを持たないため、ヘッダ長は16進数表記で 5 となります。

  • Reserved

    3bit. 将来の拡張のために用意されてるフィールド

    0 が入ります。

  • Control Flag

    9bit. 制御ビット. RFC 3540 で追加されたNC(Nonce Sum)を含みます。

    9つのbitは次の意味を持ちます

    • 0 bit: NC. TCP拡張機能を使う
    • 1 bit: CWR. 輻輳ウィンドウ減少. 輻輳ウィンドウ減少を通知
    • 2 bit: ECE. ECN-Echo. 輻輳の発生を相手に通知
    • 3 bit: URG. 緊急フラグ. 緊急データが含まれるセグメントを通知
    • 4 bit: ACK. 応答確認フラグ. データ受信を相手に知らせる
    • 5 bit: PSH. 転送強制フラグ. 受信したデータをすぐにアプリケーションに渡す
    • 6 bit: RST. リセットフラグ. 異常検知した時に通信を中断する
    • 7 bit: SYN. 同期フラグ. 通信の確立
    • 8 bit: FIN. 転送終了フラグ. 通信の終了

    SYNパケットを送るためSYNフラグを 1 とし、その他は 0 とします。

    Data Offset, Reserved, Control Flagsのbit列をまとめて16進数表記にすると 50 02 となります。

    Data Offset, Reserved, Control Flags

  • Window Size

    16bit. 受信可能なデータサイズ(オクテット数)を表す

    今回は適当に 64240 byteとし、16進数表記で fa f0 となります。

  • Checksum

    16bit. TCPヘッダにエラーがないかのチェック

    TCPヘッダ作成の最後に計算します。

  • Urgent Pointer

    16bit. 緊急を要するデータが入るポインタを示す.Contorl FlagのURGが1のときに有効となる

    今回はURGは0としてるので、Urgent Pointerは0となり、16進数表記で 00 00 となります。

TCPヘッダ checksumの計算

TCPヘッダのchecksumの計算には、TCP擬似ヘッダを使用します。

TCPヘッダのchecksumの計算はこちらの記事がわかりやすいです。

Windows計算機でTCPチェックサムを計算する方法:ITエンジニア兼きもの屋のフリーライフ:エンジニアライフ

checksumの計算手順

  1. TCP擬似ヘッダを作成
  2. TCP擬似ヘッダとchecksumを0にしたTCPヘッダを組み合わせる
  3. 組み合わせた16進数を16bitずつ足していく
  4. 計算した16進数の和を2進数に変換する
  5. 16bitを超える(桁上がった)bitと、16bitで区切れた値を足す
  6. 1の補数にする

    5で求めた2進数を反転させる。

  7. 反転させた2進数を16進数に変換する

1. TCP擬似ヘッダを作成

TCP擬似ヘッダ
TCP擬似ヘッダに必要な項目

c000 0202: Source Address
c000 0201: Destination Address
00: padding
06: Protocol
0014: TCP Header length 5×4=20(10進数) → 14(16進数)

16bit区切りで並べます。これがTCP擬似ヘッダです。

c000 0202 c000 0201 0006 0014

2. TCP擬似ヘッダとchecksumを0にしたTCPヘッダを組み合わせる

TCPヘッダを作成します。

3039: Source Port
d431: Destination Port
00000000: Sequence Number
00000000: Acknowledgement Number
5: Data Offset
002: Reserved, Control Flag
faf0: Window Size
0000: checksum. 0を設定
0000: Urgent Pointer

16bit区切りで並べます。

3039 d431 0000 0000 0000 0000 5002 7110 0000 0000

TCP擬似ヘッダとTCPヘッダを組み合わせます

c000 0202 c000 0201 0006 0014 3039 d431 0000 0000 0000 0000 5002 faf0 0000 0000

3. 組み合わせた16進数を16bitずつ足していく

c000 + 0202 + c000 + 0201 + 0006 + 0014 + 3039 + d431 + 0000 + 0000 + 0000 + 0000 + 5002 + faf0 + 0000 + 0000
= 3d379

4. 計算した16進数の和を2進数に変換する

16進数 3d379
2進数  11 1101 0011 0111 1001

5. 16bitを超える(桁上がった)bitと、16bitで区切れた値を足す

11 + 1101 0011 0111 1001
= 1101 0011 0111 1100

6. 1の補数にする

bitを反転し補数を求めます

1101 0011 0111 1100
  ↓    ↓    ↓   ↓
0010 1100 1000 0011

7. 反転させた2進数を16進数に変換する

0010 1100 1000 0011
→ 2c83

TCPヘッダのchecksumは 2c 83 となります。

作成したTCPヘッダをPythonで表すと次のようになります。

tcp_header  = b'\x30\x39\xd4\x31' # Source Port, Destination Port
tcp_header += b'\x00\x00\x00\x00' # Sequence Number
tcp_header += b'\x00\x00\x00\x00' # Acknowledgement Number
tcp_header += b'\x50\x02\xfa\xf0' # Data Offset, Reserved, Flags, Window Size
tcp_header += b'\x2c\x83\x00\x00' # Checksum, Urgent Pointer

自作Packetを送信する

自作パケットをsocket通信で送信します。

socket通信で生パケットを送るには、socket作成時に socket.SOCK_RAW を渡します。

bindにはnetwork namespce ns2のネットワークインターフェイスを指定します。

import socket

s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW)
s.bind(("ns2-veth0", 0))

ethernet_frame  = b'\x00\x00\x5e\x00\x53\x01' # Destination MAC Address
ethernet_frame += b'\x00\x00\x5e\x00\x53\x02' # Source MAC Address
ethernet_frame += b'\x08\x00'                 # Protocol-Type

ip_header  = b'\x45\x00\x00\x28'  # Version, IHL, DSCP, ECN, Total Length
ip_header += b'\xab\xcd\x40\x00'  # Identification, Flags, Fragment Offset
ip_header += b'\x40\x06\x0a\xff'  # TTL, Protocol, Checksum
ip_header += b'\xc0\x00\x02\x02'  # Source Address
ip_header += b'\xc0\x00\x02\x01'  # Destination Address

tcp_header  = b'\x30\x39\xd4\x31' # Source Port, Destination Port
tcp_header += b'\x00\x00\x00\x00' # Sequence Number
tcp_header += b'\x00\x00\x00\x00' # Acknowledgement Number
tcp_header += b'\x50\x02\xfa\xf0' # Data Offset, Reserved, Flags, Window Size
tcp_header += b'\x2c\x83\x00\x00' # Checksum, Urgent Pointer

packet = ethernet_frame + ip_header + tcp_header

s.send(packet)

通信結果を確認するため、tcpdumpでnamespace ns1 の通信内容を保存します

$ sudo ip netns exec ns1 tcpdump -w result -tnl -vv 'tcp'

自作した16進数表記のEthernetフレーム・IPヘッダ・TCPヘッダを結合したPacketを送ります。

$ sudo ip netns exec ns2 python3 raw_packet.py

Wiresharkで通信を確認

wiresharktcpdumpした結果を表示します。

IPアドレス192.0.2.2のクライアントから192.0.2.1のサーバーに対してSYNパケットが送られています。サーバーからは3-way-handshakeのSYN, ACKパケットが返ってきています。

クライアント側ではSYN, ACKパケットを扱う機能を用意していないため、通信を中断するRSTパケットが送られています。

SYNパケットの内容を確認します。

Ethernetフレーム、IPヘッダ、TCPヘッダの各種フィールドに自分で設定した値が入ってることを確認できます。

Frame 1: 54 bytes on wire (432 bits), 54 bytes captured (432 bits)
Ethernet II, Src: ICANNIAN_00:53:02 (00:00:5e:00:53:02), Dst: ICANNIAN_00:53:01 (00:00:5e:00:53:01)
    Destination: ICANNIAN_00:53:01 (00:00:5e:00:53:01)
        Address: ICANNIAN_00:53:01 (00:00:5e:00:53:01)
        .... ..0. .... .... .... .... = LG bit: Globally unique address (factory default)
        .... ...0 .... .... .... .... = IG bit: Individual address (unicast)
    Source: ICANNIAN_00:53:02 (00:00:5e:00:53:02)
        Address: ICANNIAN_00:53:02 (00:00:5e:00:53:02)
        .... ..0. .... .... .... .... = LG bit: Globally unique address (factory default)
        .... ...0 .... .... .... .... = IG bit: Individual address (unicast)
    Type: IPv4 (0x0800)

Internet Protocol Version 4, Src: 192.0.2.2, Dst: 192.0.2.1
    0100 .... = Version: 4
    .... 0101 = Header Length: 20 bytes (5)
    Differentiated Services Field: 0x00 (DSCP: CS0, ECN: Not-ECT)
        0000 00.. = Differentiated Services Codepoint: Default (0)
        .... ..00 = Explicit Congestion Notification: Not ECN-Capable Transport (0)
    Total Length: 40
    Identification: 0xabcd (43981)
    010. .... = Flags: 0x2, Don't fragment
        0... .... = Reserved bit: Not set
        .1.. .... = Don't fragment: Set
        ..0. .... = More fragments: Not set
    ...0 0000 0000 0000 = Fragment Offset: 0
    Time to Live: 64
    Protocol: TCP (6)
    Header Checksum: 0x0aff [validation disabled]
    [Header checksum status: Unverified]
    Source Address: 192.0.2.2
    Destination Address: 192.0.2.1

Transmission Control Protocol, Src Port: 12345, Dst Port: 54321, Seq: 0, Len: 0
    Source Port: 12345
    Destination Port: 54321
    [Stream index: 0]
    [Conversation completeness: Incomplete (35)]
    [TCP Segment Len: 0]
    Sequence Number: 0    (relative sequence number)
    Sequence Number (raw): 0
    [Next Sequence Number: 1    (relative sequence number)]
    Acknowledgment Number: 0
    Acknowledgment number (raw): 0
    0101 .... = Header Length: 20 bytes (5)
    Flags: 0x002 (SYN)
    Window: 64240
    [Calculated window size: 64240]
    Checksum: 0x2c83 [correct]
    [Checksum Status: Good]
    [Calculated Checksum: 0x2c83]
    Urgent Pointer: 0
    [Timestamps]

以上、自作パケットをsocket通信でサーバーに送ることができました。

参考

Pythonメモリプロファイルツールmemrayを使う


Pythonのメモリプロファイルを行うためにmemrayを使用したところ、インストールからプロファイル実行まで手軽にでき、レポート方法が豊富で使い勝手が良かったので使用方法を紹介します。

本文中コード:code-for-blogpost/memray at main · nsakki55/code-for-blogpost · GitHub

Memrayとは

2022年4月にBloombergが公開したPythonのメモリプロファイルツールです。

公式ドキュメント:https://bloomberg.github.io/memray/

公式ドキュメントではMemrayの特徴を以下のように記述しています。

  • トレーシングプロファイルを行うため、呼び出しスタックを正確に表現できる
  • C/C++ライブラリのネイティブ呼び出しも扱えるため、結果には全ての呼び出しスタックが含まれる
  • 高速
  • メモリ使用に関するさまざまなレポートを生成できる
  • Pythonスレッドと連携
  • ネイティブスレッド(ネイティブ拡張機能内のC++スレッド)と連携

対応環境

検証環境

$sw_vers
ProductName:    macOS
ProductVersion: 11.3.1
BuildVersion:   20E241
$ python --version
Python 3.8.7

インストール

pipでインストールすることができます。

$ pip install memray

実行方法

メモリプロファイル実行

run コマンドでプロファイルしたいファイル、モジュールを指定することでメモリプロファイルを実行できます。
The run subcommand - memray

memray run [options] {対象ファイル} [args]
memray run [options] -m {module} [args]

実行が完了すると memray-<script>.<pid>.bin というファイル名で、実行結果のバイナリファイルが作成されます。

memrayでは run コマンドで作成したバイナリファイルをもとに、メモリ使用に関するレポートを作成します。

試しにiris dasasetをLightGBMで学習するスクリプト train.pyでメモリプロファイルを実行してみます。

from typing import Tuple

import lightgbm as lgb
import numpy as np
from sklearn import datasets
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

def load_dataset() -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    iris = datasets.load_iris()
    X, y = iris.data, iris.target
    X_train, X_test, y_train, y_test = train_test_split(X, y)

    return X_train, X_test, y_train, y_test

def train(X_train: np.ndarray, y_train: np.ndarray) -> lgb.LGBMClassifier:
    model = lgb.LGBMClassifier()
    model.fit(X_train, y_train)

    return model

def validate(model: lgb.LGBMClassifier, X_test: np.ndarray, y_test: np.ndarray) -> None:
    y_prob = model.predict_proba(X_test)
    y_pred = np.argmax(y_prob, axis=1)

    accuracy = accuracy_score(y_test, y_pred)
    print(f"accuracy: {accuracy}")

def main() -> None:
    X_train, X_test, y_train, y_test = load_dataset()
    model = train(X_train, y_train)
    validate(model, X_test, y_test)

if __name__ == "__main__":
    main()

memray run を実行すると、train.pyのスクリプトが実行され、バイナリファイルが作成されます。

$ memray run train.py
...
[memray] Successfully generated profile results.

You can now generate reports from the stored allocation records.
Some example commands to generate reports:

/Users/satsuki/.pyenv/versions/3.8.7/envs/data-science/bin/python3.8 -m memray flamegraph memray-train.py.99131.bin

実行が完了すると、memray-train.py.99131.binというファイルが作成されます。

結果をリアルタイム出力

run コマンドに —live 引数をつけることで、実行中のスクリプトのメモリ使用状況を見ることができます。
Live Reporting - memray

$ memray run --live {対象ファイル}

train.pyのメモリ使用状況をリアルタイム出力してみます。

$ memray run --live train.py

run live

以下の指標が出力されます

  • Total Memory
    • 関数とその内部で呼び出されてる関数に割り当てられた累積メモリ量
  • Own Memory
    • 関数に直接割り当てられたメモリ量
  • Allocation Count
    • 関数とその内部で呼び出されてる関数によって解放されていない累積割り当て数

--live-remote 引数を加えることで、プロファイラを実行するコンソールとは別のコンソールで結果をリアルタイム出力できます。

$ memray run --remote-live {対象ファイル}

実行すると、別コンソールで結果を出力するためのport番号が表示されます。

$ memray run --remote-live train.py
Run 'memray3.8 live 60627' in another shell to see live results

port番号を使用して、別コンソールで結果を表示することができます

$ memray live <port>

train.pyの実行では、以下のコマンドで結果を確認することができます。

$ memray live 60627

summaryを出力

run コマンドで作成した結果のバイナリファイルを使用して、メモリ使用状況のsummaryをCLIで表示できます。
Summary Reporter - memray

$ memray summary {結果バイナリファイル}

プロセスのメモリ使用量がピーク時のメモリ使用状況の概要を見ることができます。

train.pyのプロファイル結果を使用して、summaryを表示してみます。

$ memray summary memray-train.py.99131.bin

以下のように、コンソールにプロファイル結果の概要が出力されます。
—live 引数でリアルタイム出力した結果と同様の指標が表示されます。

summary report

Flame Graphで結果を出力

Flame Graph形式でプログラムがどの部分でメモリを使用してるか視覚的に確認することができます。
Flame Graph Reporter - memray

$ memray flamegraph {結果バイナリファイル}

メモリ使用状況の時間変化や、メモリ使用量が多い関数を視覚的に表したFlame Graphを作成します。

train.pyのプロファイル結果を使用して、Flame Graphを作成してみます。

$ memray flamegraph memray-train.py.99131.bin
Wrote memray-flamegraph-train.py.99131.html

memray-flamegraph-train.py.99131.html ファイル名のHTMLが作成されます。

出力されたHTMLを開くと、以下のように実行結果を視覚的に確認できます。

flame graph report

メモリ使用状況のグラフをクリックすると、詳細を見ることができます。

memory graph

2種類のメモリサイズが表示されます。

  • Resident size:物理メモリ上に確保されたメモリサイズ
  • Heap size:アプリケーションが確保したメモリサイズ

特定の関数に絞ってメモリ使用状況を確認できます。例えばmain関数をクリックすると、main関数内でのメモリ使用状況を詳しく見ることができます。 train関数で2.8MBのメモリを確保してることが分かります。

memory allocation

表形式で結果を出力

メモリ使用状況を表形式で出力できます。
Table Reporter - memray

$ memray table {結果バイナリファイル}

train.pyのプロファイル結果を使用して、表形式で確認してみます。

$ memray table memray-train.py.99131.bin
Wrote memray-table-train.py.99131.html

memray-table-train.py.99131.html ファイル名のHTMLが作成されます。

以下のように、各処理で確保されたメモリの一覧を表形式で見ることができます。

table

ツリー構造で結果を出力

メモリ使用状況をツリー構造で出力できます。
Tree Reporter - memray

$ memray tree <結果バイナリファイル>

各ノードは関数を表して、関数がどの関数から呼び出されているかが分かります。

train.pyのプロファイル結果を使用して、ツリー構造を出力してみます。

$ memray tree memray-train.py.99131.bin

以下のように、関数ごとのメモリ使用状況をツリー構造で表した結果がコンソールに出力されます。

tree

メモリ使用の統計情報を出力

メモリプロファイルの統計情報を出力できます。
Stats Reporter - memray

$ memray stats <結果バイナリファイル>

train.pyのプロファイル結果を使用して、statsコマンドを実行します。

$ memray stats memray-train.py.99131.bin

6つの値が出力されます

  • メモリ割り当て回数
  • 割り当てられたメモリ合計
  • 割り当てメモリサイズのヒストグラム
  • 割り当てタイプの分布 (MALLOC、CALLOC、MMAP…etc)
  • 割り当てメモリサイズ上位の処理
  • メモリ割り当て回数上位の処理

実行すると、メモリ使用の統計情報がコンソールに出力されます。

stats

外部ツールが扱える形式で出力

外部ツールが扱えるデータ形式に結果を変換できます。
Transform Reporter - memray

$ memray transform <出力形式> <結果バイナリファイル>

サポートされているファイル形式は2つです。

  • gprof2dot
  • csv

gprof2dot

gprof2dotツールと互換性のあるJSONを出力します。

graphvizを使用することで、メモリ使用量をグラフ表示できます。

train.pyのプロファイル結果を使用して、gprof2dotのグラフを書いてみます。

$ memray transform gprof2dot memray-train.py.99131.bin
Wrote memray-gprof2dot-train.py.99131.json

To generate a graph from the transform file, run for example:
gprof2dot -f json memray-gprof2dot-train.py.99131.json | dot -Tpng -o output.png

実行するとJSONが作成され、graphvizでグラフを書くためのコマンドが出力されます。

graphvizがインストールされていない場合、以下のコマンドでインストールしておきます。

$ brew install graphviz

出力されたコマンドを実行すると、グラフが出力されました。

$ gprof2dot -f json memray-gprof2dot-train.py.99131.json | dot -Tpng -o output.png

gprof2dot

各ノードには以下の値が表示されます

  • total %: 関数とその子ノードの使用メモリの割合
  • self %:関数単体の使用メモリの割合
  • num_allocs:関数単体でのメモリ割り当ての回数

csv

データ分析がしやすいcsv形式で出力します。

train.pyのプロファイル結果を使用して、csvを作成してみます。

memray transform csv memray-train.py.99131.bin

memray-csv-train.py.99131.csv というファイル名で以下のようなcsvが出力されました。

csv

jupyter環境で使用する

jupyterノートブック上で、セルごとのメモリ使用状況をmemrayで可視化できます。
Jupyter Integration - memray

IPythonの拡張プラグインを読み込むことで、利用できるようになります。

以下のコマンドをjupyterノートブックで実行することで、memrayの拡張プラグインを使用できるようになります。

%load_ext memray

上記のコマンドを実行後、 %%memray_flamegraph というマジックコマンドが実行可能になります。このコマンドを、メモリプロファイルを行いたいセルの上部に加えて実行することで、プロファイル結果を表示できます。

jupyter integration

Python APIを使用する

memrayではwithステートメントを利用して、pythonコード中にメモリプロファイルを組み込むことができます。
Memray API - memray

memrayのPythonAPIが提供してる、Trackerクラスをコンテクストマネージャーとして使用することで、部分的にメモリプロファイルを実行できます。

with memray.Tracker("結果バイナリファイル名"):

winステートメントを利用して、先程の学習スクリプトの中でモデル学習部分のみメモリプロファイルを実行してみます。

import lightgbm as lgb
import memray
import numpy as np
from sklearn import datasets
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

def main():
    iris = datasets.load_iris()
    X, y = iris.data, iris.target
    X_train, X_test, y_train, y_test = train_test_split(X, y)

    with memray.Tracker("memray-train.bin"):
        model = lgb.LGBMClassifier()
        model.fit(X_train, y_train)
        y_prob = model.predict_proba(X_test)
        y_pred = np.argmax(y_prob, axis=1)

        accuracy = accuracy_score(y_test, y_pred)
        print(f"accuracy: {accuracy}")

if __name__ == "__main__":
    main()

memray-train.bin というファイル名で結果が出力されます。

このバイナリファイルを使用して、flamegraph形式で結果を出力してみると、モデル学習部分のメモリ使用状況が可視化されました。

$ memray flamegraph memray-train.bin

python api

まとめ

メモリプロファイラツールmemrayを紹介しました。
Pythonメモリプロファイルツールで有名なのはmemory-profilerですが、結果のレポート方法が豊富で、視覚的に分かりやすい memrayを利用してみてはいかがでしょうか。
本文中コード:code-for-blogpost/memray at main · nsakki55/code-for-blogpost · GitHub

参考

GitHub ActionsでPrefectのdeploymentを一括登録する

概要

Prefectを本番運用する場合、Flowの実行設定であるDeploymentをPrefect Cloudに登録する必要があります。

Prefectでワークフローを運用する場合、FlowとDeploymentをGitHubレポジトリでコード管理するケースが多いと思います。

この記事では、レポジトリ管理している複数のDeploymentを、並列でPrefect Cloudに登録するGitHub Actionsを解説します。

コード

github.com

構成

.
├── .github/workflows/deploy_deployment.yaml # deployment登録用 GitHub Actions
│
├── deployments # deploymentをまとめたフォルダ
│   ├── flow1_deployment.py
│   └── flow1_deployment.py
│   
├── flows # flowをまとめたフォルダ
│   ├── flow1.py
│   └── flow2.py
│
└── requirements.txt 

deploymentsフォルダの中にdeploymentファイルをまとめたフォルダ構成を想定します。

deploymentファイルはyaml形式pythonスクリプトのいずれかで記述できますが、今回は以下のようなpythonスクリプトで記述しています。

flowsディレクトリ以下のflow関数を呼び出しています。

prefect-github-actions/deployments/flow1_deployment.py at main · nsakki55/prefect-github-actions · GitHub

from flows.flow1 import hello_flow
from prefect.deployments import Deployment

deployment = Deployment.build_from_flow(
    flow=hello_flow,
    name="flow1-deployment",
)

deployment.apply()

コード

deploymentsフォルダ以下のdeploymentをPrefect Cloudに一括登録するGitHub Actionsです。

name: Run register prefect deployments

on:
  push:
    branches:
      - main

jobs:
  list-deployments:
    name: List prefect deployments
    runs-on: ubuntu-latest
    outputs:
      prefect_deployments: ${{ steps.set-matrix.outputs.deployments }}
    steps:
      - name: Checkout
        uses: actions/checkout@v3
      - id: set-matrix
        run: |
            echo "deployments=$(ls deployments/*.py | jq -R -s -c 'split("\n")[:-1]')" >> $GITHUB_OUTPUT

  deploy:
    name: Deploy
    needs: list-deployments
    runs-on: ubuntu-latest
    strategy:
      matrix:
        deployments: ${{ fromJson(needs.list-deployments.outputs.prefect_deployments) }}
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python 3.8
        uses: actions/setup-python@v2
        with:
          python-version: 3.8

      - name: Install Dependencies
        run: |
          pip install pip --upgrade
          pip install -r requirements.txt

      - name: Register deployments to Prefect Cloud
        env:
          PYTHONPATH: :///home/runner/work/prefect-github-actions
          PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
          PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }}
        run: |
          python ${{ matrix.deployments }}

実行すると、以下のようにdeployments以下のファイルが実行されます。

GitHub Actions実行画面

解説

deployment一覧の取得

  list-deployments:
    name: List prefect deployments
    runs-on: ubuntu-latest
    outputs:
      prefect_deployments: ${{ steps.set-matrix.outputs.deployments }}
    steps:
      - name: Checkout
        uses: actions/checkout@v3
      - id: set-matrix
        run: |
            echo "deployments=$(ls deployments/*.py | jq -R -s -c 'split("\n")[:-1]')" >> $GITHUB_OUTPUT

deploymentsフォルダ以下にある *.py ファイル名の一覧を以下の部分で取得しています。

echo "deployments=$(ls deployments/*.py | jq -R -s -c 'split("\n")[:-1]')" >> $GITHUB_OUTPUT

deploymentの一括登録

deploy:
    name: Deploy
    needs: list-deployments
    runs-on: ubuntu-latest
    strategy:
      matrix:
        deployments: ${{ fromJson(needs.list-deployments.outputs.prefect_deployments) }}
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python 3.8
        uses: actions/setup-python@v2
        with:
          python-version: 3.8

      - name: Install Dependencies
        run: |
          pip install pip --upgrade
          pip install -r requirements.txt

      - name: Register deployments to Prefect Cloud
        env:
          PYTHONPATH: :///home/runner/work/prefect-github-actions
          PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
          PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }}
        run: |
          python ${{ matrix.deployments }}

matrix構文を使って、deploymentを登録するjobを並列で実行します。

deployments: ${{ fromJson(needs.list-deployments.outputs.prefect_deployments) }}

Prefect Cloudへの認証を行うために、GitHub Secretから2つの環境変数を設定しています。

  • PREFECT_API_KEY
  • PREFECT_API_URL
    • Prefect CloudのworkspaceのAPI Endpoint
    • フォーマット: https://api.prefect.cloud/api/accounts/[ACCOUNT-ID]/workspaces/[WORKSPACE-ID]
    • ACCOUNT-ID, WORKSPACE-IDはPrefect Cloudにログインした時のURLから確認できます

      https://app.prefect.cloud/account/[ACCOUNT-ID]/workspaces/[WORKSPACE-ID]

レポジトリ直下で python deployments/flow1_deployment.py のようにdeployments以下のpythonファイルを実行するため、PYTHONPATHの設定を行なっています。

PYTHONPATH: :///home/runner/work/prefect-github-actions

参考

PrefectでSub Flowごとに実行環境を分ける方法

概要

PrefectではFlow内部で別のFlowを実行する、Sub Flowという概念があります。
Flows - Prefect Docs

例えばMLの学習パイプラインを

  • 学習データの取得
  • 前処理
  • 学習

のように処理に分ける場合、各処理を1つのFlowとしてグループ化できます。

Sub Flowを呼び出し元Flowから直接実行すると、呼び出し元Flowのインフラ環境で実行されます。

MLの学習パイプラインのように、処理に応じてリソースを分けて実行したいケースが多いと思います。

  • 学習データの取得 (CPU: 1024, MEMORY: 2048)
  • 前処理 (CPU: 512, MEMORY: 1024)
  • 学習 (CPU: 2048, MEMORY: 4096)

Sub Flowごとに実行環境を分ける場合、run_deployment関数を使ってSub Flowを呼び出す方法で実現できます。 Deployments - Prefect Docs

run_deploymentを使用して、Sub Flowごとに異なる実行環境で実行する方法をまとめます。

実行環境
- python 3.8.7
- prefect 2.10.18
- prefect-aws 0.3.5

本文中コード
github.com

前準備

ECS TaskとしてFlowを実行するために、ECS ServiceとしてAgentを起動します。

TerrafromでAgentをホスティングするECS Serviceを立ち上げます。

$ git clone https://github.com/nsakki55/code-for-blogpost
$ cd prefect_subflow_orchestration/src/infra
$ terraform init
$ terraform apply -var-file=my_secret.tfvars

Terrafromの実装はこちらです: code-for-blogpost/prefect_subflow_orchestration/src/infra at main · nsakki55/code-for-blogpost · GitHub

Prefect Cloudにログインを行います。

$ prefect cloud login

ECS Task BlockをPrefect Cloudに登録します。

from prefect_aws.ecs import ECSTask

ecs = ECSTask(
    env={"EXTRA_PIP_PACKAGES": "s3fs"},
    cluster="arn:aws:ecs:ap-northeast-1:*****:cluster/prefect-ecs",
    cpu="256",
    memory="512",
    stream_output=True,
    configure_cloudwatch_logs=True,
    execution_role_arn="arn:aws:iam::*****:role/prefectEcsTaskExecutionRole",
    task_role_arn="arn:aws:iam::*****:role/prefectEcsTaskRole",
    vpc_id="vpc-*****",
    task_customizations=[
        {"op": "replace", "path": "/networkConfiguration/awsvpcConfiguration/assignPublicIp", "value": "DISABLED"},
        {"op": "add", "path": "/networkConfiguration/awsvpcConfiguration/subnets", "value": ["subnet-******"]},
    ],
)
ecs.save("ecs-task-block", overwrite=True)
$ python ecs_task_block.py

今回はFlowのコードをS3で保持するため、S3 BlockをPrefect Cloudに登録します。

import os

from prefect.filesystems import S3

block = S3(
    bucket_path="prefect-subflow",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)
block.save("s3-block", overwrite=True)
$ python s3_block.py

Sub Flowを作成

別のFlowから呼び出すSub Flowを作成します。

以下のように、3つのFlowをSub Flowとして呼び出してみようと思います。

import time

from prefect import flow, task

@task()
def a():
    print("task A")
    time.sleep(5)

@task()
def b():
    print("task B")
    time.sleep(5)

@task()
def c():
    print("task C")
    time.sleep(5)

@flow(name="Sub Flow1")
def sub_flow1():
    print("sub flow1")
    a()
    b()

@flow(name="Sub Flow2")
def sub_flow2():
    print("sub flow2")
    b()
    c()

@flow(name="Sub Flow3")
def sub_flow3():
    print("sub flow3")
    a()
    c()

Sub Flowを直接実行する

公式ドキュメントで紹介されてる、sub flowを直接呼び出す方法で実行してみます。
Flows - Prefect Docs

以下のように、3つのsub flowを、main flowから呼び出します。

from prefect import flow

from sub_flow import sub_flow1, sub_flow2, sub_flow3

@flow(name="Main Flow")
def main_flow():
    print("main flow")
    sub_flow1()
    sub_flow2()
    sub_flow3()

if __name__ == "__main__":
    main_flow()

main flowのdeploymentをPrefect Cloudに登録します。

作成済みのECS Task Blockと、S3 Blockを使用します。

from main_flow import main_flow
from prefect.deployments import Deployment
from prefect.filesystems import S3
from prefect_aws.ecs import ECSTask

ecs_task_block = ECSTask.load("ecs-task-block")
s3_block = S3.load("s3-block")

deployment = Deployment.build_from_flow(
    flow=main_flow,
    name="main-flow1-deployment",
    work_pool_name="ecs",
    infrastructure=ecs_task_block,
    storage=s3_block,
)

deployment.apply()
$ python main_flow1_deployment.py

Prefect Cloudからmainのflowを実行します。

実行画面を見ると、Sub Flowの詳細をmainのflow画面から確認することができます。

実行されるECS Taskは、main flowが実行されるECS Taskのみで、sub flowがmain flowのECS Taskの環境で実行されています。

Sub Flowごとに実行環境を分ける方法

run_deploymentを用いて、mainのflowからsub flowのdeploymentを実行します。

sub flowのdeploymentをPrefect Cloudに登録します。

作成したECS Task Blockを使用しますが、infra_overrides に値を指定することで、使用するCPU, MEMORYを変更できます。

3つのsub flowごとに異なるリソースを設定します。

from prefect.deployments import Deployment
from prefect.filesystems import S3
from prefect_aws.ecs import ECSTask

from sub_flow import sub_flow1, sub_flow2, sub_flow3

ecs_task_block = ECSTask.load("ecs-task-block")
s3_block = S3.load("s3-block")

deployment_sub_flow1 = Deployment.build_from_flow(
    flow=sub_flow1,
    name="sub-flow1-deployment",
    work_pool_name="ecs",
    infrastructure=ecs_task_block,
    storage=s3_block,
    infra_overrides={"cpu": 512, "memory": 1024},
)

deployment_sub_flow1.apply()

deployment_sub_flow2 = Deployment.build_from_flow(
    flow=sub_flow2,
    name="sub-flow2-deployment",
    work_pool_name="ecs",
    infrastructure=ecs_task_block,
    storage=s3_block,
    infra_overrides={"cpu": 1024, "memory": 2048},
)

deployment_sub_flow2.apply()

deployment_sub_flow3 = Deployment.build_from_flow(
    flow=sub_flow3,
    name="sub-flow3-deployment",
    work_pool_name="ecs",
    infrastructure=ecs_task_block,
    storage=s3_block,
    infra_overrides={"cpu": 2048, "memory": 4096},
)

deployment_sub_flow3.apply()
$ python sub_flow_deployment.py

mainのflowから、登録したsub flowのdeploymentを呼び出します。

run_deployment 関数を使用することで、指定したdeploymentをflowから実行することができます。実行パラメータ、tag、実行スケジュールを引数から設定することができます。
prefect/src/prefect/deployments/deployments.py at 4c8d8e4fe055cec7fa07da80cc21ff08e32f03f6 · PrefectHQ/prefect · GitHub

from prefect import flow
from prefect.deployments import run_deployment

@flow(name="Main Flow")
def main_flow():
    print("main flow")
    run_deployment(name="Sub Flow1/sub-flow1-deployment")
    run_deployment(name="Sub Flow2/sub-flow2-deployment")
    run_deployment(name="Sub Flow3/sub-flow3-deployment")

if __name__ == "__main__":
    main_flow()

mainのflowのdeploymentをPrefect Cloudに登録します。

from prefect.deployments import Deployment
from prefect.filesystems import S3
from prefect_aws.ecs import ECSTask

from main_flow2 import main_flow

ecs_task_block = ECSTask.load("ecs-task-block")
s3_block = S3.load("s3-block")

deployment = Deployment.build_from_flow(
    flow=main_flow,
    name="main-flow2-deployment",
    work_pool_name="ecs",
    infrastructure=ecs_task_block,
    storage=s3_block,
)

deployment.apply()
$ python main_flow2_deployment.py

Prefect Cloudからmainのflowを実行します。

直接sub flowを呼び出す場合と同様、sub flowの詳細をmainのflow画面から確認することができます。

sub flowごとにECS Taskを起動するため、Sub Flowの実行にダウンタイムがあります。

ECS Taskを確認すると、mainのflowのECS Taskとは別に、sub flowのECS Taskが実行されています。

使用されるCPU・MEMORYは、sub flowごとのdeploymentで設定した値が使用されます。

まとめ

run_deploymentを使用して、sub flowを実行することで、sub flowごとに異なる計算リソースで実行する方法を紹介しました。

MLの学習パイプラインのように、処理単位で必要な計算リソースが異なるワークフローでは、各処理をsub flowにまとめて、別々のリソースを割り当てるのが便利です。

参考