nsakki55 のアウトプットブログ

MLエンジニアの技術周りの備忘録です

SageMakerでハイパーパラメータチューニングをWarmStartで実行してみる

SageMaker Hyperparameter Tuning JobでWarm Startを実行してみます 本文中のコードです

github.com

目次

データセット・モデル

SageMaker Hyperparameter Tuningについて

Hyperparameter Tuning

SagaMaker Hyperparameter Tuning でWarm Startを行う場合、内部で行なっていることは以下の流れになります。

  1. 最適化する指標を設定
  2. 探索するハイパラの設定
  3. 1,2 の設定を元に学習ジョブを作成し、ハイパラを探索

ハイパーパラメータを引数として受けとるように設定したEstimatorの学習ジョブを複数作成し、最適化指標が最もいい学習ジョブを選んでいます。

Estimator作成

学習データの準備

AWSリソースの設定を行います。

import yaml
import sagemaker
import boto3
import json

SETTING_FILE_PATH = "../config/settings.yaml"
DATA_FOLDER_PATH = "../avazu-ctr-prediction"

# AWS リソース設定
with open(SETTING_FILE_PATH) as file:
    aws_info = yaml.safe_load(file)
        
sess = sagemaker.Session()
role = aws_info['aws']['sagemaker']['role']
bucket = aws_info['aws']['sagemaker']['s3bucket']

s3 = boto3.client('s3')

古いデータでハイパラ探索を行った後、新しいデータでWarm Startで探索を行う想定で、データを時系列で二分割します。
それぞれのデータで train, validation データに分割しておきます。

import os

import pandas as pd
from sklearn.model_selection import train_test_split

df_train = (
    pd.read_csv(os.path.join(DATA_FOLDER_PATH, "train.csv"), dtype="object")
    .sort_values(by="hour")
    .reset_index(drop=True)
)

# WarmStartのための親・子データを作成
df_train_parent = df_train[: int(len(df_train) * 0.5)]
df_train_child = df_train[int(len(df_train) * 0.5) :]

# train, validation データに分割
df_train_parent, df_validation_parent = train_test_split(df_train_parent, train_size=0.8, random_state=0, shuffle=False)
df_train_child, df_validation_child = train_test_split(df_train_child, train_size=0.8, random_state=0, shuffle=False)

S3にデータをアップロードしておきます

# S3にアップロード
prefix = 'sagemaker-hyperparameter-tuning'
s3_resource_bucket = boto3.Session().resource("s3").Bucket(bucket)

train_parent_file = "train_parent.csv"
validation_parent_file = "validation_parent.csv"
train_child_file = "train_child.csv"
validation_child_file = "validation_child.csv"

df_train_parent.to_csv(train_parent_file, index=False)
df_validation_parent.to_csv(validation_parent_file, index=False)
df_train_child.to_csv(train_child_file, index=False)
df_validation_child.to_csv(validation_child_file, index=False)

s3_resource_bucket.Object(os.path.join(prefix, "train_parent", train_parent_file)).upload_file(train_parent_file)
s3_resource_bucket.Object(os.path.join(prefix, "validation_parent", validation_parent_file)).upload_file(validation_parent_file)
s3_resource_bucket.Object(os.path.join(prefix, "train_child", train_child_file)).upload_file(train_child_file)
s3_resource_bucket.Object(os.path.join(prefix, "validation_child", validation_child_file)).upload_file(validation_child_file)

学習ジョブに渡すための学習データ、モデルのS3 URIを保持します

output_location = f"s3://{bucket}/{prefix}/output"

s3_train_parent_data = f"s3://{bucket}/{prefix}/train_parent/{train_parent_file}"
s3_validation_parent_data = f"s3://{bucket}/{prefix}/validation_parent/{validation_parent_file}"

s3_train_child_data = f"s3://{bucket}/{prefix}/train_child/{train_child_file}"
s3_validation_child_data = f"s3://{bucket}/{prefix}/validation_child/{validation_child_file}"

学習スクリプト

SageMaker Hyperparameter Tuning で実行する学習スクリプトtrainer.pyを用意します。

SageMaker hyperparameter Tuningで実行する場合、以下の点を実装する必要があります

  • チューニングを行うハイパーパラメータを実行引数に加える
  • 最適化対象のメトリクスが分かるように標準出力する

今回は、alpha, penalty, fit_intercept をチューニングするのでargparseで受け取る引数に追加します。

また、訓練・検証データのloglossとaccuracyをメトリクスとして標準出力するようにします。

import argparse
import os

import joblib
import numpy as np
import pandas as pd
from sagemaker_training import environment
from sklearn.feature_extraction import FeatureHasher
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import accuracy_score, log_loss

feature_columns = [
    "id",
    "click",
    "hour",
    "C1",
    "banner_pos",
    "site_id",
    "site_domain",
    "site_category",
    "app_id",
    "app_domain",
    "app_category",
    "device_id",
    "device_ip",
    "device_model",
    "device_type",
    "device_conn_type",
    "C14",
    "C15",
    "C16",
    "C17",
    "C18",
    "C19",
    "C20",
    "C21",
]

target = "click"


def parse_args():
    env = environment.Environment()
    parser = argparse.ArgumentParser()

    # チューニング対象のハイパーパラメータ
    parser.add_argument("--penalty", type=str, choices=["l1", "l2", "elasticnet"], default="l1")
    parser.add_argument("--alpha", type=float, default=0.00001)
    parser.add_argument("--fit_intercept", type=bool, choices=[True, False], default=True)

    # data directories
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--validation", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION"))

    # model directory
    parser.add_argument("--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR"))

    return parser.parse_known_args()


def load_dataset(path: str) -> (pd.DataFrame, np.array):
    # Take the set of files and read them all into a single pandas dataframe
    files = [os.path.join(path, file) for file in os.listdir(path) if file.endswith("csv")]

    if len(files) == 0:
        raise ValueError("Invalid # of files in dir: {}".format(path))

    raw_data = [pd.read_csv(file, sep=",") for file in files]
    data = pd.concat(raw_data)

    y = data[target]
    X = data[feature_columns]
    return X, y


def preprocess(df: pd.DataFrame):
    feature_hasher = FeatureHasher(n_features=2**24, input_type="string")
    hashed_feature = feature_hasher.fit_transform(np.asanyarray(df.astype(str)))

    return hashed_feature


def start(args):
    print("Training Start.")

    X_train, y_train = load_dataset(args.train)
    X_validation, y_validation = load_dataset(args.validation)

    y_train = np.asarray(y_train).ravel()
    X_train_hashed = preprocess(X_train)

    y_validation = np.asarray(y_validation).ravel()
    X_validation_hashed = preprocess(X_validation)

    hyperparameters = {
        "alpha": args.alpha,
        "penalty": args.penalty,
        "fit_intercept": args.fit_intercept,
        "n_jobs": args.n_jobs,
    }

    model = SGDClassifier(loss="log", random_state=42, **hyperparameters)
    print(model.__dict__)
    model.partial_fit(X_train_hashed, y_train, classes=[0, 1])

    # 最適化メトリクスを標準出力
    print("train logloss: {}".format(log_loss(y_train, model.predict_proba(X_train_hashed))))
    print("train accuracy: {}".format(accuracy_score(y_train, model.predict(X_train_hashed))))
    print("validation logloss: {}".format(log_loss(y_validation, model.predict_proba(X_validation_hashed))))
    print("validation accuracy: {}".format(accuracy_score(y_validation, model.predict(X_validation_hashed))))

    joblib.dump(model, os.path.join(args.model_dir, "model.joblib"))

    print("Training Finished.")


if __name__ == "__main__":
    args, _ = parse_args()
    start(args)

Estimatorクラスの作成

SageMaker Hyperparameter Tuning では異なる引数を与えた学習ジョブを作成するため、学習ジョブのEstimatorを用意する必要があります。

注意点として、Hyperparameter Tuning Jobではlocal modeで実行することはできません。

Error when running hyperparameter tuning job locally · Issue #1960 · aws/sagemaker-python-sdk · GitHub

AWS提供のscikit-learnコンテナ環境で学習スクリプトを実行するため、SKLearnクラスを利用します。

以下のディレクトリ構成でEstimatorクラスを作成します

src
 ├── hyperparameter_tuning_warm_start.ipynb
 │
 └─── model
       │
       └── trainer.py
from time import gmtime, strftime

from sagemaker.sklearn.estimator import SKLearn

parent_train_job_name = "hpt-parent-training-job" + strftime("%Y%m%d%H%M", gmtime())

parent_estimator_parameters = {
    "entry_point": "trainer.py",
    "source_dir": "model",
    "framework_version": "0.23-1",
    "py_version": "py3",
    "instance_type": "ml.m5.large",
    "instance_count": 1,
    "output_path": output_location,
    "role": role,
    "base_job_name": parent_train_job_name,
}

parent_estimator = SKLearn(**parent_estimator_parameters)

親Tuning Jobを実行

メトリクスの設定

Hyperparameter Tuning で監視するメトリクスを設定します。
メトリクスの定義には、各メトリクスの名前と、正規表現を指定します。
以下の正規表現部分は、学習スクリプトに合わせて設定します。
メトリクスを定義します - Amazon SageMaker

metrics_definitions = [
    {"Name": "train loss", "Regex": "train logloss: ([0-9\\.]+)"},
    {"Name": "train accuracy", "Regex": "train accuracy: ([0-9\\.]+)"},
    {"Name": "validation loss", "Regex": "validation logloss: ([0-9\\.]+)"},
    {"Name": "validation accuracy", "Regex": "validation accuracy: ([0-9\\.]+)"},
]

探索パラメータの設定

探索するハイパーパラメータの範囲を事前に設定します。

カテゴリー・連続値・整数値の3種類のパラメータがサポートされています。

ハイパラは学習スクリプトに渡す引数名と同じ変数名にします。
ハイパーパラメータの範囲を定義する - Amazon SageMaker

from sagemaker.tuner import ContinuousParameter
from sagemaker.tuner import CategoricalParameter

parent_hyperparameter_ranges = {
    "alpha": ContinuousParameter(0.00001, 0.00009, scaling_type="Linear"),
    "penalty": CategoricalParameter(['l1', 'l2']),
    "fit_intercept": CategoricalParameter([True, False]),
}

Turning jobの作成

Estimatorクラス、最適化メトリクス名、探索パラメータの設定、チューニング戦略、early stoppingを設定したTuning Jobを作成します。

以下の設定では、Bayesian最適化戦略で5個の学習ジョブを並列実行し、合計20個の学習ジョブを実行します。

チューニング戦略: ハイパーパラメータ調整の仕組み - Amazon SageMaker

from sagemaker.tuner import HyperparameterTuner

objective_metric_name = "validation loss"
base_parent_tuning_job_name = "sgd-classifier-parent"

tuner_parent = HyperparameterTuner(
    estimator=parent_estimator,
    base_tuning_job_name=base_parent_tuning_job_name,
    objective_type="Minimize",
    objective_metric_name=objective_metric_name,
    hyperparameter_ranges=parent_hyperparameter_ranges,
    metric_definitions=metrics_definitions,
    max_jobs=20,
    max_parallel_jobs=5,
    strategy="Bayesian",
    early_stopping_type="Auto",
)

fitメッドを呼び出すことで、チューニングジョブを実行することができます。

この時、S3の学習データのURIを指定します。

parent_inputs = {"train": s3_train_parent_data, "validation": s3_validation_parent_data}
parent_job_name = "hpt-parent-" + strftime("%Y%m%d%H%M", gmtime())

tuner_parent.fit(
    inputs=parent_inputs,
    job_name=parent_job_name,
    wait=False,
)

コンソール画面からチューニングジョブが実行されていることを確認できます。

親Tuning Job

チューニングジョブに紐づいて、20個の学習ジョブが実行されています。

親Tuning Job学習ジョブ

学習ジョブの一つの実行ログを確認すると、学習スクリプトの実行引数にハイパーパラメータが渡されていることがわかります

実行ログ

子Tuning Jobを実行

Warm Start 設定

warm startを行うためにWarmStartConfigを設定します。

parentsに親チューニングジョブ名を設定し、warm_start_typeにはWarmStartTypes.TRANSFER_LEARNINGを設定します。

WarmStartTypesは2つのタイプがサポートされていて、以下の違いがあります。

  • IDENTICAL_DATA_AND_ALGORITHM: 子チューニングジョブは親チューニングジョブと同じ入力データ・学習アルゴリズムを利用
  • TRANSFER_LEARNING: 子チューニングジョブは親チューニングジョブと異なる入力データ・学習アルゴリズムを利用

今回は、新しいデータで学習を行うため、TRANSFER_LEARNINGを利用します。

transfer learnigと命名されていますが、モデルの学習済みパラメータを前回のチューニングジョブから引き継ぐわけではないので注意してください。

from sagemaker.tuner import WarmStartConfig
from sagemaker.tuner import WarmStartTypes

warm_start_config = WarmStartConfig(
    warm_start_type=WarmStartTypes.TRANSFER_LEARNING, parents={parent_tuning_job_name}
)

探索パラメータの設定

子チューニングジョブの探索パラメータの設定を用意します。

今回はalphaの範囲を親ジョブより広げた設定にします

child_hyperparameter_ranges = {  
    "alpha": ContinuousParameter(0.00001, 0.00025, scaling_type="Linear"),
    "penalty": CategoricalParameter(['l1', 'l2']),
    "fit_intercept": CategoricalParameter([True, False]),
}

Turning Jopの作成

warm_start_configの設定を渡すことで、warm startを利用したチューニングジョブを実行することができます。

objective_metric_name = "validation loss"
base_child_tuning_job_name = "sgd-classifier-child"

tuner_child = HyperparameterTuner(
    estimator=child_estimator,
    base_tuning_job_name=base_child_tuning_job_name,
    objective_type="Minimize",
    objective_metric_name=objective_metric_name,
    hyperparameter_ranges=child_hyperparameter_ranges,
    metric_definitions=metrics_definitions,
    max_jobs=20,
    max_parallel_jobs=1,
    strategy="Bayesian",
    early_stopping_type="Auto",
    warm_start_config=warm_start_config,
)
child_inputs = {"train": s3_train_child_data, "validation": s3_validation_child_data}
child_job_name = "hpt-child-" + strftime("%Y%m%d%H%M", gmtime())

tuner_child.fit(
    inputs=child_inputs,
    job_name=child_job_name,
    wait=False,
)

コンソール画面から子チューニングジョブが実行されていることを確認できます

子Tuning Job

結果の確認

warm startによるハイパラチューニングの結果を確認します。

boto3 のsagemaker client のdescribe_hyper_parameter_tuning_jobでTunig jobの詳細を見ることができます。

ベストパラメータを取得したい場合はこちらの方法が簡単に取得できます。

tuning_job_result = sm.describe_hyper_parameter_tuning_job(
    HyperParameterTuningJobName=child_job_name
)
print(tuning_job_result['BestTrainingJob'])
# {'alpha': '2.3900957827979537e-05', 'fit_intercept': '"True"', 'penalty': '"l2"'}

Tuning jobで実行された学習ジョブを一覧で見る場合、sagemaker sdk のHyperparameterTuningJobAnalyticsを利用します。

データフレーム形式で全ての学習ジョブの内容を表示させます。

from sagemaker.analytics import HyperparameterTuningJobAnalytics

child_results = HyperparameterTuningJobAnalytics(sagemaker_session=sess, hyperparameter_tuning_job_name=child_job_name)
df_child = child_results.dataframe()
df_child

dataframe result

以下のグラフは、親tuning job(青)と子tuning job(赤)の最適化指標がチューニングされている結果を表しています。

検証データが異なるので大小比較に意味はありませんが、子tuning jobの最適化指標が小さい値からチューニングが始まっていて、warm startできていることがわかります。

validation logloss vs timestamp

以下は、子チューニングジョブの各ハイパラとvalidation lossの関係をプロットしたグラフです。

Bayesian最適化戦略で、validation loss が小さい区間での探索が多く行われていることが確認できます。

まとめ

SageMaker HypeparameterTuning を使用してwarm startでハイパラ探索を実行しました。使う利点としては並列で学習を実行できるため、学習に時間がかかるようなモデルに対しては恩恵が受けられると思います。

探索する学習ジョブ数分、インスタンスが立ち上がるため、課金には十分注意してください。軽い気持ちで使用したら結構な額が課金されていて肝を冷やしました。

計算コストが小さいモデルでしたら、OptunaやGridSearchなどによるハイパラ探索を学習スクリプト中に実装する方がコスパいいなと思います。

参考

SageMakerでさくっとABテスト環境を作ってみる

特徴量の異なる2つの予測モデルのABテスト環境を、SageMaeker推論エンドポイントを用いて作ってみます。

本文中のコードです github.com

目次

データ・モデル

  • データ

  • モデル

    • 特徴量をFeature Hasing した、線形分類モデル
    • 広告をクリックする確率(CTR)を予測する二値分類モデル
    • ABテスト用に二つの特徴量の組み合わせのモデルを用意

modelA

feature_names = {
    0: "id",
    1: "hour",
    2: "C1",
    3: "banner_pos",
    4: "site_id",
    5: "site_domain",
    6: "site_category",
    7: "app_id",
    8: "app_domain",
    9: "app_category",
    10: "device_id",
    11: "device_ip",
    12: "device_model",
    13: "device_type",
    14: "device_conn_type",
}

modelB

feature_names = {
    0: "id",
    1: "hour",
    2: "C1",
    3: "banner_pos",
    4: "site_id",
    5: "site_domain",
    6: "site_category",
    7: "app_id",
    8: "app_domain",
    9: "app_category",
    10: "device_id",
    11: "device_ip",
    12: "device_model",
    13: "device_type",
    14: "device_conn_type",
    15: "C14",
    16: "C15",
    17: "C16",
    18: "C17",
    19: "C18",
    20: "C19",
    21: "C20",
    22: "C21",
}

学習

データ準備

以下は、jupyter notebook で実行するコードです。 AWSリソースの設定の読み込みを行います

import yaml
import sagemaker
import boto3
import json

SETTING_FILE_PATH = "../config/settings.yaml"
DATA_FOLDER_PATH = "../avazu-ctr-prediction"

# AWS リソース設定
with open(SETTING_FILE_PATH) as file:
    aws_info = yaml.safe_load(file)
        
sess = sagemaker.Session()
role = aws_info['aws']['sagemaker']['role']
bucket = aws_info['aws']['sagemaker']['s3bucket']
region = aws_info['aws']['sagemaker']['region']

sm = boto3.client('sagemaker')
s3 = boto3.client('s3')

学習データを読み込み、訓練・テストデータにデータ分割します

import os
import pandas as pd

from sklearn.model_selection import train_test_split

# train, validation, test データを用意
df_train = pd.read_csv(os.path.join(DATA_FOLDER_PATH, "train"), dtype="object")
df_train, df_test = train_test_split(df_train, train_size=0.8, random_state=0, shuffle=True)

SageMaker学習ジョブにデータを渡すため、S3にデータを保存します

# S3にアップロード
prefix = 'sagemaker-ab-testing'

train_file = "train.csv"
test_file = "test.csv"

df_train.to_csv(train_file, index=False)
df_test.to_csv(test_file, index=False)

s3_resource_bucket = boto3.Session().resource("s3").Bucket(bucket)

s3_resource_bucket.Object(os.path.join(prefix, "train", train_file)).upload_file(train_file)
s3_resource_bucket.Object(os.path.join(prefix, "test", test_file)).upload_file(test_file)

S3にデータが保存されているのを確認できます

訓練データ

学習スクリプト

SageMaker のScitkit-Learnの環境が用意された公式コンテナで、独自の学習スクリプトを実行する方法で学習ジョブを実行します。
SageMaker学習ジョブで独自スクリプトを実行する詳しい内容はこちらの記事で紹介してます
SageMaker で学習ジョブを実行する ~独自スクリプト~ - nsakki55 のアウトプットブログ

コマンドライン引数からハイパーパラメータ・学習データパスを渡し、学習モデルを保存する train.pyを用意します。
以下はmodelAのtrain.pyです。modelBのtrain.pyは、feature_namesが異なるだけで、その他の処理内容は同じです。

import argparse
import os

import joblib
import numpy as np
import pandas as pd
from sagemaker_training import environment
from sklearn.feature_extraction import FeatureHasher
from sklearn.linear_model import SGDClassifier

feature_names = {
    0: "id",
    1: "hour",
    2: "C1",
    3: "banner_pos",
    4: "site_id",
    5: "site_domain",
    6: "site_category",
    7: "app_id",
    8: "app_domain",
    9: "app_category",
    10: "device_id",
    11: "device_ip",
    12: "device_model",
    13: "device_type",
    14: "device_conn_type",
}


target = "click"


def parse_args():
    env = environment.Environment()

    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script
    parser.add_argument("--alpha", type=float, default=0.00001)
    parser.add_argument("--n-jobs", type=int, default=env.num_cpus)
    parser.add_argument("--eta0", type=float, default=2.0)

    # data directories
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))

    # model directory: we will use the default set by SageMaker, /opt/ml/model
    parser.add_argument("--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR"))

    return parser.parse_known_args()


def load_dataset(path):
    # Take the set of files and read them all into a single pandas dataframe
    files = [os.path.join(path, file) for file in os.listdir(path) if file.endswith("csv")]

    if len(files) == 0:
        raise ValueError("Invalid # of files in dir: {}".format(path))

    raw_data = [pd.read_csv(file, sep=",") for file in files]
    data = pd.concat(raw_data)

    y = data[target]
    X = data.drop(target, axis=1)
    return X, y


def preprocess(df):
    feature_hasher = FeatureHasher(n_features=2**24, input_type="string")
    hashed_feature = feature_hasher.fit_transform(np.asanyarray(df[[col for col in feature_names.values()]].astype(str)))
    return hashed_feature


def main(args):
    print("Training mode")

    X_train, y_train = load_dataset(args.train)
    X_test, y_test = load_dataset(args.test)

    y_train = np.asarray(y_train).ravel()
    X_train_hashed = preprocess(X_train)

    y_test = np.asarray(y_test).ravel()
    X_test_hashed = preprocess(X_test)

    hyperparameters = {
        "alpha": args.alpha,
        "n_jobs": args.n_jobs,
        "eta0": args.eta0,
    }
    print("Training the classifier")
    model = SGDClassifier(loss="log", penalty="l2", random_state=42, **hyperparameters)

    model.partial_fit(X_train_hashed, y_train, classes=[0, 1])

    print("Score: {}".format(model.score(X_test_hashed, y_test)))
    joblib.dump(model, os.path.join(args.model_dir, "model.joblib"))


if __name__ == "__main__":
    args, _ = parse_args()
    main(args)

学習ジョブの実行

ab_test_deploy.ipynb を作成し、独自スクリプトの学習ジョブを実行します。
Scikit-Learn環境を利用するため、SKLearn Estimator クラスを使用します。
source_dirに学習スクリプトがあるディレクトリ、entry_pointに学習スクリプト名を指定します

以下のフォルダ構成で実行します

src
 ├── ab_test_deploy.ipynb
 │
 ├── modelA
 │     │
 │     └── train.py
 │
 └── modelB
       │
       └── train.py

modelAの学習ジョブを実行します。modelBの学習ジョブの実行はsource_dirを変更すればOKです。

from time import gmtime, strftime

from sagemaker.sklearn.estimator import SKLearn

job_name = "modelA-training-job" + strftime("%Y%m%d-%H-%M-%S", gmtime())

hyperparameters = {"alpha": 0.00001, "eta0": 2.0}
enable_local_mode_training = False

if enable_local_mode_training:
    train_instance_type = "local"
    inputs = {"train": f"file://{train_file}", "test": f"file://{test_file}"}
else:
    train_instance_type = "ml.m5.large"
    inputs = {"train": s3_train_data, "test": s3_test_data}

estimator_parameters = {
    "entry_point": "train.py",
    "source_dir": "modelA",
    "framework_version": "0.23-1",
    "py_version": "py3",
    "instance_type": train_instance_type,
    "instance_count": 1,
    "hyperparameters": hyperparameters,
    "output_path": output_location,
    "role": role,
    "base_job_name": job_name,
}

model_a_estimator = SKLearn(**estimator_parameters)
model_a_estimator.fit(inputs)

学習ジョブが実行されて、S3に学習済みモデルが保存されていることを確認できます

学習ジョブ

学習済みモデル

モデル登録

推論スクリプト

AWSが提供する公式コンテナ環境で独自の推論スクリプトを実行します。

公式コンテナ環境にはsagemkaker-inference-toolkit がインストールされていて、ツールキットの使用に合わせて、独自推論スクリプトを実行するには以下の関数を記述する必要があります

  • model_fn: 学習済みモデルの読み込み処理
  • input_fn: リクエストデータをモデルに渡すデータに変換する処理
  • predict_fn: モデルから予測値を取得する処理
  • output_fn : predict_fnの予測値をレスポンスとして返すための処理

記事の本題からズレるので、詳しい説明は以下の記事でご確認ください。
独自の推論コンテナを適応させる - Amazon SageMaker
Deploying A Pre-Trained Sklearn Model on Amazon SageMaker | by Ram Vegiraju | Towards Data Science

modelAの推論処理を記述したinference.pyを用意します。modelB用の推論処理はfeature_namesが変更されるだけで、他は同じ処理内容です。

import os
from io import StringIO

import joblib
import numpy as np
import pandas as pd
from sklearn.feature_extraction import FeatureHasher

feature_names = {
    0: "id",
    1: "hour",
    2: "C1",
    3: "banner_pos",
    4: "site_id",
    5: "site_domain",
    6: "site_category",
    7: "app_id",
    8: "app_domain",
    9: "app_category",
    10: "device_id",
    11: "device_ip",
    12: "device_model",
    13: "device_type",
    14: "device_conn_type",
}


def preprocess(df: pd.DataFrame):
    feature_hasher = FeatureHasher(n_features=2**24, input_type="string")
    hashed_feature = feature_hasher.fit_transform(np.asanyarray(df[[idx for idx in feature_names.keys()]].astype(str)))
    return hashed_feature


def model_fn(model_dir):
    """
    Deserialize fitted model
    """
    model = joblib.load(os.path.join(model_dir, "model.joblib"))
    return model


def input_fn(request_body, request_content_type):
    """
    request_body: The body of the request sent to the model.
    request_content_type: (string) specifies the format/variable type of the request
    """
    if request_content_type == "text/csv":
        # Read the raw input data as CSV.
        df_input = pd.read_csv(StringIO(request_body), header=None)
        input_data_hashed = preprocess(df_input)
        return input_data_hashed
    else:
        raise ValueError("This model only supports text/csv input")


def predict_fn(input_data, model):
    """
    input_data: returned array from input_fn above
    model (sklearn model) returned model loaded from model_fn above
    """

    return model.predict_proba(input_data)


def output_fn(prediction, content_type):
    """
    prediction: the returned value from predict_fn above
    content_type: the content type the endpoint expects to be returned. Ex: JSON, string
    """

    response = {"result": [{"prediction": prediction[0][1], "model": "modelA"}]}
    return response

モデル登録

ab_test_deploy.ipynbからエンドポイントに使用するためのモデル登録を行います。
AWS提供のScikit-Learn環境で、独自推論スクリプトを実行するモデルを登録するため、SKLearnModelクラスを使用します。
model_data引数には先程の学習ジョブで保存されたモデルのS3 uriを指定します。
source_dir には推論スクリプトディレクトリ、entri_pointには推論スクリプト名を渡します。

sagemaker session のcreate_modelメソッドでモデルの登録を行います。

モデル登録にはこの方法以外に

の方法がありますが、自分が試したところ、上記二つの方法で登録したモデルでは独自推論処理のエンドポイントが立てられませんでした。うーん、SageMaker 難しい…

以下のフォルダ構成で実行します

src
 ├── ab_test_deploy.ipynb
 │
 ├── modelA
 │     │
 │     └── inference.py
 │
 └── modelB
       │
       └── inference..py
import sagemaker
from sagemaker.sklearn.model import SKLearnModel

sess = sagemaker.Session()

modelA = SKLearnModel(
    role=role,
    model_data=model_a_estimator.model_data,
    framework_version="0.23-1",
    py_version="py3",
    source_dir="modelA",
    entry_point="inference.py",
    sagemaker_session=sess
)

model_a_name = "{}-{}".format("modelA", timestamp)

# モデル登録
sess.create_model(
    model_a_name,
    role,
    modelA.prepare_container_def(
        instance_type='ml.t2.medium'
    )
)

モデルが登録されていることを確認できます。

モデル登録

推論エンドポイント作成

バケット振り分け設定

1つの推論エンドポイントにmodelA, modelBの2つの予測モデルをデプロイし、エンドポイント設定でリクエストのトラフィックを分配することでABテストを実現できます。

Traffic分配
2つのモデルのバケット振り分け設定・インスタンス設定をproduction_variant に渡します。
initial_weightが割り当て比率になり、50に設定します。
作成したproduction_variantを元に、create_endpoint_configでエンドポイントの設定を作成します。

from sagemaker.session import production_variant
import time

timestamp = "{}".format(int(time.time()))
endpoint_config_name = "{}-{}".format("abtest", timestamp)

modelA_variant = production_variant(
    model_name=model_a_name,
    instance_type="ml.t2.medium",
    initial_instance_count=1,
    variant_name="VariantA",
    initial_weight=50,
)

modelB_variant = production_variant(
    model_name=model_b_name,
    instance_type="ml.t2.medium",
    initial_instance_count=1,
    variant_name="VariantB",
    initial_weight=50,
)

# 推論エンドポイント設定
endpoint_config = sm.create_endpoint_config(
    EndpointConfigName=endpoint_config_name, ProductionVariants=[modelA_variant, modelB_variant]
)

エンドポイント設定が作成されていることを確認できます

エンドポイント設定

推論エンドポイント作成

create_endpointに設定を渡すことで、任意の割り当て比率の推論エンドポイントを作成できます。

model_ab_endpoint_name = "{}-{}".format("abtest", timestamp)

# 推論エンドポイント作成
endpoint_response = sm.create_endpoint(EndpointName=model_ab_endpoint_name, EndpointConfigName=endpoint_config_name)

バケット振り分けをしたエンドポイント設定を元に、推論エンドポイントが作成されていることを確認できます。

推論エンドポイント
推論エンドポイント2

リクエス

boot3のsagemaker-runtime clientで推論エンドポイントにリクエストを送ります。 適切に2つのモデルにリクエストが振り分けられ、ABテストになっているか確認します。

runtime = boto3.Session().client('sagemaker-runtime')
model_list = []
prediction_list = []

with open('test.csv') as f:
    for line in f:
        response = runtime.invoke_endpoint(EndpointName=model_ab_endpoint_name, 
                                   ContentType='text/csv', 
                                   Body=line,
                                  Accept='application/json')
        df_pred = pd.read_csv(response['Body'], header=None, delimiter='\t')
        model = json.loads(df_pred[0][0])['result'][0]['model']
        prediction = json.loads(df_pred[0][0])['result'][0]['prediction']
        model_list.append(model)
        prediction_list.append(prediction)

リクエストの振り分けを確認すると、ほぼ50:50の割り当て比率であることを確認できます。

print("[modelA] response:{}, bucket: {}".format(model_list.count('modelA'), model_list.count('modelA') / len(model_list)))
print("[modelB] response:{}, bucket: {}".format(model_list.count('modelB'), model_list.count('modelB') / len(model_list)))

# 出力
#[modelA] response:5073, bucket: 0.5073
#[modelB] response:4927, bucket: 0.4927

まとめ

SageMaker Endpoint を用いてABテスト環境を作成しました。 注意点として、SageMakerがTrafficをランダムに複数モデルに割り当てるため、リクエストベースでのABテストとなります。ユーザーベースなど、任意のセグメントでバケットを振り分けたい場合、クライアント側でリクエストの割り振りを行う必要があります。

参考

実行中のECS TaskのCloudWatchログを標準出力し、タスクの正常終了を判定する

本文中のコードです github.com

やりたいこと

  • boto3 で実行したECS Taskが成功・失敗したか実行側で判定したい
  • ECS Task のCloud Watchログをポーリングし続けて、実行ログを標準出力し続けたい

実装したコード

task_arn, ecs_cluster を引数に受け取り、ECS TaskのCloud Watchログをポーリングして標準出力し、ECS Taskが正常・異常終了したか判定する EcsTaskWatcher クラスを作成しました。
10秒ごとboto3のdescribe_tasksを実行し、前回の実行から新しいログが追加された場合、新しいログを標準主力します。
exit_code = 0 の場合は正常終了、それ以外は異常終了としてログのポーリングを終了します。

import time
from typing import Any, List, Tuple

import boto3


class ECSTaskExecutionError(Exception):
    """ECS Task is failed."""

    pass


class EcsTaskWatcher:
    def __init__(self, cluster: str, task_arn: str) -> None:
        self.ecs = boto3.client("ecs")
        self.cloudwatch = boto3.client("logs")
        self.cluster = cluster
        self.task_arn = task_arn

        self.log_group_name, self.log_stream_name = self._get_log_setting()

        self.previous_logs = []

    def _get_log_setting(self) -> Tuple[str, str]:
        task_description = self.ecs.describe_tasks(
            cluster=self.cluster,
            tasks=[
                self.task_arn,
            ],
        )
        task_definition_arn = task_description["tasks"][0]["taskDefinitionArn"]
        task_definition = self.ecs.describe_task_definition(taskDefinition=task_definition_arn)

        log_group_name = task_definition["taskDefinition"]["containerDefinitions"][0]["logConfiguration"]["options"][
            "awslogs-group"
        ]

        task_name = task_definition["taskDefinition"]["containerDefinitions"][0]["name"]
        log_stream_prefix = task_definition["taskDefinition"]["containerDefinitions"][0]["logConfiguration"]["options"][
            "awslogs-stream-prefix"
        ]
        task_id = self.task_arn.split("/")[-1]
        log_stream_name = f"{log_stream_prefix}/{task_name}/{task_id}"

        return log_group_name, log_stream_name

    def _stream_log(self) -> None:
        try:
            logs = self.cloudwatch.get_log_events(
                logGroupName=self.log_group_name, logStreamName=self.log_stream_name, startFromHead=True
            )["events"]

            new_logs = self._subtract_list(logs, self.previous_logs)
            if new_logs:
                for line in new_logs:
                    print(line["message"])

            self.previous_logs = logs

        except:
            pass

    def _subtract_list(self, list1, list2) -> List[Any]:
        list_diff = list1.copy()
        for l in list2:
            try:
                list_diff.remove(l)
            except ValueError:
                continue
        return list_diff

    def watch_task_condition(self) -> None:
        running_status = True
        while running_status:
            response = self.ecs.describe_tasks(
                cluster=self.cluster,
                tasks=[
                    self.task_arn,
                ],
            )
            last_status = response["tasks"][0]["lastStatus"]

            if last_status == "STOPPED":
                running_status = False
                self._stream_log()
                exit_code = response["tasks"][0]["containers"][0]["exitCode"]
                if exit_code == 0:
                    print("ECS Task Success")
                else:
                    print("ECS Task Failed")
                    raise ECSTaskExecutionError
            else:
                self._stream_log()
                time.sleep(10)

実行例

boto3のrun_taskで実行したECS Taskのtask_arnと、ecs_clusterをEcsTaskWatcherに渡し、watch_task_conditionメソッドを呼び出します。
ECS Taskが終了するまで次の処理に進みたくない場合に便利です。

import boto3
from ecs_task_watcher import EcsTaskWatcher

ECS_CLUSTER = "cluster-name"
TASK_DEFINITION_ARN = "task_definition_arn"


def main():
    ecs = boto3.client("ecs")
    ecs_task_reponse = ecs.run_task(
        cluster=ECS_CLUSTER,
        taskDefinition=TASK_DEFINITION_ARN,
    )

    task_arn = ecs_task_reponse["tasks"][0]["taskArn"]
    ecs_task_watcher = EcsTaskWatcher(ECS_CLUSTER, task_arn)
    ecs_task_watcher.watch_task_condition()


if __name__ == "__main__":
    main()

参考

SageMaker で学習ジョブを実行する ~独自コンテナ~

SageMaker で学習ジョブを実行する ~独自スクリプト~ - nsakki55 のアウトプットブログの続きです

記事中のコード github.com

SagaMaker を用いたモデルの学習

SageMaker では学習に必要なスクリプトやライブラリをコンテナベースで管理し、学習に必要なインフラ管理を自動で行なってくれます。

思想としては、データサイエンティストが面倒なインフラ管理や推論サービスの作業を行わず、MLモデルの開発に集中できることが SageMaker を使用するメリットとなっています。

SageMaker Training Jobが行なっていることは、大きく分けると以下の流れになります

  • S3から入力データを読み込み
  • 学習用コンテナを実行
  • モデルアーティファクトをS3に書き戻す

SageMaker Training Jobの概要
引用:Amazon SageMaker でモデルをトレーニングする - Amazon SageMaker

データ入出力を利用するためのデータパスの対応や、コンテナ環境の準備など、SageMaker で学習を実行するためのルールがあります。

SageMakerで学習を実行する場合、3つパターンが存在します

コード量を少なくし手軽にモデルの学習を実行できるのは組み込みアルゴリズムを利用する場合ですが、実装の柔軟性は低くなります。一方、独自スクリプトや独自コンテナを利用する場合、コード量は増えますが実装の柔軟性は高くなり、独自のアルゴリズムでの学習を実行することができるようになります。

SageMaker で学習ジョブを実行するパターン

初めてSageMakerでモデルの学習を実行する際、実行パターンの多さ・守るルールの多さに面食らってしまう人が多いと思います。ドキュメントやサンプルコードは豊富に提供されていますが、どのパターンの学習方法で、最低限必要な要素は一体何なのかを把握するのが大変です。

今回は3つのパターンで独自の予測モデルの学習を行い、学習を実行する流れを整理ます。

SageMaker ではモデル学習のために以下のようなサービスを提供してくれていますが、本記事では扱いません。

  • HyperparameterTuning
  • SagaMaker Model Monitoring
  • Clarify
  • Debugger
  • Endpoint

...

本記事では独自コンテナを用いてSageMaker 学習ジョブで実行します SageMaker 学習ジョブを実行するには AWS SDK, SageMaker SDKを用いる2つの方法がありますが、今回は jupyter notebook から学習をジョブを実行するため、SageMaker SDKを使用します。

データセット・モデル

今回は広告をクリックする確率(CTR)を予測を予測する二値分類モデルの学習を行います。

データセットは kaggle の avazu-ctr-predictionのデータセットを使用します。

https://www.kaggle.com/c/avazu-ctr-prediction

Feature Engineering・モデルは

  • FeatureHashing・線形回帰モデル
  • OneHotEncoding・Factorization Machines

の2パターンを実装します

学習データの準備

学習ジョブに渡すデータをS3にアップロードしておきます。

今回はtrain , validation , test にデータを分割します

gist82e34280293abba2b1743f49e1cc7102

train data

独自コンテナ

独自コンテナを用いて学習ジョブを実行する方法は、3パターンあります

  1. AWS が提供しているコンテナイメージを拡張する方法
  2. 独自のコンテナイメージに SageMaker Training Toolkit をインストールする方法
  3. クラッチでコンテナイメージを作成する方法

カスタムコンテナによる学習パターン
引用:Amazon SageMaker におけるカスタムコンテナ実装パターン詳説 〜学習編〜 | Amazon Web Services ブログ

公式サンプルコードやドキュメントの多くは、2つめのパターンの独自のコンテナイメージにSageMaker Training Toolkit で実装しているものが多いです。

自分がSageMaker で独自コンテナを利用しようと思い立った時、色々なやり方が錯綜していて困ったので、3つのパターンの学習ジョブを実行する流れを整理したいとおもいます。

独自コンテナで学習ジョブを実行する流れ

独自コンテナで学習ジョブを実行する流れ
独自コンテナの学習ジョブを実行する流れは以下になります

  • 学習スクリプトを用意
  • 学習ライブラリ・スクリプトを設定したDockerfileの作成
  • ECRへdocker image をpush
  • Estimatorクラスにdocker image, 学習設定を渡して学習ジョブを実行

AWS が提供しているコンテナイメージを拡張する方法

独自スクリプトを利用する場合も、AWSが提供するコンテナ環境に独自実装のライブラリやthird party のライブラリをrequirements.txt を配置しておくことで利用することができます。

requriements.txt による公式コンテナ環境の拡張方法はこちらの記事でまとめています SageMaker で学習ジョブを実行する ~独自スクリプト~ - nsakki55 のアウトプットブログ

AWSが提供するコンテナイメージをベースとして、新しく作成したコンテナイメージを利用する方法を説明します。

まずベースとなるAWS提供のコンテナイメージのimage uriを取得します。

Scikit-Learn 環境のimage: https://docs.aws.amazon.com/ja_jp/sagemaker/latest/dg/pre-built-docker-containers-scikit-learn-spark.html

利用したいライブラリのEstimator クラスのtraining_image_uriメソッドでimage uri を取得することができます。

from sagemaker.sklearn.estimator import SKLearn

estimator = SKLearn(entry_point="",
                    framework_version="0.23-1",
                    py_version="py3",
                    role=role,
                    instance_type='local')

print(estimator.training_image_uri())
# 354813040037.dkr.ecr.ap-northeast-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3

third party library のoptuna を利用してハイパーパーパラメータチューニングを行う学習スクリプトを用意します。
AWS提供のimage にはsagemaker training toolkit がインストールされているため、環境変数を用いて学習データ・ハイパーパラメータを学習スクリプトに受け渡すことができます。
sagemaker training toolkit を用いた学習スクリプトの記述についてはSageMaker で学習ジョブを実行する ~独自スクリプト~ - nsakki55 のアウトプットブログの記事で取り上げています。

gist39c43d40dd3232b9c41433c485d3e0d5

SageMaker の公式コンテナイメージをベースイメージにした Dockerfile を作成します。

拡張ライブラリ・学習スクリプトをコピーします。

SageMaker の公式コンテナイメージにには、sagemaker-training-toolkit がインストールされているため、学習ジョブを開始する際の実行スクリプト環境変数 SAGEMAKER_PROGRAM で設定します。

gistf7d8917f9dc5fdf54ae29e71fad71f86

Dockerfileを元に、ECRへイメージをpushするスクリプト

gist47fa76080719b17210c1327a2f17d1a2

docker image をECRへpushします

./buiid_push_image.sh ctr-prediction-extend-sagemaker-container extend_sagemaker_container

作成したコンテナイメージで学習ジョブを実行するには、Estimator クラスの image_uri にECRに登録した image のURIを設定します

以下のディレクトリ構成で実行します

src
 ├── extend_sagemaker_container_trainer.ipynb
 │
 ├── build_push_image.sh
 │
 └── extend_sagemaker_container
       │
       ├── Dockerfile
       │
       └── extend_sagemaker_container_trainig_script.py

gistbfabcc8d05f9cc82ebfa04571749dc86

管理画面から学習ジョブが実行されてるのを確認できます

extend sagemaker container training job

学習ジョブを実行すると、Dockerfile中に環境変数 SAGEMAKER_PROGRAM に設定したスクリプトが実行されます。 ログを確認すると、extend_sagemaker_container_trainig_script.py が実行されていることを確認できます。

extend sagemaker container training job

参考: 公式コンテナイメージの拡張を行う場合の公式サンプルコードamazon-sagemaker-examples/advanced_functionality/pytorch_extending_our_containers at main · aws/amazon-sagemaker-examples · GitHub

独自のコンテナイメージに SageMaker Training Toolkit をインストールする方法

AWSが提供するコンテナイメージを拡張する場合、sagemaker-training-toolkit を自分でインストールせずに利用することができます。

完全に新しく自作のコンテナイメージを作成し、AWSが提供するコンテナイメージと同じように実行する場合、sagemaker-training-toolkit をインストールする必要があります。

ここでは、fastFMをインストールした独自コンテナイメージで学習ジョブを実行してみます。 独自のトレーニングコンテナを適応させる - Amazon SageMaker

fastFMライブラリを利用する学習スクリプトを用意します。

AWS提供のコンテナイメージを利用する場合と同様、sagemaker-training-toolkit の環境変数を用いてデータ、ハイパーパラメータの受け渡しを行います。

gista8f890bad85b6fedce7656850956be99

必要なライブラリを含めたDockerfile を用意します。

この時、sagemaker-training をインストールすることで、sagemaker-training-toolkitの環境変数によるデータ・ハイパーパラメータの受け渡しができるようになります。

環境変数 SAGEMAKE_PROGRAM で実行スクリプトを指定できます。

gistd615eda7343d433c88b6fcdec8fd5fe5

環境変数 SAGEMAKER_PROGRAM を指定しない場合、以下のコマンドでコンテナイメージが実行されます。 Amazon SageMaker がトレーニングイメージを実行する方法 - Amazon SageMaker

docker run image名 train

この場合、学習スクリプトをキックするtrainというファイル名のシェルスクリプトを用意します

#! /bin/sh
python /opt/ml/code/custom_toolkit_container_training_script.py

Dockerfile にtrain ファイルのコピーを追加します

gist218d3893ec4fe3ec6b28770db2d277a2

ECRへ imageをpushします

./buiid_push_image.sh ctr-prediction-custom-toolkit-container custom_toolkit_container

Estimator クラスのimage_uri にECRにpushした独自コンテナを指定します。

ハイパーパラメータをAWS提供のコンテナイメージを利用する場合と同様に、Estimatorクラスに渡します。

src
 ├── custom_toolkit_container_trainer.ipynb
 │
 ├── build_push_image.sh
 │
 └── custom_toolkit_container
       │
       ├── Dockerfile
       │
       └── custom_toolkit_container_trainig_script.py

gist4082bd52bf45468acb96395d0f41c3d7

独自コンテナを指定した学習ジョブが実行されていることを確認できます。

training toolkit 学習ジョブ

クラッチでコンテナイメージを作成する方法

sagemaker-training-toolkit を利用せず、独自コンテナをSageMaker の学習上で実行する場合、以下の機能を実装する必要があります

  • コンテナ中の学習スクリプトを ENTRYPOINTとして指定できる機能
  • ハイパーパラメータを学習スクリプトの実行時引数として渡される機能
  • 学習ジョブの正常・異常終了をサービスに通知する機能

sagemaker-training-toolkit をインストールする場合、上記の機能が自動で追加されます。

公式サンプルコードを参考に実装していきます amazon-sagemaker-examples/advanced_functionality/custom-training-containers at main · aws/amazon-sagemaker-examples · GitHub

SageMaker Training Jobではハイパーパラメータ・データの情報がそれぞれ /opt/ml/input/config/hyperparameters.json , /opt/ml/input/config/inputdataconfig.json に書き込まれます。

Amazon SageMaker がトレーニング情報を提供する方法 - Amazon SageMaker

データは/opt/ml/input/data/チャンネル名以下に自動的に配置されます。

学習が正常終了した際は sys.exit(0) , 異常終了した際は sys.exit(1) を返すようにします。

上記の機能を追加した学習スクリプトを作成します

gista6729d6984de03fd7473f3cbeda15712

Dockerfile に必要なライブラリのインストールと、ENTRYPOINT に学習スクリプトを実行するコマンド設定を記述します

gist082eeba3782b1a5da3690f914f88c0f4

image を ECR へ pushします

./buiid_push_image.sh ctr-prediction-scratch-container scratch_container

以下のフォルダ構成で学習ジョブを実行します

src
 ├── scratch_container_trainer.ipynb
 │
 ├── build_push_image.sh
 │
 └── scratch_container
       │
       ├── Dockerfile
       │
       └── scratch_container_trainig_script.py

学習ジョブの実行は、sagemaker-training-toolkitを利用する場合と同様の方法で行います。

gist61eac4ca470901c1569042878978677b

学習ジョブが実行されていることを確認できます。

scratch container training job

実行ログを見ると、ハイパーパラメータや学習データが学習ジョブに渡せていることを確認できます。

extend sagemaker container log

まとめ

独自コンテナでSageMaker 学習ジョブを実行する3つの方法を試してみました。データ・ハイパーパラメータの設定・学習ジョブを実行するためのentrypointの設定など、利用するために覚えることが多いなと思います。一度ルールを覚えてしまえば、インフラ管理を気にする必要なくなるため実験を沢山実行したいDS・MLエンジニアに便利なサービスだと思います。

参考

SageMaker で学習ジョブを実行する ~独自スクリプト~

SageMaker で学習ジョブを実行する ~組み込みアルゴリズム~ - nsakki55 のアウトプットブログの続きの内容です
記事中のコード

github.com

SagaMaker を用いたモデルの学習

SageMaker では学習に必要なスクリプトやライブラリをコンテナベースで管理し、学習に必要なインフラ管理を自動で行なってくれます。

思想としては、データサイエンティストが面倒なインフラ管理や推論サービスの作業を行わず、MLモデルの開発に集中できることが SageMaker を使用するメリットとなっています。

SageMaker Training Jobが行なっていることは、大きく分けると以下の流れになります

  • S3から入力データを読み込み
  • 学習用コンテナを実行
  • モデルアーティファクトをS3に書き戻す

SageMaker Training Jobの概要

引用: Amazon SageMaker でモデルをトレーニングする - Amazon SageMaker

データ入出力を利用するためのデータパスの対応や、コンテナ環境の準備など、SageMaker で学習を実行するためのルールがあります。

SageMakerで学習を実行する場合、3つパターンが存在します

コード量を少なくし手軽にモデルの学習を実行できるのは組み込みアルゴリズムを利用する場合ですが、実装の柔軟性は低くなります。一方、独自スクリプトや独自コンテナを利用する場合、コード量は増えますが実装の柔軟性は高くなり、独自のアルゴリズムでの学習を実行することができるようになります。

SageMaker で学習ジョブを実行するパターン

初めてSageMakerでモデルの学習を実行する際、実行パターンの多さ・守るルールの多さに面食らってしまう人が多いと思います。ドキュメントやサンプルコードは豊富に提供されていますが、どのパターンの学習方法で、最低限必要な要素は一体何なのかを把握するのが大変です。

SageMaker ではモデル学習のために以下のようなサービスを提供してくれていますが、本記事では扱いません。

  • HyperparameterTuning
  • SagaMaker Model Monitoring
  • Clarify
  • Debugger
  • Endpoint

...

本記事では独自スクリプトを用いてSageMaker 学習ジョブで実行します SageMaker 学習ジョブを実行するには AWS SDK, SageMaker SDKを用いる2つの方法がありますが、今回は jupyter notebook から学習をジョブを実行するため、SageMaker SDKを使用します。

データセット・モデル

今回は広告をクリックする確率(CTR)を予測を予測する二値分類モデルの学習を行います。

データセットは kaggle のavazu-ctr-predictionのデータセットを使用します。

Click-Through Rate Prediction | Kaggle

特徴量前処理はFeatureHashing を行い、線形回帰モデルを使用します。

独自スクリプト

独自スクリプト(script mode)で学習ジョブを実行する場合、AWSが提供するScikit-Learn や PyTorchなどのライブラリ実行環境が整えられたコンテナを利用して、独自スクリプトを実行します。 Bring Your Own Model with SageMaker Script Mode — Amazon SageMaker Examples 1.0.0 documentation

AWS提供コンテナ環境一覧

公式サンプルコードに、各ライブラリ環境での独自スクリプトの利用ユースケースがまとまっています

github.com

AWSが提供するコンテナ環境の実装は公開されています
- TensorFlow/Keras
- PyTorch
- MXNet
- Chainer
- Scikit-Learn
- XGBoost
- Spark ML
- Reinforcement Learning

独自スクリプトの実行パターン

AWSが提供するコンテナ環境で独自スクリプトを実行する場合、3つのパターンがあります

  • AWS提供のコンテナ環境で実行する方法
  • AWS提供のコンテナ環境に3rd party ライブラリを追加して実行する方法
  • AWS提供のコンテナ環境に自前ライブラリを追加して実行する方法

下二つの方法を用いれば、AWS提供のライブラリに環境を簡単に拡張することができます。

より独自の実行環境を用意したい場合はカスタムコンテナの利用を検討することになりますが、本記事では扱いません。

独自スクリプトを実行する方法

SageMakerで独自スクリプトを実行する流れ

Script Mode で学習ジョブをSageMaker上で実行する流れは以下のようになっています

SageMaker 学習ジョブ概要

  1. 学習データの準備 (S3にアップロード)
  2. 学習スクリプトの準備
  3. 学習設定を渡した学習ジョブをSageMakerで実行

AWS提供のコンテナにインストールされているSageMaker Training Toolkit ライブラリの環境変数を用いて、SageMaker のコンテナ内に学習データ・学習スクリプト・学習設定を受け渡すことができます。

SageMaker で独自スクリプトを実行する場合、これらのデータ受け渡しのルールに従って学習スクリプトを記述する必要があります。

個人的な所感ですが、DSが手軽にSageMakerで学習を実行するまでの学習コストが高いなと思います。SageMaker がホスティングするコンテナにデータ・ハイパーパラメータを受け渡すためのルールが独特で、慣れるまで時間がかかると思います。

SageMaker Trainig Toolkit の環境変数

AWS提供のコンテナ環境には、SageMaker Training Toolkit という、ユーザーが用意したデータ・スクリプト・学習設定をコンテナ内に受け渡し、学習ジョブを実行するためのライブラリがインストールされています。

SageMaker Training Tookit によって、SageMaker がスクリプトに渡す環境変数の一部を紹介します

独自スクリプトを実行する際は、下記の環境変数スクリプト中に記述して、外部からのデータの受け渡しを行います。 sagemaker-training-toolkit/ENVIRONMENT_VARIABLES.md at master · aws/sagemaker-training-toolkit · GitHub

  • SM_MODEL_DIR
  • SM_INPUT_DIR
  • SM_INPUT_CONFIG_DIR
  • SM_CHANNELS
    • train, validation, test など入力データのS3ロケーション
  • SM_CHANNEL_{channel_name}
    • 各channel(入力データ)を格納するディレクトリ. channel=training の場合 SM_CHANNEL_TRAINING=/opt/ml/input/data/training
  • SM_OUTPUT_DATA_DIR
    • 評価結果と他の訓練に関係しない出力データを格納するディレクトopt/ml/output
  • SM_HPS
  • SM_CURRENT_HOST
  • SM_HOSTS
  • SM_NUM_GPUS
  • SM_NUM_CPUS
  • SM_LOG_LEVEL
  • SM_USER_ARGS
    • ユーザーが指定する追加引数SM_USER_ARGS='["--batch-size","256","--learning_rate","0.0001"]'

AWSが提供するコンテナ環境で行われる処理

AWS提供のコンテナ環境内部で行われている処理を確認します。今回はScikit-Learn環境を使用するため、https://github.com/aws/sagemaker-scikit-learn-containerの実装を取り上げます。

学習ジョブを実行すると、sagemaker training toolkit の entry_point.py中のrunメソッドを呼び出しています。

from __future__ import absolute_import
import logging

from sagemaker_training import entry_point, environment, runner

logger = logging.getLogger(__name__)


def train(training_environment):
    """Runs Scikit-learn training on a user supplied module in local SageMaker environment.
    The user supplied module and its dependencies are downloaded from S3.
    Training is invoked by calling a "train" function in the user supplied module.
    Args:
        training_environment: training environment object containing environment variables,
                               training arguments and hyperparameters
    """
    logger.info('Invoking user training script.')
    entry_point.run(uri=training_environment.module_dir,
                    user_entry_point=training_environment.user_entry_point,
                    args=training_environment.to_cmd_args(),
                    env_vars=training_environment.to_env_vars(),
                    runner_type=runner.ProcessRunnerType)


def main():
    train(environment.Environment())

sagemaker-scikit-learn-container/training.py at cff4e19e29b7f4e3388f7dd69c2551ff8dca820f · aws/sagemaker-scikit-learn-container · GitHub

sagemaker training toolkit の entry_point.py のrunメソッドのdocstring を見てみます。

「entry point として設定されたS3, フォルダから圧縮したtarファイルをダウンロード・準備を行い、環境変数(env_vars)と、コマンドライン引数(args)を渡してユーザーが設定した entry point を実行する」と実行内容が記述されています

entry point ごとに以下のコマンドが実行されます

python パッケージ : env_vars python -m module_name + args

python スクリプト : env_vars python module_name + args

その他 : env_vars /bin/sh -c ./module_name + args

def run(
    uri,
    user_entry_point,
    args,
    env_vars=None,
    wait=True,
    capture_error=False,
    runner_type=runner.ProcessRunnerType,
    extra_opts=None,
):
    """Download, prepare and execute a compressed tar file from S3 or provided directory as a user
    entry point. Run the user entry point, passing env_vars as environment variables and args
    as command arguments.
    If the entry point is:
        - A Python package: executes the packages as >>> env_vars python -m module_name + args
        - A Python script: executes the script as >>> env_vars python module_name + args
        - Any other: executes the command as >>> env_vars /bin/sh -c ./module_name + args

sagemaker-training-toolkit/entry_point.py at a5b15b37c2d349d71cd050faf3680051a9e0f391 · aws/sagemaker-training-toolkit · GitHub

独自スクリプトAWS提供コンテナ環境で実行する場合、環境変数コマンドライン引数を渡してスクリプトが内部で実行されていることが確認できます。

SagaMaker 学習ジョブの内部処理が気になる方は、各ライブラリ環境の実装と、sagemaker trainig toolkit の実装を一度見てみることをお勧めします。

AWS提供のコンテナ環境で実行する方法

SageMaker で独自スクリプトを実行する手順は3つです

  1. 学習データの準備 (S3にアップロード)
  2. 学習スクリプトの準備
  3. 学習設定を渡した学習ジョブをSageMakerで実行

学習データの準備 (S3にアップロード)

学習ジョブ実行時に渡す学習用データをS3にアップロードします

SageMaker SDK に渡す設定を読み込み、データをtrain, validation, test に分割してS3にアップロードします

gistdf57800403f45b3a2f5a505b65679ec4

S3 にデータがアップロードされていることを確認できます

train data

学習スクリプトの準備

データの読み込み→学習→モデル保存の一連の流れを実装した学習スクリプトを用意します。

以下のファイルをsklearn_script_mode.py として保存します. ポイントは以下の点になります
- コマンドライン引数で学習ハイパーパラメータを受け取る
- 入力データ・学習モデルの読み込み、書き込みパスを環境変数で取得する
独自スクリプトを記述する際は、上記2点さえ注意すれば基本的にMLモデルの学習に限らずどのような処理を記述しても動作します。

gist5625192bbe12d745020cb329ae6b5291

学習設定を渡した学習ジョブをSageMakerで実行

今回はscikit-learn を使用するため、SKLearn Estimatorクラスを使用します。

独自スクリプトを実行する場合、entrypoint に実行スクリプト名、source_dir に実行スクリプトディレクトリ名を設定します。

entrypoint に指定するスクリプトはS3上のファイルも指定することが可能です。

instance_type に local を指定すると、ローカル環境上で学習ジョブを実行することができます。

以下のようなディレクトリ構成で実行します。

src
 ├── script_mode_trainer.ipynb
 │
 └── custom_script
         └── sklearn_script_mode.py

fit メソッドを呼び出すことで学習ジョブを SageMaker で実行できます。この時学習データのS3パスを指定します。

gistc68517790baaa738443c0db77aa40177

SageMaker のコンソール画面から学習ジョブが実行できていることを確認できます

custom script training job

学習ジョブのCloud Watchログを見ると、スクリプト実行開始時にsagemaker training toolkit の環境変数一覧が表示されています。

custom script log1
custom script log2

S3に学習済みモデルや、学習ジョブに関わる設定ファイルがアップロードされています

model aritfact

AWS提供のコンテナ環境に3rd party ライブラリを追加して実行する方法

3rd party ライブラリとしてOptuna をインストールして、学習スクリプト中でハイパーパラメータチューニングを行う学習ジョブを実行してみます

学習データの準備 (S3にアップロード)

AWS提供のコンテナ環境で実行する方法」で既にS3にデータをアップロード済みなので省略

学習スクリプトの準備

3rd partyライブラリをAWS提供のコンテナ環境にインストールして使用する場合、学習スクリプトと同じフォルダ以下にrequirements.txt を配置すると、学習ジョブ実行時に自動でインストールが行われます。

sagemaker trainig toolkit の requirements.txt を検出してインストールする部分の実装を確認してみます。

entry_point.run 内部でinstall メソッドが呼び出され、依存関係のインストールが行われています

def install(name, path=environment.code_dir, capture_error=False):
    """Install the user provided entry point to be executed as follows:
        - add the path to sys path
        - if the user entry point is a command, gives exec permissions to the script
    Args:
        name (str): Name of the script or module.
        path (str): Path to directory where the entry point will be installed.
        capture_error (bool): Default false. If True, the running process captures the
            stderr, and appends it to the returned Exception message in case of errors.
    """
    if path not in sys.path:
        sys.path.insert(0, path)

    entry_point_type = _entry_point_type.get(path, name)

    if entry_point_type is _entry_point_type.PYTHON_PACKAGE:
        modules.install(path, capture_error)
    elif entry_point_type is _entry_point_type.PYTHON_PROGRAM and modules.has_requirements(path):
        modules.install_requirements(path, capture_error)

    if entry_point_type is _entry_point_type.COMMAND:
        os.chmod(os.path.join(path, name), 511)

sagemaker-training-toolkit/entry_point.py at a5b15b37c2d349d71cd050faf3680051a9e0f391 · aws/sagemaker-training-toolkit · GitHub

has_requirements メソッドでrequirements.txt の存在有無をチェックしています

def has_requirements(path):  # type: (str) -> None
    """Check whether a directory contains a requirements.txt file.
    Args:
        path (str): Path to the directory to check for the requirements.txt file.
    Returns:
        (bool): Whether the directory contains a requirements.txt file.
    """
    return os.path.exists(os.path.join(path, "requirements.txt"))

sagemaker-training-toolkit/modules.py at a5b15b37c2d349d71cd050faf3680051a9e0f391 · aws/sagemaker-training-toolkit · GitHub

requirements.txt が存在する場合、 install_requreiments が呼び出されpip install -r requirements.txt が実行されることを確認できました。 以上の処理が学習ジョブの内部で実行されることで、3rd party ライブラリがインストールされます。

third_party_library フォルダ以下に requirements.txt を用意します

gist89220b7838c4ec95873fec086bdb1bdd

optuna を読み込み、ハイパーパラメータチューニングを行う処理を学習スクリプトに実装します。

gistf5a0f126bef449db4d09bfe0c5b2af8d

学習設定を渡した学習ジョブをSageMakerで実行

reqirements.txt で3rd party ライブラリをインストールする場合、Estimator クラスに特別渡す必要がある引数はありません。学習ジョブの実行開始時に、自動的にrequirements.txt を検出してライブラリをインストールしてくれます

以下のフォルダ構成で学習ジョブを実行します。

requirements.txt を、実行する学習スクリプトと同じフォルダ内に配置します。

src
 ├── third_party_library_scirpt_mode_trainer.ipynb
 │
 └── third_party_library
       ├── third_party_library_scirpt_mode.py
       └── requirements.txt

gist4f9f9d1d296b2e7fad722ed3f07adf07

学習ジョブが実行できていることを確認できます

third party custom script training job

学習ジョブのCloud Watch ログを確認すると、学習スクリプトの実行開始時に /miniconda3/bin/python -m pip install -r requirements.txt が実行されていることを確認できます。

third party custom script log

注意点として、requirements.txt でインストールする場合、実行順序が担保されません。そのため、依存関係があるライブラリをインストールする場合は、requirements.txt に依存関係のないライブラリだけ記述して、学習スクリプト内でsubprocess などを用いて明示的に後からインストールする必要があります。ex) subprocess.run(['pip', 'install', 'optuna'])

ライブラリの依存関係を維持してインストールする手順を classmethod さんのブログで紹介してくれています。

【加藤さん向け】オンプレで動かす機械学習パイプラインをSagemaker用に変更するときのポイント【社内共有】 | DevelopersIO

AWS提供のコンテナ環境に自前ライブラリを追加して実行する方法

学習データの準備 (S3にアップロード)

AWS提供のコンテナ環境で実行する方法」で既にS3にデータをアップロード済みなので省略

学習スクリプトの準備

学習データの前処理を別ライブラリに切り出して、実行する学習スクリプト中で読み込んでみます。

以下のような前処理の実装を記述した preprocess.py_init.py をmy_custom_library フォルダ以下に追加します。

gist5997f9ba7bf4e43633eef7fb5d8e4ce9

実行する学習スクリプト側で、preprocess.py中の処理を呼び出します

gista3e01d92e2e5747e163f620a7b771664

学習設定を渡した学習ジョブをSageMakerで実行

自前ライブラリを使用する場合、Estimator クラスのdependencies に自前ライブラリのフォルダ名のリストを指定する必要があります。

公式ドキュメントの説明を見ると、dependencies に設定したフォルダは、entrypoint に設定した学習スクリプトと同じパスでSageMaker のコンテナ中に配置されると説明されています。

SageMaker SDK Document: dependecies

Estimators — sagemaker 2.99.0 documentation

sagemaker sdk のEsitmatorで実際に処理される内容を見ると、entrypoint のファイルとdependencies のファイルを格納した圧縮ファイルがS3にアップロードされることがわかります。

def tar_and_upload_dir(
    session,
    bucket,
    s3_key_prefix,
    script,
    directory=None,
    dependencies=None,
    kms_key=None,
    s3_resource=None,
    settings: Optional[SessionSettings] = None,
):
    """Package source files and upload a compress tar file to S3.
    The S3 location will be ``s3://<bucket>/s3_key_prefix/sourcedir.tar.gz``.
    If directory is an S3 URI, an UploadedCode object will be returned, but
    nothing will be uploaded to S3 (this allow reuse of code already in S3).
    If directory is None, the script will be added to the archive at
    ``./<basename of script>``. If directory is not None, the (recursive) contents
    of the directory will be added to the archive. directory is treated as the base
    path of the archive, and the script name is assumed to be a filename or relative path
    inside the directory.

sagemaker-python-sdk/fw_utils.py at 858e943944304d52adc0beb3ada263a06af8cb23 · aws/sagemaker-python-sdk · GitHub

学習ジョブ実行時にこの圧縮ファイルを展開して使用するため、同じパスにdependencies に設定したファイルと、entrypointに設定したファイルが配置されるという理屈です。

今回はmy_custom_library 以下のファイルを読み込むため、[’my_custom_library’]を渡します。

以下のフォルダ構成で独自スクリプト・ライブラリ・実行ノートブックを配置しています

src
 ├── my_library_script_mode_trainer.ipynb
 │
 ├── my_custom_library
 │      ├── __init__.py
 │      └── preprocess.py
 │      
 └── custom_script
         └── my_library_script_mode.py

gist4827e1568d0db3b91836c6b9fe2a153e

SageMaker のコンソール画面から学習ジョブが実行できていることを確認できます

my library custom script training job

S3に実行コードの圧縮ファイルがアップロードされています

source file

圧縮ファイルを回答すると、entrypoint と denpendencies のファイルがあることを確認できます。

source dir

まとめ

SageMaker 学習ジョブで独自スクリプトを実行する3パターンの方法を取り上げました。AWSが提供するコンテナ内部で行われている処理は sagemaker training toolkit の実装を見ることで確認できます。Script Mode では requirements.txt , 独自ライブラリを追加することでAWS提供のコンテナ環境を拡張することができますが、余計な依存関係を排除して独自のコンテナ環境で学習ジョブを実行する場合はカスタムコンテナを利用します。

参考

SageMaker で学習ジョブを実行する ~組み込みアルゴリズム~

SageMaker で学習ジョブを実行する手順をまとめます。 記事中の実行コード

https://github.com/nsakki55/code-for-blogpost/blob/main/sagemaker_training/built-in-algorithm.ipynbgithub.com

SagaMaker を用いたモデルの学習

SageMaker では学習に必要なスクリプトやライブラリをコンテナベースで管理し、学習に必要なインフラ管理を自動で行なってくれます。

思想としては、データサイエンティストが面倒なインフラ管理や推論サービスの作業を行わず、MLモデルの開発に集中できることが SageMaker を使用するメリットとなっています。

SageMaker Training Jobが行なっていることは、大きく分けると以下の流れになります

  • S3から入力データを読み込み
  • 学習用コンテナを実行
  • モデルアーティファクトをS3に書き戻す

SageMaker Training Jobの概要
出典:Amazon SageMaker でモデルをトレーニングする - Amazon SageMaker

データ入出力を利用するためのデータパスの対応や、コンテナ環境の準備など、SageMaker で学習を実行するためのルールがあります。

SageMakerで学習を実行する場合、3つパターンが存在します

コード量を少なくし手軽にモデルの学習を実行できるのは組み込みアルゴリズムを利用する場合ですが、実装の柔軟性は低くなります。一方、独自スクリプトや独自コンテナを利用する場合、コード量は増えますが実装の柔軟性は高くなり、独自のアルゴリズムでの学習を実行することができるようになります。

SageMaker で学習ジョブを実行するパターン

初めてSageMakerでモデルの学習を実行する際、実行パターンの多さ・守るルールの多さに面食らってしまう人が多いと思います。ドキュメントやサンプルコードは豊富に提供されていますが、どのパターンの学習方法で、最低限必要な要素は一体何なのかを把握するのが大変です。

今回は3つのパターンで独自の予測モデルの学習を行い、学習を実行する流れを整理ます。

SageMaker ではモデル学習のために以下のようなサービスを提供してくれていますが、本記事では扱いません。

  • HyperparameterTuning
  • SagaMaker Model Monitoring
  • Clarify
  • Debugger
  • Endpoint

...

文量が多くなるため、組み込みアルゴリズム・独自スクリプト・独自コンテナを使用するケースで記事を分けます。

この記事では組み込みアルゴリズムを利用した学習の流れを取り扱います。

AWS SDK・SageMaker SDK

SageMaker を操作する際、AWS SDK (boto3)とSageMaker SDK (sagemaker)の二つの方法があります。それぞれのユースケースは以下のように公式で説明されています。

参考:Amazon SageMaker 紹介 & ハンズオン(2018/07/25 実施)

create-endopoint
create-notebook-instance
create-training-job
delete-endopoint
delete-notebook-instance
describe-endpoint
describe-notebook-instance
estimator = Tensorflow(...)
estimator.set_hypyerprameters(...)
estimator.fit(...)
predictor = estimator.deploy(...)
Predictor.predict(...)

データセット・モデル

今回は広告をクリックする確率(CTR)を予測を予測する二値分類モデルの学習を行います。

データセットは kaggle の avazu-ctr-predictionのデータセットを使用します。

Click-Through Rate Prediction | Kaggle

特徴量前処理はOneHotEncoding を行い、線形回帰モデルを使用します。 AWSのドキュメントに合わせ、本文以下では線形学習アルゴリムと呼ぶことにします。

組み込みアルゴリズム

組み込みアルゴリズムを利用する場合、解きたい問題に合わせてAWS が提供しているアルゴリズムの中から適切なものを選択する必要があります。

解きたい問題ごとのアルゴリズムの一覧は公式ドキュメント中にまとめられています。 Amazon を使用する SageMaker 組み込みアルゴリズムまたは事前学習済みモデル - Amazon SageMaker

今回は教師あり学習アルゴリズムの二値分類問題を解くので、「線形学習アルゴリズム(Linear Lerner)」を利用します。
線形学習アルゴリズム - Amazon SageMaker

組み込みアルゴリズムで学習を実行するために必要なことは以下の2点です

  1. 指定された形式のデータをS3に用意
  2. AWSが提供する docker image を指定して学習を実行

アルゴリズムごとにサポートされているデータ形式が異なります

例えば線形学習モデル、XGBoostでは

  • 線形学習モデル : recordIO-wrapped protobuf , CSV
  • XGBoost : CSV , libsvm

データ形式がサポートされています。

指定された形式のデータをS3に用意

AWS SDK, SageMaker SDKを利用する設定、データの読み込みを行います。

import yaml
import pandas as pd
import sagemaker
import boto3

SETTING_FILE_PATH = "../config/settings.yaml"
DATA_FOLDER_PATH = "avazu-ctr-prediction"

with open(SETTING_FILE_PATH) as file:
    aws_info = yaml.safe_load(file)
        
sess = sagemaker.Session()
role = aws_info['aws']['sagemaker']['role']
bucket = aws_info['aws']['sagemaker']['s3bucket']
region = aws_info['aws']['sagemaker']['region']

sm = boto3.client('sagemaker')
s3 = boto3.client('s3')

prefix = 'built-in-algorithm-training'

df_train = pd.read_csv(os.path.join(DATA_FOLDER_PATH, "train"), dtype="object")

データの前処理を行います。今回はデータを train ,validation, test に分割して、OneHotEncoding を行います。

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder

feature_columns = ['C1', 'banner_pos', 'site_category', 'app_category', 'device_type', 'device_conn_type', 'C15', 'C16', 'C18']

df_train, df_test = train_test_split(df_train, train_size=0.7, random_state=0, shuffle=True)
df_train, df_validation = train_test_split(df_train, train_size=0.8, random_state=0, shuffle=True)

one_hot_encoder = OneHotEncoder(handle_unknown='ignore')

y_train = df_train['click'].to_numpy()
X_train = one_hot_encoder.fit_transform(df_train[feature_columns]).toarray()

y_validation = df_validation['click'].to_numpy()
X_validation = one_hot_encoder.transform(df_validation[feature_columns]).toarray()

y_test = df_test['click'].to_numpy()
X_test = one_hot_encoder.fit_transform(df_test[feature_columns]).toarray()

線形学習アルゴリズムがサポートしているrecordIO-wrapped protobuf 形式に変換し、S3へ保存します

import numpy as np 
import io
import sagemaker.amazon.common as smac

def upload_protobuf_to_s3(data_type: str, X: np.ndarray, y: np.ndarray) -> None:
    file_name = f'{data_type}.data'
    f = io.BytesIO()
    smac.write_numpy_to_dense_tensor(f, X.astype("float32"), y.astype("float32"))
    f.seek(0)
    
    boto3.Session().resource("s3").Bucket(bucket).Object(
        os.path.join(prefix, data_type, file_name)
    ).upload_fileobj(f)
    
    
upload_protobuf_to_s3('train', X_train, y_train)
upload_protobuf_to_s3('validation', X_validation, y_validation)
upload_protobuf_to_s3('test', X_test, y_test)

S3にデータが保存されていることを確認します。

train data
validation data
test data

学習ジョブ名と、train, validation, test データのS3パス、学習時の生成ファイルの出力先パスの設定を保持しておきます

job_name = "built-in-linear-learner-ctr-prediction-" + strftime("%Y%m%d-%H-%M-%S", gmtime())

output_location = f"s3://{bucket}/{prefix}/output"
s3_train_data = f"s3://{bucket}/{prefix}/train/{train_file}"
s3_validation_data = f"s3://{bucket}/{prefix}/validation/{validation_file}"
s3_test_data = f"s3://{bucket}/{prefix}/test/{test_file}"

組み込みアルゴリズムの学習の実行方法は3つあります

  • SageMaker SDKのEstimator クラスに、AWSが提供する線形学習アルゴリズム用のdocker imageを渡して実行する方法
  • AWS SDK で、AWSが提供する線形学習アルゴリズム用のdocker imageを指定して学習ジョブを作成する方法
  • SageMaker SDKのLinearLearnerクラスを使用する方法

SageMaker SDKのEstimator クラスを利用する方法

SageMaker SDKimage_uris.retrieve を用いて組み込みアルゴリズムのdocker imageのURIを取得できます。

AWSが提供しているdocker imageの一覧はこちらにまとまっています

アジアパシフィック (東京) (ap-northeast-1) の Docker レジストリパスとサンプルコード - Amazon SageMaker

取得したdocker image uriをEsimatorクラスに渡すことで、組み込みアルゴリズムを利用できます。

AWS提供の線形学習アルゴリズムの内部では以下の処理が行われます

  • 入力データの正規化
  • validation データを用いたハイパーパラメータチューニング
  • test データに対して評価
  • 学習済みモデルをS3にアップロード

線形学習の仕組み - Amazon SageMaker

set_hyperparameters で学習ハイパーパラメータの値を設定します。

fit メソッドにS3のデータパスを渡すことで学習ジョブを実行できます。train, ,validation, test の3つのデータを渡すことができます。

from sagemaker import image_uris
import sagemaker

# AWSが提供するdocker imageのURIを取得
container = image_uris.retrieve(region=region, framework="linear-learner")
print(container) # 351501993468.dkr.ecr.ap-northeast-1.amazonaws.com/linear-learner:1

# Estimatorクラスにdocker imageを渡す
linear = sagemaker.estimator.Estimator(
    container,
    role,
    instance_count=1,
    instance_type="ml.m5.large",
    output_path=output_location,
    sagemaker_session=sess,
)

# 学習ジョブの実行
linear.set_hyperparameters(predictor_type="binary_classifier", mini_batch_size=200)
linear.fit({"train": s3_train_data, "validation": s3_validation_data, "test": s3_test_data}, job_name=job_name)

SageMaker のAWSコンソール画面で、学習ジョブが実行されていることを確認できます。

training job (SageMaker SDK)

CloudWatchで学習ログを確認すると、データが読み込まれモデルの学習が進んでいることを確認できます。

学習ジョブログ1
学習ジョブログ2

学習ジョブ作成時に設定した出力先S3パスに学習済みモデルが保存されていることを確認できます

参考:SageMaker SDKを用いて組み込みアルゴリズムを実行するサンプルノートブック An Introduction to Linear Learner with MNIST — Amazon SageMaker Examples 1.0.0 documentation

AWS SDKを利用する方法

AWS SDK を利用する場合も、SaeMaker SDK で学習ジョブを実行する場合の流れと一緒です

  1. 指定された形式のデータをS3に用意
  2. AWSが提供する docker image を指定して学習を実行

学習データをS3に用意する手順はSageMaker SDKを利用する場合と共通なので、省略します。

学習ジョブの実行はcreate_training_job で行い、SageMaker SDKのEstimatorを利用する場合と同様、学習データのS3パス・AWSが提供する線形学習アルゴリズムのdocker image を設定する必要があります。
SageMaker — Boto3 Docs 1.24.29 documentation

job_name = "built-in-linear-learner-ctr-prediction-aws-sdk" + strftime("%Y%m%d-%H-%M-%S", gmtime())
container = image_uris.retrieve(region=region, framework="linear-learner")

linear_training_params = {
    "RoleArn": role,
    "TrainingJobName": job_name,
    "AlgorithmSpecification": {"TrainingImage": container, "TrainingInputMode": "File"},
    "ResourceConfig": {"InstanceCount": 1, "InstanceType": "ml.m5.large", "VolumeSizeInGB": 10},
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": s3_train_data,
                    "S3DataDistributionType": "ShardedByS3Key",
                }
            },
            "CompressionType": "None",
            "RecordWrapperType": "None",
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": s3_validation_data,
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "CompressionType": "None",
            "RecordWrapperType": "None",
        },
        {
            "ChannelName": "test",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": s3_test_data,
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "CompressionType": "None",
            "RecordWrapperType": "None",
        },
    ],
    "OutputDataConfig": {"S3OutputPath": output_location},
    "HyperParameters": {
        "mini_batch_size": "300",
        "predictor_type": "binary_classifier",
        "epochs": "5",
        "num_models": "1",
        "loss": "absolute_loss",
    },
    "StoppingCondition": {"MaxRuntimeInSeconds": 60 * 60},
}

sm = boto3.client('sagemaker')
sm.create_training_job(**linear_training_params)

学習ジョブが実行されていることを確認できます。

SageMaker Training job

AWS SDKを用いて組み込みアルゴリズムを実行するサンプルノートブック Breast Cancer Prediction — Amazon SageMaker Examples 1.0.0 documentation

SageMaker SDKのLinearLerner クラスを利用する方法

Estimator クラスを利用する際は、AWS提供のdocker image URIを指定する必要がありましたが、線形学習アルゴリズム用のLinearLernerクラスを使用すれば、docker image の指定を省略することができます。
LinearLearner — sagemaker 2.99.0 documentation

内部実装を見ると、線形学習アルゴリズムのdocker image を取得しているので、実行される処理はEstimator を利用する場合と同じです。
sagemaker-python-sdk/linear_learner.py at 255a339ae985041ef47e3a80da91b9f54bca17b9 · aws/sagemaker-python-sdk · GitHub

LinearLernerのように、特定のアルゴリズムに特化したEstimator クラスは限られているので注意してください。

LinearLernerクラスで学習ジョブを実行する場合、線形学習アルゴリズムのdocker imageの取得を内部で行なってくれるため、学習データの準備だけ行います。

LinearLearner を使用する場合、入力データを Record 形式にする必要があります。

LinearLearner — sagemaker 2.99.0 documentation

学習ジョブ実行時に Record はprotocol buf 形式にシリアライズされ、S3へ自動アップロードされます。

train = linear.record_set(X_train.astype('float32'), labels=y_train.astype('float32'), channel='train')
validation = linear.record_set(X_validation.astype('float32'), labels=y_validation.astype('float32'), channel='validation')
test = linear.record_set(X_test.astype('float32'), labels=y_test.astype('float32'), channel='test')

Estimatorクラスを利用する場合と同様、LinearLernerクラスを作成し、 fit メソッドで学習ジョブを登録できます。 fit メソッドにデータの Record クラスのリストを渡すことで、学習データを渡すことができます。

job_name = "built-in-linear-learner-ctr-prediction" + strftime("%Y%m%d-%H-%M-%S", gmtime())

linear = sagemaker.LinearLearner(
    role=role,
    train_instance_count=1,
    train_instance_type="ml.m5.large",
    output_path=output_location,
    predictor_type="binary_classifier",
    sagemaker_session=sess
)

linear.fit([train, validation, test], mini_batch_size=200, wait=False, job_name=job_name)

学習ジョブが実行されていることを確認できます。

training job (LinearLerner)

学習に必要なデータがS3にアップロードされていることを確認できます

入力データ設定
S3 学習データ

SageMaker SDKのLinearLernerを用いて組み込みアルゴリズムを実行するサンプルノートブック

Build multiclass classifiers with Amazon SageMaker linear learner — Amazon SageMaker Examples 1.0.0 documentation

まとめ

SageMaker が提供する組み込みアルゴリズムを用いて学習ジョブを実行する3つの方法を説明しました。

実際の本番環境で使用するMLモデルの学習を行う場合、中身がブラックボックスの組み込みアルゴリズムを使用する場面は少ないかと思います。

別記事で独自スクリプト・独自コンテナを利用してSageMaker で学習ジョブを実行する方法をまとめる予定です。

参考

SageMaker Feature Store を使ってみる

SageMaker Feature Store の使用方法の一連の流れを解説します。
記事中での実行コード github.com

Feature Store とは

まずはML界隈で知られるFeature Store の概念について説明します。

Feature Store は「機械学習で使用される特徴量の一元管理を行うためのデータ管理システム」です。

似たような概念にData Lake や Data Warehouse があげられますが、それぞれ

  • Data Lake : 幅広い用途への利用を想定した、データの一元管理システム
  • Data Warehouse: 分析のために事前に定めたスキーマを持つ、構造化データ管理システム

といった役割を持ちます。

対して、Feature Store はMLで使用される特徴量の管理に特化した役割を持つことがポイントです。

Feature Storeの立ち位置
出典: Feature Stores: Components of a Data Science Factory [Guide] - neptune.ai

以下の記事でFeature Store の機能要件などをまとめてくださってます。 Feature Storeについてふんわり理解する - Re:ゼロから始めるML生活

Feature Store を利用することの利点として

  • 特徴量の保存・探索・更新・共有が容易になる
  • ストリーミング・バッチ両方でのデータの保存が可能
  • Offline・Online のFeature Store の使い分けが可能

があげられます

SageMaker Feature Store
出典: Amazon を使って特徴を作成、保存、共有する SageMaker Feature Store - Amazon SageMaker

Sagemaker Feature Store では以下のようなデータ取り込みの方法が検討できます

  • kafka, kinesis などのストリーミングソースからの取り込み
  • DataWrangler と連携した取り込み
  • Spark によるバッチデータ取り込み
  • SageMaker の Python SDK によるデータ取り込み

今回は、SageMaker Feature StoreのFeatureGroup 作成・データ登録・データ取得の一連の流れを実行します。

データ準備

kaggle のavazu-ctr-prediction のデータセットを今回は Feature Store へ登録します。

https://www.kaggle.com/c/avazu-ctr-prediction

Feature Store を使ってみる

Sagemaker Feature Store を利用する流れは以下のようになります

  • データ準備
  • Feature Group を作成
  • Feature Group にデータを登録
  • Feature Group からデータを取得

順番に実行していきましょう。

なお、記事中に出てくるコードは上から順番に実行していくことを想定しています。

データ準備

AWSの設定と、Feature Store を利用するのに必要なboto3 client の準備をします。

設定ファイルと、avazu-ctr-prediction のデータセットをローカルのPCに用意しておきます。

import yaml
import boto3

import sagemaker

# AWSの設定ファイル・データのあるフォルダのローカルパス
SETTING_FILE_PATH = "../config/settings.yaml"
DATASET_FOLDER_PATH = "./avazu-ctr-prediction"

with open(SETTING_FILE_PATH) as file:
    aws_info = yaml.safe_load(file)

    
sess = sagemaker.Session()
role = aws_info['aws']['sagemaker']['role']
bucket = aws_info['aws']['sagemaker']['s3bucket']
region = boto3.Session().region_name
sm = boto3.client('sagemaker', region_name=region)
featurestore_runtime = boto3.client("sagemaker-featurestore-runtime", region_name=region)

読み込んでいる settings.yamlは以下のようなyaml ファイルです

aws:
  sagemaker:
    role: "arn:aws:iam::{account_id}:role/service-role/AmazonSageMaker-ExecutionRole"
    s3bucket: "{bucket_name}"

データを読み込みます。train データサイズが6.3GBあり容量が大きいので、データ数を間引いています。

from pathlib import Path
import pandas as pd

dataset_folder = Path(DATASET_FOLDER_PATH)
df_train = pd.read_csv(dataset_folder / "train")
df_train_partial = df_train[df_train.index % 100 == 0] # データ数を減らします
df_train_partial.to_csv(dataset_folder / "train_partial", index=False)

df_train_partial = pd.read_csv(dataset_folder / "train_partial")
df_train_partial.head()
id click hour C1 banner_pos site_id site_domain site_category app_id app_domain app_category device_id device_ip device_model device_type device_conn_type C14 C15 C16 C17 C18 C19 C20 C21 event_time
0 1.000009e+18 0 14102100 1005 0 1fbe01fe f3845767 28905ebd ecad2386 7801e8d9 07d7df22 a99f214a ddd2926e 44956a24 1 2 15706 320 50 1722 0 35 -1 79 2022-05-11T09:22:24Z
1 1.001579e+19 0 14102100 1005 1 856e6d3f 58a89a43 f028772b ecad2386 7801e8d9 07d7df22 a99f214a 4375586d 5ec45883 1 0 19772 320 50 2227 0 687 100075 48 2022-05-11T09:22:24Z
2 1.002948e+18 0 14102100 1005 0 85f751fd c4e18dd6 50e219e0 1779deee 2347f47a f95efa07 a99f214a ab9a5222 2ee63ff8 1 0 20596 320 50 2161 0 35 -1 157 2022-05-11T09:22:24Z
3 1.004511e+19 0 14102100 1005 0 85f751fd c4e18dd6 50e219e0 51cedd4e aefc06bd 0f2161f8 a99f214a bbe53381 542422a7 1 0 19743 320 50 2264 3 427 100000 61 2022-05-11T09:22:24Z
4 1.005990e+19 0 14102100 1005 0 1fbe01fe f3845767 28905ebd ecad2386 7801e8d9 07d7df22 a99f214a 8a014cbb 04f5b394 1 0 15702 320 50 1722 0 35 -1 79 2022-05-11T09:22:24Z

Sagamker Feature Store では、Feature Group 作成時に以下の特徴名を渡す必要があります。

  • record_identifier_name: データの各レコードを一意に識別する特徴
  • event_time_feature_name: 各レコードの作成・更新時刻を表す特徴

Feature Store APIs — sagemaker 2.99.0 documentation
Amazon SageMaker Feature Store の使用を開始する - Amazon SageMaker

作成・更新時刻のデータ形式は、ISO-8601 文字列にする必要があります。

yyyy-MM-dd'T'HH:mm:ssZ と yyyy-MM-dd'T'HH:mm:ss.SSSZ の形式がサポートされています。

今回使用するデータに id , event_time 特徴を追加しましょう。 id はすでに存在しますが、簡単のためにindex番号に置き換えています。

# レコードの識別子
df_train_partial['id'] = df_train_partial.index

# レコードの生成時刻を表す列を追加
output_date = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
df_train_partial['event_time'] = output_date

Feature Store に保存する特徴は以下のカラムになります

record_identifier_name = "id"  # レコードの識別子
event_time_feature_name = "event_time"  # レコードの生成時刻

feature_names = [
    "click",
    "hour",
    "C1",
    "banner_pos",
    "site_id",
    "site_domain",
    "site_category",
    "app_id",
    "app_domain",
    "app_category",
    "device_id",
    "device_ip",
    "device_model",
    "device_type",
    "device_conn_type",
    "C14",
    "C15",
    "C16",
    "C17",
    "C18",
    "C19",
    "C20",
    "C21",
] + [record_identifier_name, event_time_feature_name]

Feature Group を作成

Feature Group のグループ名を準備します。同じグループ名で複数のFeature Group を作成することはできないので注意してください。

Ofiline Feature Store を作成する際のS3 bucket 中のprefix を用意しておきます

from sagemaker.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum
from sagemaker.feature_store.feature_group import FeatureGroup

feature_group_name = "ctr-prediction-group" + f"-{strftime('%d-%H-%M-%S', gmtime())}"
prefix = "ctr-prediction-feature-store"
print(feature_group_name) # ctr-prediction-group-11-15-00-25

Feature Group オブジェクトを作成する際、各特徴量のデータ型定義 feature_definitions を設定する必要があります。

Feature Store APIs — sagemaker 2.99.0 documentation

feature_definitions はFeatureDefinitionクラスのリストを受け取ります。FeatureDefinitionは特徴量名とFeatureTypeEnum をフィールドに持ちます。

Feature Store APIs — sagemaker 2.99.0 documentation

FeatureGroup では3つのデータ型があり、python のデータ型との対応は以下のようになっています

  • String : 文字列
  • Fractional : IEEE 64 ビット浮動小数点値
  • Integral : Int64-64 ビット符号付き整数値

デフォルトのデータ型はString となっているため、データセットの列がfloat or long 型でない場合、String 型として認識されます。

FeatureGroupへの feature_definitions の設定の方法には

  • load_feature_definitions を使用して Feature Definition のスキーマを自動で識別する
  • 自分で定義したFeature Definition のスキーマを使用する

2通りの方法があるので、2つの方法を試してみます。

load_feature_definitions を使用して Feature Definition のスキーマを自動で識別する

FeatureGroup クラスに feature_definitions を渡さず呼び出します.

feature_group_auto = FeatureGroup(name=feature_group_name, sagemaker_session=sess)

pandas DataFrame をそのまま load_feature_definitions メソッドに入力すると、ValueErrorが出てしまいます。

# pandas の DataFrameを直接入力すると ValueError がでる
try:
    feature_group_auto.load_feature_definitions(data_frame=df_train_partial)
except ValueError as e:
    print(e)

# Failed to infer Feature type based on dtype object for column site_id.

これは、pandas DataFrameの文字列object 型と、FeatureGroupの文字列型STRING が対応していないことが原因です。

df_train_partial.dtypes

# 出力
id                  float64
click                 int64
hour                  int64
C1                    int64
banner_pos            int64
site_id              object
site_domain          object
site_category        object
app_id               object
app_domain           object
app_category         object
device_id            object
device_ip            object
device_model         object
device_type           int64
device_conn_type      int64
C14                   int64
C15                   int64
C16                   int64
C17                   int64
C18                   int64
C19                   int64
C20                   int64
C21                   int64
event_time           object
dtype: object

手間ですが、object 型をFeatureGruopが認識できる string 型に明示的に変換します。

# Pandas の object 型を Feature Group が認識できる string 型に変換する
def cast_object_to_string(df: pd.DataFrame) -> pd.DataFrame:
    df_tmp = df.copy()
    for label in df_tmp.columns:
        if df_tmp.dtypes[label] == 'object':
            df_tmp[label] = df_tmp[label].astype("str").astype("string")
    return df_tmp

object 型が string 型に変換されていることを確認します

cast_object_to_string(df_train_partial).dtypes

# 出力
id                  float64
click                 int64
hour                  int64
C1                    int64
banner_pos            int64
site_id              string
site_domain          string
site_category        string
app_id               string
app_domain           string
app_category         string
device_id            string
device_ip            string
device_model         string
device_type           int64
device_conn_type      int64
C14                   int64
C15                   int64
C16                   int64
C17                   int64
C18                   int64
C19                   int64
C20                   int64
C21                   int64
event_time           string
dtype: object

load_feature_definitions メソッドを呼び出して、FeatureGroupに入力データのFeatureDenfinitions のスキーマを自動識別させます。

feature_group_auto.load_feature_definitions(data_frame=cast_object_to_string(df_train_partial))

# 出力
[FeatureDefinition(feature_name='id', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='click', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='hour', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='C1', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='banner_pos', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='site_id', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='site_domain', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='site_category', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='app_id', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='app_domain', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='app_category', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='device_id', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='device_ip', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='device_model', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='device_type', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='device_conn_type', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='C14', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='C15', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='C16', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='C17', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='C18', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='C19', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='C20', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='C21', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='event_time', feature_type=<FeatureTypeEnum.STRING: 'String'>)]

今回は自前で定義したスキーマのFeatureDefinitionsを使用するので、作成したFeatureGroupオブジェクトは削除しておきます

del feature_group_auto

自分で定義したFeature Definition のスキーマを使用する

自分で定義したFeatureDefinition を利用する場合、特徴名と対応するFeatureTypeEnumをフィールドにもつFeatureDefinitionのリストを作成します。

今回は全てSTRING型として扱います。

feature_definitions = [
    FeatureDefinition(feature_name=feature_name, feature_type=FeatureTypeEnum.STRING)
    for feature_name in feature_names
]

feature_group_original = FeatureGroup(name=feature_group_name, feature_definitions=feature_definitions, sagemaker_session=sess)
feature_group_original.feature_definitions

# 出力
[FeatureDefinition(feature_name='click', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='hour', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='C1', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='banner_pos', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='site_id', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='site_domain', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='site_category', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='app_id', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='app_domain', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='app_category', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='device_id', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='device_ip', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='device_model', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='device_type', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='device_conn_type', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='C14', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='C15', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='C16', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='C17', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='C18', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='C19', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='C20', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='C21', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='id', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='event_time', feature_type=<FeatureTypeEnum.STRING: 'String'>)]

create

作成したFeatureGroupオブジェクトの create メソッドを呼び出すことで、FeatureGroupをSagemaker上に作成します。

Feature Store APIs — sagemaker 2.99.0 documentation

  • s3_uri: Offline Feature Storeでのデータ保存先S3 URI. Falseで offine storeを無効化
  • record_identifier_name:
  • event_time_feature_name : データ中のレコードの生成・更新時刻に対応する特徴名
  • role_arn : FeatureGroup 作成時のRole
  • online_store_kms_key_id : online store のkms key id
  • enable_online_store: online store を有効化するか. default = False
  • offline_store_kms_key_id: offline store のkms key id
  • disable_glue_table_creation: Glue tableの作成有無
  • data_catalog_config: Metadata storeの設定
  • description: FeatureGroupの説明
  • tags: FeatureGroupのタグ
feature_group_original.create(
    s3_uri=f"s3://{bucket}/{prefix}", # offline feature store でデータを保存する S3 URI
    record_identifier_name=record_identifier_name, # レコード識別子のカラム名
    event_time_feature_name=event_time_feature_name, # レコード生成時刻のカラム名
    role_arn=role, 
    enable_online_store=True, # online feature store を作成するか. defualt = False
    description = "Feature Group For CTR Prediciton",
    tags = [{"Key":"author", "Value": "satsuki"},{"Key":"target", "Value": "click"} ]
)

SagaMeker Studio のFeature Store 画面から確認すると、登録されていることが分かります。

FeatureGroup の登録

Details にFeatureGroup 作成時に設定した値が入ってます。offline feature store を作成したので、Table name に自動的に作成されたAWS Glue のテーブル名が入っています。

Feature Group Details

AWS Glue で作成されたテーブルを確認してみます。

FeatureGroup 作成時に設定したS3 URI 以下に接続先のS3パスが登録されています。

AWS Glue

以下のメソッドでもFeatureGroupの詳細を確認することができます

feature_group_original.describe()
sm.list_feature_groups()['FeatureGroupSummaries'][0]

Feature Group にデータを登録

データの登録方法は

の2通の方法があります。

Scale ML feature ingestion using Amazon SageMaker Feature Store | AWS Machine Learning Blog

FeatureGroupのput_record を利用する場合、 FeatureValue クラスのリストを渡します。

Feature Store APIs — sagemaker 2.99.0 documentation

from sagemaker.feature_store.inputs import FeatureValue

record = [
    FeatureValue(feature_name="id", value_as_string=str("-1")),
    FeatureValue(feature_name="event_time", value_as_string=str("2022-05-12T00:00:21Z")),
    FeatureValue(feature_name="click", value_as_string=str("1")),
    FeatureValue(feature_name="C1", value_as_string=str("1005")),
    FeatureValue(feature_name="banner_pos", value_as_string=str("0")),
]
feature_group_original.put_record(record)

boto3 の sagemaker client のput_record を利用する場合は、FeatureName とValueAsString をkeyに持つ辞書型のリストをput_recordに渡す必要があります。

SageMakerFeatureStoreRuntime — Boto3 Docs 1.24.29 documentation

record = [
    {"FeatureName": "id", "ValueAsString": str("-2")},
    {"FeatureName": "event_time", "ValueAsString": str("2022-05-12T00:00:21Z")},
    {"FeatureName": "click", "ValueAsString": str("0")},
    {"FeatureName": "C1", "ValueAsString": str("1002")},
    {"FeatureName": "banner_pos", "ValueAsString": str("1")},
]

featurestore_runtime.put_record(FeatureGroupName=feature_group_name, Record=record)

ingest でpandas のDataFrame をFeatureGroup にまとめて登録することができます。

max_workers を指定することで、並列処理が可能となります。

feature_group_original.ingest(data_frame=df_train, max_workers=4, wait=True)

Feature Group からデータの取得

online feature store の場合はboto3 の sagemaker client の get_record でデータを取得することができます。

SageMakerFeatureStoreRuntime — Boto3 Docs 1.24.29 documentation

get_record では、FeatureGroup のグループ名と、FeatureGroup作成時に設定したレコード識別子の取得したいレコードの番号を渡します。

record_identifier_value = str(-1)
response = featurestore_runtime.get_record(FeatureGroupName=feature_group_name, RecordIdentifierValueAsString=record_identifier_value)
response

# 出力
{'ResponseMetadata': {'RequestId': '2f934a36-601f-4c09-b5d9-d2549372bfaa',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '2f934a36-601f-4c09-b5d9-d2549372bfaa',
   'content-type': 'application/json',
   'content-length': '260',
   'date': 'Fri, 13 May 2022 06:20:10 GMT'},
  'RetryAttempts': 0},
 'Record': [{'FeatureName': 'click', 'ValueAsString': '1'},
  {'FeatureName': 'C1', 'ValueAsString': '-1005'},
  {'FeatureName': 'banner_pos', 'ValueAsString': '0'},
  {'FeatureName': 'id', 'ValueAsString': '-1'},
  {'FeatureName': 'event_time', 'ValueAsString': '2022-05-12T00:00:21Z'}]}

batch_get_record で、複数レコードの取得が可能です

record_identifier_values = ["1", "2", "3", "4"]
batch_response = featurestore_runtime.batch_get_record(Identifiers=[{"FeatureGroupName": feature_group_name, "RecordIdentifiersValueAsString": record_identifier_values}])
batch_response

offline feature store からデータを取得する場合、Athena クエリを利用します。

feature_store_query = feature_group_original.athena_query()
feature_store_table = feature_store_query.table_name
print(feature_store_table) # ctr-prediction-group-11-15-00-25-1652281250

試しに5行データを取得してみます

query_string = """
SELECT *
FROM "{}" LIMIT 5
""".format(
    feature_store_table
)

SQL を実行します。この際、結果の保存先S3パスを指定できます。

feature_store_query.run(query_string=query_string, output_location="s3://" + bucket + "/" + prefix + "/query_results/")
feature_store_query.wait()
dataset = feature_store_query.as_dataframe()
dataset
click hour c1 banner_pos site_id site_domain site_category app_id app_domain app_category device_id device_ip device_model device_type device_conn_type c14 c15 c16 c17 c18 c19 c20 c21 id event_time write_time api_invocation_time is_deleted
0 0 14102100.0 1005.0 0.0 1fbe01fe f3845767 28905ebd ecad2386 7801e8d9 07d7df22 a99f214a 52aa6971 8a4875bd 1.0 0.0 15706.0 320.0 50.0 1722.0 0.0 35.0 -1.0 79.0 10 2022-05-12T00:00:21Z 2022-05-11 15:09:56.893 2022-05-11 15:04:45.000 False
1 1 14102100.0 1005.0 0.0 1fbe01fe f3845767 28905ebd ecad2386 7801e8d9 07d7df22 a99f214a bc7f9471 8b1aa260 1.0 0.0 15705.0 320.0 50.0 1722.0 0.0 35.0 -1.0 79.0 8 2022-05-12T00:00:21Z 2022-05-11 15:17:17.810 2022-05-11 15:12:14.000 False
2 0 14102100.0 1005.0 0.0 3a66a5a5 9e328a4d f028772b ecad2386 7801e8d9 07d7df22 a99f214a e3510321 24f6b932 1.0 0.0 19666.0 300.0 250.0 2253.0 2.0 303.0 100026.0 52.0 12 2022-05-12T00:00:21Z 2022-05-11 15:09:56.642 2022-05-11 15:04:48.000 False
3 1 14102100.0 1005.0 0.0 1fbe01fe f3845767 28905ebd ecad2386 7801e8d9 07d7df22 a99f214a 0c8a8801 c144e605 1.0 0.0 15708.0 320.0 50.0 1722.0 0.0 35.0 -1.0 79.0 17 2022-05-12T00:00:21Z 2022-05-11 15:09:56.642 2022-05-11 15:04:48.000 False
4 1 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN -1 2022-05-12T00:00:21Z 2022-05-13 06:34:21.256 2022-05-13 06:29:27.000 False

指定したS3パスに結果が出力されていることを確認できます

Athena Query 結果の保存先

まとめ

SageMaker Feature Store の Feature Group の作成・登録・取得までの一連の流れを実行しました。今回はjupyter notebook 上での操作に止まりましたが、kinesis や DataWrangler と連携した操作も可能となっています。

参考