Glue Jobのローカル開発
AWS Glue Jobをローカル環境で開発する際、AWS公式が提供してるDocker imageを活用する方法があります。
Developing and testing AWS Glue job scripts locally
Glue Jobを利用する場合、S3からデータを取得・保存するユースケースが多いかと思います。
本記事では、ローカル環境にAWS環境をエミュレートするLocalStackを活用して、実際のAWSリソースへのデータをやり取りを行わずGlue Jobの動作検証・テストを行う方法を書きます。
Overview | Docs
Glue version 4.0のdocker imageであるamazon/aws-glue-libs:glue_libs_4.0.0_image_01
を使用します。
Glue versionごとにdocker imageが異なるので、ご注意ください。
本文中コード github.com
ディレクトリ構成
/ ├─ src | └─ glue_job.py ├─ tests │ └─ test_glue_job.py └─ compose.yaml
Glue Jobの実行スクリプト
import sys from typing import Dict from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions from awsglue.dynamicframe import DynamicFrame from pyspark.context import SparkContext S3_ENDPOINT_URL = "http://s3.dev:4566" AWS_REGION = "ap-northeast-1" S3_BUCKET = "test-job-bucket" def get_dynamic_frame_from_s3(glue_context: GlueContext, source_s3_path: str) -> DynamicFrame: dyf = glue_context.create_dynamic_frame.from_options( format_options={ "quoteChar": '"', "withHeader": True, "separator": ",", }, connection_type="s3", format="csv", connection_options={ "paths": [source_s3_path], "recurse": True, }, ) return dyf def write_dynamic_frame_to_s3(glue_context: GlueContext, dyf: DynamicFrame, destination_s3_path: str) -> None: glue_context.write_dynamic_frame.from_options( frame=dyf, connection_type="s3", connection_options={"path": destination_s3_path}, format="parquet", format_options={"writeHeader": True}, ) def main(args: Dict[str, str]) -> None: sc = SparkContext() if args["JOB_NAME"] == "test": sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", S3_ENDPOINT_URL) sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint.region", AWS_REGION) sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true") glue_context = GlueContext(sc) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = get_dynamic_frame_from_s3(glue_context=glue_context, source_s3_path=f"s3://{S3_BUCKET}/test_data.csv") write_dynamic_frame_to_s3(glue_context=glue_context, dyf=dyf, destination_s3_path=f"s3://{S3_BUCKET}/output") job.commit() if __name__ == "__main__": args = getResolvedOptions(sys.argv, ["JOB_NAME"]) main(args)
以下の処理を実行するGlue Jobのスクリプトを用意しました
- S3からcsvデータを取得
- S3にparquet形式のデータを保存
SparkContextにLocalStackでエミュレートしたS3にアクセスする設定を追加しています。
本番環境のGlueJobでは実際のAWSリソースにアクセスするため、以下の設定はローカル開発時のみ追加する必要があります。
実行引数のJOB_NAMEがtestの場合は、LocalStackへアクセスする設定を追加することでリソースの使い分けを行っています。
sc = SparkContext() if args["JOB_NAME"] == "test": sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", S3_ENDPOINT_URL) sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint.region", AWS_REGION) sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true") glue_context = GlueContext(sc)
docker composeの設定
services: glue.dev.s3.local: container_name: s3.dev image: localstack/localstack:3.8.0 environment: - SERVICES=s3 - AWS_DEFAULT_REGION=ap-northeast-1 - AWS_DEFAULT_OUTPUT=json - AWS_ACCESS_KEY_ID=test - AWS_SECRET_ACCESS_KEY=test networks: - glue.dev.network glue.dev: container_name: glue.dev image: amazon/aws-glue-libs:glue_libs_4.0.0_image_01 volumes: - ./:/home/glue_user/workspace/ environment: - DISABLE_SSL=true - AWS_REGION=ap-northeast-1 - AWS_OUTPUT=json - AWS_ACCESS_KEY_ID=test - AWS_SECRET_ACCESS_KEY=test networks: - glue.dev.network tty: true stdin_open: true networks: glue.dev.network: name: glue.dev.network
LocalStackでエミュレートしたAWS環境にGlue Jobのコンテナがアクセスできるように、Glue Jobのコンテナの環境変数に、LocalStackの起動設定で指定したAWS_ACCESS_KEY_IDとAWS_SECRET_ACCESS_KEYを追加します。
compose.yamlはこちらの実装を参考にさせてもらいました。
GitHub - n-yokota/aws_glue_test_concept
docker containerを起動します。
$ docker compose up -d
Glue Jobをローカル環境で実行する
LocalStackのS3 bucket準備
Glue Jobのコンテナ環境に入ります。
$ docker compose exec glue.dev bash
LocalStackのS3に test-job-bucket
Bucketを作成します。
$ aws s3 mb s3://test-job-bucket --endpoint-url http://s3.dev:4566
テスト用ファイルをLocalStackのS3 Bucketに追加します。
$ aws s3 mv ./data/test_data.csv s3://test-job-bucket/test_data.csv --endpoint-url http://s3.dev:4566
S3 Bucketにテスト用ファイルが保存されていることを確認できます。
$ aws s3api list-objects-v2 --bucket test-job-bucket --endpoint-url http://s3.dev:4566 { "Contents": [ { "LastModified": "2024-10-08T14:31:52.000Z", "ETag": "\"19ee3f2027cea3841e74c3aa3520b5ed\"", "StorageClass": "STANDARD", "Key": "test_data.csv", "Size": 100 } ] }
コンテナ環境でGlue Job実行
Glue Jobのスクリプトを通常のpyhonスクリプトとして実行します。
$ python3 src/glue_job.py --JOB_NAME test
対象のS3 Bucketにparquet形式でファイルが保存されていることを確認できます。
$ aws s3api list-objects-v2 --bucket test-job-bucket --endpoint-url http://s3.dev:4566 { "Contents": [ { "LastModified": "2024-10-08T14:32:23.000Z", "ETag": "\"fa768a3a4c9659604c161e45a17ec02f\"", "StorageClass": "STANDARD", "Key": "output/part-00000-3479d3db-5a89-4bd7-856c-fd714291c2f3-c000.snappy.parquet", "Size": 981 }, { "LastModified": "2024-10-08T14:31:52.000Z", "ETag": "\"19ee3f2027cea3841e74c3aa3520b5ed\"", "StorageClass": "STANDARD", "Key": "test_data.csv", "Size": 100 } ] }
LocalStackのS3を使用したGlue Jobのテスト実行方法
テスト用GlueContextのfixture作成
@pytest.fixture(scope="session") def glue_context() -> GlueContext: spark = ( SparkSession.builder.master("local[1]") # Configure for testing fast # https://kakehashi-dev.hatenablog.com/entry/2023/07/13/110000 .config("spark.sql.shuffle.partitions", "1") .config("spark.ui.showConsoleProgress", "false") .config("spark.ui.enabled", "false") .config("spark.ui.dagGraph.retainedRootRDD", "1") .config("spark.ui.retainedJobs", "1") .config("spark.ui.retainedStages", "1") .config("spark.ui.retainedTasks", "1") .config("spark.sql. ui.retainedExecutions", "1") .config("spark.worker.ui.retainedExecutors", "1") .config("spark.worker.ui.retainedDrivers", "1") .getOrCreate() ) # Configuration for localstack # https://future-architect.github.io/articles/20220428a spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint", S3_ENDPOINT_URL) spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint.region", AWS_REGION) spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true") spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.change.detection.mode", "None") spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.change.detection.version.required", "false") yield GlueContext(spark.sparkContext) spark.stop()
テスト実行速度を高速化するためにspark設定はこちらの記事を参考にしてます.
AWS GlueのCI/CD環境を作ってみた - KAKEHASHI Tech Blog
pytest実行時にLocalStackのS3へアクセスするための設定はこちらの記事を参考にしてます.
AWS Glueの開発環境の構築(2022) | フューチャー技術ブログ
テスト用S3 Bucketのfixture作成
@pytest.fixture(scope="session") def s3_client(): return boto3.client( "s3", endpoint_url=S3_ENDPOINT_URL, aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY, region_name=AWS_REGION, ) @pytest.fixture(scope="session") def s3_bucket(s3_client: boto3.client) -> str: bucket_name = "test-s3-bucket" try: s3_client.head_bucket(Bucket=bucket_name) except Exception: s3_client.create_bucket( Bucket=bucket_name, CreateBucketConfiguration={"LocationConstraint": AWS_REGION}, ) yield bucket_name try: s3_client.delete_bucket(Bucket=bucket_name) except Exception as e: print(f"Failed to clean up test bucket: {e}") @pytest.fixture(scope="session") def setup_s3_data(s3_client: boto3.client, s3_bucket: str) -> dict[str, str]: key = "test_data.csv" inputs = [ {"col1": "val1", "col2": 1, "col3": "2000/01/01 01:00:00"}, {"col1": "val2", "col2": 2, "col3": "2000/01/02 02:00:00"}, {"col1": "val3", "col2": 3, "col3": "2000/01/03 03:00:00"}, ] input_str = io.StringIO() w = csv.DictWriter(input_str, fieldnames=inputs[0].keys()) w.writeheader() for input in inputs: w.writerow(input) body = input_str.getvalue() s3_client.put_object(Bucket=s3_bucket, Key=key, Body=body) yield {"bucket_name": s3_bucket, "key": key} try: s3_client.delete_object(Bucket=s3_bucket, Key=key) except Exception as e: print(f"Failed to clean up test data: {e}") # https://docs.pytest.org/en/6.2.x/fixture.html#factories-as-fixtures @pytest.fixture def get_s3_objects(s3_client): def _get_s3_objects(s3_bucket: str, prefix: str) -> list[str] | None: try: response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=prefix) if "Contents" in response: return [obj["Key"] for obj in response["Contents"]] except Exception: return return _get_s3_objects @pytest.fixture(scope="module") def sample_dynamicframe(glue_context: GlueContext) -> DynamicFrame: spark = glue_context.spark_session df = spark.createDataFrame( [ ("val1", 1, "2000/01/01 01:00:00"), ("val2", 2, "2000/01/02 02:00:00"), ("val3", 3, "2000/01/03 03:00:00"), ], ["col1", "col2", "col3"], ) dyf = DynamicFrame.fromDF(df, glue_context, "dyf") return dyf
各関数の役割
s3_client
LocalStackのS3環境にアクセスするboto3 clientのfixture
s3_bucket
テスト用のS3 Bucketを作成・削除するfixture
setup_s3_data
テスト用のS3 Bucketにデータを追加・削除するfixture
get_s3_objects
テスト用のS3 Bucketに特定のキーに含まれるオブジェクト一覧を取得するヘルパー関数
sample_dynamicframe
テスト用DynamicFrameデータを作成するfixture
S3アクセスを伴う関数のテスト
def test_get_dynamic_frame_from_s3(glue_context: GlueContext, setup_s3_data: dict[str, str]) -> None: source_s3_path = f"s3://{setup_s3_data['bucket_name']}/{setup_s3_data['key']}" result = get_dynamic_frame_from_s3(glue_context=glue_context, source_s3_path=source_s3_path) assert isinstance(result, DynamicFrame) assert result.count() == 3 df = result.toDF() assert len(df.columns) == 3 assert df.columns == ["col1", "col2", "col3"] rows = df.collect() assert rows == [ Row(col1="val1", col2="1", col3="2000/01/01 01:00:00"), Row(col1="val2", col2="2", col3="2000/01/02 02:00:00"), Row(col1="val3", col2="3", col3="2000/01/03 03:00:00"), ] def test_write_dynamic_frame_from_s3( glue_context: GlueContext, s3_bucket, sample_dynamicframe: DynamicFrame, get_s3_objects, ) -> None: file_key = "test_write_data" destination_s3_path = f"s3://{s3_bucket}/{file_key}" write_dynamic_frame_to_s3( glue_context=glue_context, dyf=sample_dynamicframe, destination_s3_path=destination_s3_path, ) actual_s3_objects = get_s3_objects(s3_bucket=s3_bucket, prefix=file_key) assert len(actual_s3_objects) > 0 assert any([object for object in actual_s3_objects if object.endswith(".parquet")])
LocalStackのS3 Bucketを使用して、S3とデータのやり取りを行う関数をテストします。
Glue Jobのコンテナ内でpytestコマンドを実行します。
$ pytest tests