肉球でキーボード

MLエンジニアの技術ブログです

LocalStackのS3環境を利用したAWS Glue Jobローカル実行・テスト方法

Glue Jobのローカル開発

AWS Glue Jobをローカル環境で開発する際、AWS公式が提供してるDocker imageを活用する方法があります。

Developing and testing AWS Glue job scripts locally

Glue Jobを利用する場合、S3からデータを取得・保存するユースケースが多いかと思います。

本記事では、ローカル環境にAWS環境をエミュレートするLocalStackを活用して、実際のAWSリソースへのデータをやり取りを行わずGlue Jobの動作検証・テストを行う方法を書きます。
Overview | Docs

Glue version 4.0のdocker imageであるamazon/aws-glue-libs:glue_libs_4.0.0_image_01 を使用します。

Glue versionごとにdocker imageが異なるので、ご注意ください。

本文中コード github.com

ディレクトリ構成

/
├─ src
|  └─ glue_job.py
├─ tests
│  └─ test_glue_job.py
└─ compose.yaml

Glue Jobの実行スクリプト

import sys
from typing import Dict

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

S3_ENDPOINT_URL = "http://s3.dev:4566"
AWS_REGION = "ap-northeast-1"
S3_BUCKET = "test-job-bucket"

def get_dynamic_frame_from_s3(glue_context: GlueContext, source_s3_path: str) -> DynamicFrame:
    dyf = glue_context.create_dynamic_frame.from_options(
        format_options={
            "quoteChar": '"',
            "withHeader": True,
            "separator": ",",
        },
        connection_type="s3",
        format="csv",
        connection_options={
            "paths": [source_s3_path],
            "recurse": True,
        },
    )
    return dyf

def write_dynamic_frame_to_s3(glue_context: GlueContext, dyf: DynamicFrame, destination_s3_path: str) -> None:
    glue_context.write_dynamic_frame.from_options(
        frame=dyf,
        connection_type="s3",
        connection_options={"path": destination_s3_path},
        format="parquet",
        format_options={"writeHeader": True},
    )

def main(args: Dict[str, str]) -> None:
    sc = SparkContext()
    if args["JOB_NAME"] == "test":
        sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", S3_ENDPOINT_URL)
        sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint.region", AWS_REGION)
        sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
    glue_context = GlueContext(sc)

    job = Job(glue_context)
    job.init(args["JOB_NAME"], args)

    dyf = get_dynamic_frame_from_s3(glue_context=glue_context, source_s3_path=f"s3://{S3_BUCKET}/test_data.csv")
    write_dynamic_frame_to_s3(glue_context=glue_context, dyf=dyf, destination_s3_path=f"s3://{S3_BUCKET}/output")

    job.commit()

if __name__ == "__main__":
    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
    main(args)

以下の処理を実行するGlue Jobのスクリプトを用意しました

  • S3からcsvデータを取得
  • S3にparquet形式のデータを保存

SparkContextにLocalStackでエミュレートしたS3にアクセスする設定を追加しています。

本番環境のGlueJobでは実際のAWSリソースにアクセスするため、以下の設定はローカル開発時のみ追加する必要があります。

実行引数のJOB_NAMEがtestの場合は、LocalStackへアクセスする設定を追加することでリソースの使い分けを行っています。

sc = SparkContext()
if args["JOB_NAME"] == "test":
    sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", S3_ENDPOINT_URL)
    sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint.region", AWS_REGION)
    sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
glue_context = GlueContext(sc)

docker composeの設定

services:
  glue.dev.s3.local:
    container_name: s3.dev
    image: localstack/localstack:3.8.0
    environment:
      - SERVICES=s3
      - AWS_DEFAULT_REGION=ap-northeast-1
      - AWS_DEFAULT_OUTPUT=json
      - AWS_ACCESS_KEY_ID=test
      - AWS_SECRET_ACCESS_KEY=test
    networks:
      - glue.dev.network
  glue.dev:
    container_name: glue.dev
    image: amazon/aws-glue-libs:glue_libs_4.0.0_image_01
    volumes:
      - ./:/home/glue_user/workspace/
    environment:
      - DISABLE_SSL=true
      - AWS_REGION=ap-northeast-1
      - AWS_OUTPUT=json
      - AWS_ACCESS_KEY_ID=test
      - AWS_SECRET_ACCESS_KEY=test
    networks:
      - glue.dev.network
    tty: true
    stdin_open: true
networks:
  glue.dev.network:
    name: glue.dev.network

LocalStackでエミュレートしたAWS環境にGlue Jobのコンテナがアクセスできるように、Glue Jobのコンテナの環境変数に、LocalStackの起動設定で指定したAWS_ACCESS_KEY_IDとAWS_SECRET_ACCESS_KEYを追加します。

compose.yamlはこちらの実装を参考にさせてもらいました。
GitHub - n-yokota/aws_glue_test_concept

docker containerを起動します。

$ docker compose up -d

Glue Jobをローカル環境で実行する

LocalStackのS3 bucket準備

Glue Jobのコンテナ環境に入ります。

$ docker compose exec glue.dev bash 

LocalStackのS3に test-job-bucket Bucketを作成します。

$ aws s3 mb s3://test-job-bucket --endpoint-url http://s3.dev:4566

テスト用ファイルをLocalStackのS3 Bucketに追加します。

$ aws s3 mv ./data/test_data.csv s3://test-job-bucket/test_data.csv --endpoint-url http://s3.dev:4566

S3 Bucketにテスト用ファイルが保存されていることを確認できます。

$ aws s3api list-objects-v2 --bucket test-job-bucket  --endpoint-url http://s3.dev:4566
{
    "Contents": [
        {
            "LastModified": "2024-10-08T14:31:52.000Z",
            "ETag": "\"19ee3f2027cea3841e74c3aa3520b5ed\"",
            "StorageClass": "STANDARD",
            "Key": "test_data.csv",
            "Size": 100
        }
    ]
}

コンテナ環境でGlue Job実行

Glue Jobのスクリプトを通常のpyhonスクリプトとして実行します。

$ python3 src/glue_job.py --JOB_NAME test

対象のS3 Bucketにparquet形式でファイルが保存されていることを確認できます。

$ aws s3api list-objects-v2 --bucket test-job-bucket  --endpoint-url http://s3.dev:4566
{
    "Contents": [
        {
            "LastModified": "2024-10-08T14:32:23.000Z",
            "ETag": "\"fa768a3a4c9659604c161e45a17ec02f\"",
            "StorageClass": "STANDARD",
            "Key": "output/part-00000-3479d3db-5a89-4bd7-856c-fd714291c2f3-c000.snappy.parquet",
            "Size": 981
        },
        {
            "LastModified": "2024-10-08T14:31:52.000Z",
            "ETag": "\"19ee3f2027cea3841e74c3aa3520b5ed\"",
            "StorageClass": "STANDARD",
            "Key": "test_data.csv",
            "Size": 100
        }
    ]
}

LocalStackのS3を使用したGlue Jobのテスト実行方法

テスト用GlueContextのfixture作成

@pytest.fixture(scope="session")
def glue_context() -> GlueContext:
    spark = (
        SparkSession.builder.master("local[1]")
        # Configure for testing fast
        # https://kakehashi-dev.hatenablog.com/entry/2023/07/13/110000
        .config("spark.sql.shuffle.partitions", "1")
        .config("spark.ui.showConsoleProgress", "false")
        .config("spark.ui.enabled", "false")
        .config("spark.ui.dagGraph.retainedRootRDD", "1")
        .config("spark.ui.retainedJobs", "1")
        .config("spark.ui.retainedStages", "1")
        .config("spark.ui.retainedTasks", "1")
        .config("spark.sql. ui.retainedExecutions", "1")
        .config("spark.worker.ui.retainedExecutors", "1")
        .config("spark.worker.ui.retainedDrivers", "1")
        .getOrCreate()
    )
    # Configuration for localstack
    # https://future-architect.github.io/articles/20220428a
    spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint", S3_ENDPOINT_URL)
    spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint.region", AWS_REGION)
    spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
    spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.change.detection.mode", "None")
    spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.change.detection.version.required", "false")

    yield GlueContext(spark.sparkContext)
    spark.stop()

テスト実行速度を高速化するためにspark設定はこちらの記事を参考にしてます.
AWS GlueのCI/CD環境を作ってみた - KAKEHASHI Tech Blog

pytest実行時にLocalStackのS3へアクセスするための設定はこちらの記事を参考にしてます.
AWS Glueの開発環境の構築(2022) | フューチャー技術ブログ

テスト用S3 Bucketのfixture作成

@pytest.fixture(scope="session")
def s3_client():
    return boto3.client(
        "s3",
        endpoint_url=S3_ENDPOINT_URL,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        region_name=AWS_REGION,
    )

@pytest.fixture(scope="session")
def s3_bucket(s3_client: boto3.client) -> str:
    bucket_name = "test-s3-bucket"

    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except Exception:
        s3_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={"LocationConstraint": AWS_REGION},
        )

    yield bucket_name

    try:
        s3_client.delete_bucket(Bucket=bucket_name)
    except Exception as e:
        print(f"Failed to clean up test bucket: {e}")

@pytest.fixture(scope="session")
def setup_s3_data(s3_client: boto3.client, s3_bucket: str) -> dict[str, str]:
    key = "test_data.csv"
    inputs = [
        {"col1": "val1", "col2": 1, "col3": "2000/01/01 01:00:00"},
        {"col1": "val2", "col2": 2, "col3": "2000/01/02 02:00:00"},
        {"col1": "val3", "col2": 3, "col3": "2000/01/03 03:00:00"},
    ]
    input_str = io.StringIO()
    w = csv.DictWriter(input_str, fieldnames=inputs[0].keys())
    w.writeheader()
    for input in inputs:
        w.writerow(input)

    body = input_str.getvalue()
    s3_client.put_object(Bucket=s3_bucket, Key=key, Body=body)

    yield {"bucket_name": s3_bucket, "key": key}

    try:
        s3_client.delete_object(Bucket=s3_bucket, Key=key)
    except Exception as e:
        print(f"Failed to clean up test data: {e}")

# https://docs.pytest.org/en/6.2.x/fixture.html#factories-as-fixtures
@pytest.fixture
def get_s3_objects(s3_client):
    def _get_s3_objects(s3_bucket: str, prefix: str) -> list[str] | None:
        try:
            response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=prefix)
            if "Contents" in response:
                return [obj["Key"] for obj in response["Contents"]]
        except Exception:
            return

    return _get_s3_objects
    

@pytest.fixture(scope="module")
def sample_dynamicframe(glue_context: GlueContext) -> DynamicFrame:
    spark = glue_context.spark_session
    df = spark.createDataFrame(
        [
            ("val1", 1, "2000/01/01 01:00:00"),
            ("val2", 2, "2000/01/02 02:00:00"),
            ("val3", 3, "2000/01/03 03:00:00"),
        ],
        ["col1", "col2", "col3"],
    )
    dyf = DynamicFrame.fromDF(df, glue_context, "dyf")

    return dyf

各関数の役割

  • s3_client

    LocalStackのS3環境にアクセスするboto3 clientのfixture

  • s3_bucket

    テスト用のS3 Bucketを作成・削除するfixture

  • setup_s3_data

    テスト用のS3 Bucketにデータを追加・削除するfixture

  • get_s3_objects

    テスト用のS3 Bucketに特定のキーに含まれるオブジェクト一覧を取得するヘルパー関数

  • sample_dynamicframe

    テスト用DynamicFrameデータを作成するfixture

S3アクセスを伴う関数のテスト

def test_get_dynamic_frame_from_s3(glue_context: GlueContext, setup_s3_data: dict[str, str]) -> None:
    source_s3_path = f"s3://{setup_s3_data['bucket_name']}/{setup_s3_data['key']}"
    result = get_dynamic_frame_from_s3(glue_context=glue_context, source_s3_path=source_s3_path)

    assert isinstance(result, DynamicFrame)
    assert result.count() == 3

    df = result.toDF()
    assert len(df.columns) == 3
    assert df.columns == ["col1", "col2", "col3"]

    rows = df.collect()
    assert rows == [
        Row(col1="val1", col2="1", col3="2000/01/01 01:00:00"),
        Row(col1="val2", col2="2", col3="2000/01/02 02:00:00"),
        Row(col1="val3", col2="3", col3="2000/01/03 03:00:00"),
    ]

def test_write_dynamic_frame_from_s3(
    glue_context: GlueContext,
    s3_bucket,
    sample_dynamicframe: DynamicFrame,
    get_s3_objects,
) -> None:
    file_key = "test_write_data"
    destination_s3_path = f"s3://{s3_bucket}/{file_key}"
    write_dynamic_frame_to_s3(
        glue_context=glue_context,
        dyf=sample_dynamicframe,
        destination_s3_path=destination_s3_path,
    )
    actual_s3_objects = get_s3_objects(s3_bucket=s3_bucket, prefix=file_key)

    assert len(actual_s3_objects) > 0
    assert any([object for object in actual_s3_objects if object.endswith(".parquet")])

LocalStackのS3 Bucketを使用して、S3とデータのやり取りを行う関数をテストします。

Glue Jobのコンテナ内でpytestコマンドを実行します。

$ pytest tests

参考