nsakki55 のアウトプットブログ

MLエンジニアの技術周りの備忘録です

SageMakerでハイパーパラメータチューニングをWarmStartで実行してみる

SageMaker Hyperparameter Tuning JobでWarm Startを実行してみます 本文中のコードです

github.com

目次

データセット・モデル

SageMaker Hyperparameter Tuningについて

Hyperparameter Tuning

SagaMaker Hyperparameter Tuning でWarm Startを行う場合、内部で行なっていることは以下の流れになります。

  1. 最適化する指標を設定
  2. 探索するハイパラの設定
  3. 1,2 の設定を元に学習ジョブを作成し、ハイパラを探索

ハイパーパラメータを引数として受けとるように設定したEstimatorの学習ジョブを複数作成し、最適化指標が最もいい学習ジョブを選んでいます。

Estimator作成

学習データの準備

AWSリソースの設定を行います。

import yaml
import sagemaker
import boto3
import json

SETTING_FILE_PATH = "../config/settings.yaml"
DATA_FOLDER_PATH = "../avazu-ctr-prediction"

# AWS リソース設定
with open(SETTING_FILE_PATH) as file:
    aws_info = yaml.safe_load(file)
        
sess = sagemaker.Session()
role = aws_info['aws']['sagemaker']['role']
bucket = aws_info['aws']['sagemaker']['s3bucket']

s3 = boto3.client('s3')

古いデータでハイパラ探索を行った後、新しいデータでWarm Startで探索を行う想定で、データを時系列で二分割します。
それぞれのデータで train, validation データに分割しておきます。

import os

import pandas as pd
from sklearn.model_selection import train_test_split

df_train = (
    pd.read_csv(os.path.join(DATA_FOLDER_PATH, "train.csv"), dtype="object")
    .sort_values(by="hour")
    .reset_index(drop=True)
)

# WarmStartのための親・子データを作成
df_train_parent = df_train[: int(len(df_train) * 0.5)]
df_train_child = df_train[int(len(df_train) * 0.5) :]

# train, validation データに分割
df_train_parent, df_validation_parent = train_test_split(df_train_parent, train_size=0.8, random_state=0, shuffle=False)
df_train_child, df_validation_child = train_test_split(df_train_child, train_size=0.8, random_state=0, shuffle=False)

S3にデータをアップロードしておきます

# S3にアップロード
prefix = 'sagemaker-hyperparameter-tuning'
s3_resource_bucket = boto3.Session().resource("s3").Bucket(bucket)

train_parent_file = "train_parent.csv"
validation_parent_file = "validation_parent.csv"
train_child_file = "train_child.csv"
validation_child_file = "validation_child.csv"

df_train_parent.to_csv(train_parent_file, index=False)
df_validation_parent.to_csv(validation_parent_file, index=False)
df_train_child.to_csv(train_child_file, index=False)
df_validation_child.to_csv(validation_child_file, index=False)

s3_resource_bucket.Object(os.path.join(prefix, "train_parent", train_parent_file)).upload_file(train_parent_file)
s3_resource_bucket.Object(os.path.join(prefix, "validation_parent", validation_parent_file)).upload_file(validation_parent_file)
s3_resource_bucket.Object(os.path.join(prefix, "train_child", train_child_file)).upload_file(train_child_file)
s3_resource_bucket.Object(os.path.join(prefix, "validation_child", validation_child_file)).upload_file(validation_child_file)

学習ジョブに渡すための学習データ、モデルのS3 URIを保持します

output_location = f"s3://{bucket}/{prefix}/output"

s3_train_parent_data = f"s3://{bucket}/{prefix}/train_parent/{train_parent_file}"
s3_validation_parent_data = f"s3://{bucket}/{prefix}/validation_parent/{validation_parent_file}"

s3_train_child_data = f"s3://{bucket}/{prefix}/train_child/{train_child_file}"
s3_validation_child_data = f"s3://{bucket}/{prefix}/validation_child/{validation_child_file}"

学習スクリプト

SageMaker Hyperparameter Tuning で実行する学習スクリプトtrainer.pyを用意します。

SageMaker hyperparameter Tuningで実行する場合、以下の点を実装する必要があります

  • チューニングを行うハイパーパラメータを実行引数に加える
  • 最適化対象のメトリクスが分かるように標準出力する

今回は、alpha, penalty, fit_intercept をチューニングするのでargparseで受け取る引数に追加します。

また、訓練・検証データのloglossとaccuracyをメトリクスとして標準出力するようにします。

import argparse
import os

import joblib
import numpy as np
import pandas as pd
from sagemaker_training import environment
from sklearn.feature_extraction import FeatureHasher
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import accuracy_score, log_loss

feature_columns = [
    "id",
    "click",
    "hour",
    "C1",
    "banner_pos",
    "site_id",
    "site_domain",
    "site_category",
    "app_id",
    "app_domain",
    "app_category",
    "device_id",
    "device_ip",
    "device_model",
    "device_type",
    "device_conn_type",
    "C14",
    "C15",
    "C16",
    "C17",
    "C18",
    "C19",
    "C20",
    "C21",
]

target = "click"


def parse_args():
    env = environment.Environment()
    parser = argparse.ArgumentParser()

    # チューニング対象のハイパーパラメータ
    parser.add_argument("--penalty", type=str, choices=["l1", "l2", "elasticnet"], default="l1")
    parser.add_argument("--alpha", type=float, default=0.00001)
    parser.add_argument("--fit_intercept", type=bool, choices=[True, False], default=True)

    # data directories
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--validation", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION"))

    # model directory
    parser.add_argument("--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR"))

    return parser.parse_known_args()


def load_dataset(path: str) -> (pd.DataFrame, np.array):
    # Take the set of files and read them all into a single pandas dataframe
    files = [os.path.join(path, file) for file in os.listdir(path) if file.endswith("csv")]

    if len(files) == 0:
        raise ValueError("Invalid # of files in dir: {}".format(path))

    raw_data = [pd.read_csv(file, sep=",") for file in files]
    data = pd.concat(raw_data)

    y = data[target]
    X = data[feature_columns]
    return X, y


def preprocess(df: pd.DataFrame):
    feature_hasher = FeatureHasher(n_features=2**24, input_type="string")
    hashed_feature = feature_hasher.fit_transform(np.asanyarray(df.astype(str)))

    return hashed_feature


def start(args):
    print("Training Start.")

    X_train, y_train = load_dataset(args.train)
    X_validation, y_validation = load_dataset(args.validation)

    y_train = np.asarray(y_train).ravel()
    X_train_hashed = preprocess(X_train)

    y_validation = np.asarray(y_validation).ravel()
    X_validation_hashed = preprocess(X_validation)

    hyperparameters = {
        "alpha": args.alpha,
        "penalty": args.penalty,
        "fit_intercept": args.fit_intercept,
        "n_jobs": args.n_jobs,
    }

    model = SGDClassifier(loss="log", random_state=42, **hyperparameters)
    print(model.__dict__)
    model.partial_fit(X_train_hashed, y_train, classes=[0, 1])

    # 最適化メトリクスを標準出力
    print("train logloss: {}".format(log_loss(y_train, model.predict_proba(X_train_hashed))))
    print("train accuracy: {}".format(accuracy_score(y_train, model.predict(X_train_hashed))))
    print("validation logloss: {}".format(log_loss(y_validation, model.predict_proba(X_validation_hashed))))
    print("validation accuracy: {}".format(accuracy_score(y_validation, model.predict(X_validation_hashed))))

    joblib.dump(model, os.path.join(args.model_dir, "model.joblib"))

    print("Training Finished.")


if __name__ == "__main__":
    args, _ = parse_args()
    start(args)

Estimatorクラスの作成

SageMaker Hyperparameter Tuning では異なる引数を与えた学習ジョブを作成するため、学習ジョブのEstimatorを用意する必要があります。

注意点として、Hyperparameter Tuning Jobではlocal modeで実行することはできません。

Error when running hyperparameter tuning job locally · Issue #1960 · aws/sagemaker-python-sdk · GitHub

AWS提供のscikit-learnコンテナ環境で学習スクリプトを実行するため、SKLearnクラスを利用します。

以下のディレクトリ構成でEstimatorクラスを作成します

src
 ├── hyperparameter_tuning_warm_start.ipynb
 │
 └─── model
       │
       └── trainer.py
from time import gmtime, strftime

from sagemaker.sklearn.estimator import SKLearn

parent_train_job_name = "hpt-parent-training-job" + strftime("%Y%m%d%H%M", gmtime())

parent_estimator_parameters = {
    "entry_point": "trainer.py",
    "source_dir": "model",
    "framework_version": "0.23-1",
    "py_version": "py3",
    "instance_type": "ml.m5.large",
    "instance_count": 1,
    "output_path": output_location,
    "role": role,
    "base_job_name": parent_train_job_name,
}

parent_estimator = SKLearn(**parent_estimator_parameters)

親Tuning Jobを実行

メトリクスの設定

Hyperparameter Tuning で監視するメトリクスを設定します。
メトリクスの定義には、各メトリクスの名前と、正規表現を指定します。
以下の正規表現部分は、学習スクリプトに合わせて設定します。
メトリクスを定義します - Amazon SageMaker

metrics_definitions = [
    {"Name": "train loss", "Regex": "train logloss: ([0-9\\.]+)"},
    {"Name": "train accuracy", "Regex": "train accuracy: ([0-9\\.]+)"},
    {"Name": "validation loss", "Regex": "validation logloss: ([0-9\\.]+)"},
    {"Name": "validation accuracy", "Regex": "validation accuracy: ([0-9\\.]+)"},
]

探索パラメータの設定

探索するハイパーパラメータの範囲を事前に設定します。

カテゴリー・連続値・整数値の3種類のパラメータがサポートされています。

ハイパラは学習スクリプトに渡す引数名と同じ変数名にします。
ハイパーパラメータの範囲を定義する - Amazon SageMaker

from sagemaker.tuner import ContinuousParameter
from sagemaker.tuner import CategoricalParameter

parent_hyperparameter_ranges = {
    "alpha": ContinuousParameter(0.00001, 0.00009, scaling_type="Linear"),
    "penalty": CategoricalParameter(['l1', 'l2']),
    "fit_intercept": CategoricalParameter([True, False]),
}

Turning jobの作成

Estimatorクラス、最適化メトリクス名、探索パラメータの設定、チューニング戦略、early stoppingを設定したTuning Jobを作成します。

以下の設定では、Bayesian最適化戦略で5個の学習ジョブを並列実行し、合計20個の学習ジョブを実行します。

チューニング戦略: ハイパーパラメータ調整の仕組み - Amazon SageMaker

from sagemaker.tuner import HyperparameterTuner

objective_metric_name = "validation loss"
base_parent_tuning_job_name = "sgd-classifier-parent"

tuner_parent = HyperparameterTuner(
    estimator=parent_estimator,
    base_tuning_job_name=base_parent_tuning_job_name,
    objective_type="Minimize",
    objective_metric_name=objective_metric_name,
    hyperparameter_ranges=parent_hyperparameter_ranges,
    metric_definitions=metrics_definitions,
    max_jobs=20,
    max_parallel_jobs=5,
    strategy="Bayesian",
    early_stopping_type="Auto",
)

fitメッドを呼び出すことで、チューニングジョブを実行することができます。

この時、S3の学習データのURIを指定します。

parent_inputs = {"train": s3_train_parent_data, "validation": s3_validation_parent_data}
parent_job_name = "hpt-parent-" + strftime("%Y%m%d%H%M", gmtime())

tuner_parent.fit(
    inputs=parent_inputs,
    job_name=parent_job_name,
    wait=False,
)

コンソール画面からチューニングジョブが実行されていることを確認できます。

親Tuning Job

チューニングジョブに紐づいて、20個の学習ジョブが実行されています。

親Tuning Job学習ジョブ

学習ジョブの一つの実行ログを確認すると、学習スクリプトの実行引数にハイパーパラメータが渡されていることがわかります

実行ログ

子Tuning Jobを実行

Warm Start 設定

warm startを行うためにWarmStartConfigを設定します。

parentsに親チューニングジョブ名を設定し、warm_start_typeにはWarmStartTypes.TRANSFER_LEARNINGを設定します。

WarmStartTypesは2つのタイプがサポートされていて、以下の違いがあります。

  • IDENTICAL_DATA_AND_ALGORITHM: 子チューニングジョブは親チューニングジョブと同じ入力データ・学習アルゴリズムを利用
  • TRANSFER_LEARNING: 子チューニングジョブは親チューニングジョブと異なる入力データ・学習アルゴリズムを利用

今回は、新しいデータで学習を行うため、TRANSFER_LEARNINGを利用します。

transfer learnigと命名されていますが、モデルの学習済みパラメータを前回のチューニングジョブから引き継ぐわけではないので注意してください。

from sagemaker.tuner import WarmStartConfig
from sagemaker.tuner import WarmStartTypes

warm_start_config = WarmStartConfig(
    warm_start_type=WarmStartTypes.TRANSFER_LEARNING, parents={parent_tuning_job_name}
)

探索パラメータの設定

子チューニングジョブの探索パラメータの設定を用意します。

今回はalphaの範囲を親ジョブより広げた設定にします

child_hyperparameter_ranges = {  
    "alpha": ContinuousParameter(0.00001, 0.00025, scaling_type="Linear"),
    "penalty": CategoricalParameter(['l1', 'l2']),
    "fit_intercept": CategoricalParameter([True, False]),
}

Turning Jopの作成

warm_start_configの設定を渡すことで、warm startを利用したチューニングジョブを実行することができます。

objective_metric_name = "validation loss"
base_child_tuning_job_name = "sgd-classifier-child"

tuner_child = HyperparameterTuner(
    estimator=child_estimator,
    base_tuning_job_name=base_child_tuning_job_name,
    objective_type="Minimize",
    objective_metric_name=objective_metric_name,
    hyperparameter_ranges=child_hyperparameter_ranges,
    metric_definitions=metrics_definitions,
    max_jobs=20,
    max_parallel_jobs=1,
    strategy="Bayesian",
    early_stopping_type="Auto",
    warm_start_config=warm_start_config,
)
child_inputs = {"train": s3_train_child_data, "validation": s3_validation_child_data}
child_job_name = "hpt-child-" + strftime("%Y%m%d%H%M", gmtime())

tuner_child.fit(
    inputs=child_inputs,
    job_name=child_job_name,
    wait=False,
)

コンソール画面から子チューニングジョブが実行されていることを確認できます

子Tuning Job

結果の確認

warm startによるハイパラチューニングの結果を確認します。

boto3 のsagemaker client のdescribe_hyper_parameter_tuning_jobでTunig jobの詳細を見ることができます。

ベストパラメータを取得したい場合はこちらの方法が簡単に取得できます。

tuning_job_result = sm.describe_hyper_parameter_tuning_job(
    HyperParameterTuningJobName=child_job_name
)
print(tuning_job_result['BestTrainingJob'])
# {'alpha': '2.3900957827979537e-05', 'fit_intercept': '"True"', 'penalty': '"l2"'}

Tuning jobで実行された学習ジョブを一覧で見る場合、sagemaker sdk のHyperparameterTuningJobAnalyticsを利用します。

データフレーム形式で全ての学習ジョブの内容を表示させます。

from sagemaker.analytics import HyperparameterTuningJobAnalytics

child_results = HyperparameterTuningJobAnalytics(sagemaker_session=sess, hyperparameter_tuning_job_name=child_job_name)
df_child = child_results.dataframe()
df_child

dataframe result

以下のグラフは、親tuning job(青)と子tuning job(赤)の最適化指標がチューニングされている結果を表しています。

検証データが異なるので大小比較に意味はありませんが、子tuning jobの最適化指標が小さい値からチューニングが始まっていて、warm startできていることがわかります。

validation logloss vs timestamp

以下は、子チューニングジョブの各ハイパラとvalidation lossの関係をプロットしたグラフです。

Bayesian最適化戦略で、validation loss が小さい区間での探索が多く行われていることが確認できます。

まとめ

SageMaker HypeparameterTuning を使用してwarm startでハイパラ探索を実行しました。使う利点としては並列で学習を実行できるため、学習に時間がかかるようなモデルに対しては恩恵が受けられると思います。

探索する学習ジョブ数分、インスタンスが立ち上がるため、課金には十分注意してください。軽い気持ちで使用したら結構な額が課金されていて肝を冷やしました。

計算コストが小さいモデルでしたら、OptunaやGridSearchなどによるハイパラ探索を学習スクリプト中に実装する方がコスパいいなと思います。

参考