本記事は datatech-jp Advent Calendar 2024 の13日目の記事です。
はじめに
Glue Catalogに登録したiceberg tableにAWS GlueJobでデータの書き込みを行うケースは多いかと思います。
通常、Glue Catalogのスキーマに登録されていないカラムを持つデータをGlue Jobで書き込もうとするとエラーとなります。
そのため、新しいカラムを持つデータを書き込む前に alter table add columns
をAthenaなどで実行し、iceberg tableのスキーマ変更を行う必要があります。
ALTER TABLE ADD COLUMNS - Amazon Athena
Glue Jobの処理で新しくカラムを追加した場合や、元データにカラムが追加された場合に毎回上記の対応を行うのは運用コストが高く、操作ミスの危険もあります。
本記事では、扱うデータのカラムに応じて動的にiceberg tableのカラムを追加する方法をまとめます。
Glue Job実行のためのAWSリソース準備
AWS GlueJobを実行するための、S3 Bucket・IAM Role・Glue Database・Glue Jobを作成するTerraformを作成しました。
$ cd terraform $ terraform init $ terraform apply
AWS Glue jobでiceberg tableに対する操作を有効化する場合、Glue JobのSpark Configurationを適切に設定する必要があります。
AWS Glue での Iceberg フレームワークの使用 - AWS Glue.
terraformのコードからglue jobのリソースを作成する部分を抜粋すると、以下のようになります。
resource "aws_glue_job" "update_iceberg_table_schema" { name = "update_iceberg_table_schema" role_arn = aws_iam_role.glue_job_role.arn glue_version = "4.0" worker_type = "G.1X" number_of_workers = 2 max_retries = 0 execution_property { max_concurrent_runs = 10 } command { script_location = "s3://${aws_s3_bucket.glue_job.bucket}/scripts/update_iceberg_table_schema.py" } default_arguments = { "--enable-glue-datacatalog" = "true" "--TempDir" = "s3://${aws_s3_bucket.glue_job.bucket}/temporary/" "--spark-event-logs-path" = "s3://${aws_s3_bucket.glue_job.bucket}/sparkHistoryLogs/" "--enable-job-insights" = "false" "--enable-continuous-cloudwatch-log" = "true" "--datalake-formats" = "iceberg" # conf to enable iceberg format. ref: https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-programming-etl-format-iceberg.html "--conf" = "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.warehouse=file:///tmp/spark-warehouse" }
Glue Jobで取り込んでicebergテーブルに書き込むためのサンプルデータを2つ用意しました。
https://github.com/nsakki55/code-for-blogpost/tree/main/glue_job_iceberg_schema_change/data
test_data.csv
col1 | col2 |
---|---|
aaa | 1 |
bbb | 2 |
ccc | 3 |
test_data_new_column.csv
col2 | col1 | col3 |
---|---|---|
444 | ddd | XXX |
555 | eee | YYY |
666 | fff | ZZZ |
2つのテストデータの差は以下です
- col1, col2のカラムの順序が異なる
- col3という新しいカラムが追加されている
今回はS3 Bucketに2つのサンプルデータを配置して、Glue Jobで読み込む元データとします。
アップロード先のS3 Bucket名をterraform applyで生成したS3 Bucket名に変更して下さい。
$ aws s3 cp ./data/test_data.csv s3://schema-change-data-20241208085330834400000002/input/ $ aws s3 cp ./data/test_data_new_column.csv s3://schema-change-data-20241208085330834400000002/input/
icebergテーブルにデータを追加するGlueJobの作成
S3からcsvデータを読み込んだ後、以下の処理を行うGlue Jobを作成しました。
- テーブルが存在しない場合
- テーブルを作成
- テーブルが存在する場合
- テーブルにデータを追加
import sys from typing import Dict from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions from awsglue.dynamicframe import DynamicFrame from pyspark.context import SparkContext S3_BUCKET = "schema-change-data-20241208085330834400000002" TABLE_NAME = "test_table" DATABASE_NAME = "test_database" CATALOG_NAME = "glue_catalog" def get_dynamic_frame_from_s3(glue_context: GlueContext, source_s3_path: str) -> DynamicFrame: print(f"Start get dynamic frame from S3. {source_s3_path=}") dyf = glue_context.create_dynamic_frame.from_options( format_options={ "withHeader": True, "separator": ",", }, connection_type="s3", format="csv", connection_options={ "paths": [source_s3_path], }, ) print(f"Finished get dynamic frame from S3. {dyf.count()=}") return dyf def check_table_in_database(glue_context: GlueContext, database_name: str, table_name: str) -> bool: print(f"Start check table in database. {database_name=}, {table_name=}") tables = glue_context.spark_session.catalog.listTables(database_name) is_exist = table_name in [table.name for table in tables] print(f"Finished check table in database. {is_exist=}") return is_exist def main(args: Dict[str, str]) -> None: sc = SparkContext() glue_context = GlueContext(sc) job = Job(glue_context) job.init(args["JOB_NAME"], args) print(f"Start update iceberg table schema. {args=}") dyf = get_dynamic_frame_from_s3( glue_context=glue_context, source_s3_path=f"s3://{S3_BUCKET}/input/{args['file_name']}", ) df = dyf.toDF() df.printSchema() is_exist = check_table_in_database(glue_context=glue_context, database_name=DATABASE_NAME, table_name=TABLE_NAME) table_path = f"{CATALOG_NAME}.{DATABASE_NAME}.{TABLE_NAME}" if is_exist: df.writeTo(table_path).append() else: df.writeTo(table_path).tableProperty("format-version", "2").tableProperty("location", f"s3://{S3_BUCKET}/output").create() print(f"Finished update iceberg table schema. {args=}") job.commit() if __name__ == "__main__": args = getResolvedOptions(sys.argv, ["JOB_NAME", "file_name"]) main(args)
Glue Jobの実行スクリプトをS3にアップロードします。S3 Bucketをterraform applyで作成したS3 Bucket名に変更して下さい。
$ aws s3 cp ./src/update_iceberg_table_schema.py s3://glue-job-20241208085330834300000001/scripts/
1つ目のサンプルデータであるtest_data.csvを読み込むGlue Jobを実行します。
$ aws glue start-job-run \ --job-name update_iceberg_table_schema \ --arguments '{"--file_name": "test_data.csv"}'
Glue Catalogのスキーマを見ると、col1, col2が追加されています
icebergテーブルのスキーマを変更する
icebergでは動的にスキーマ変化できる、mergeSchemaというオプションが提供されています。
Writes - Apache Iceberg™.
mergeSchemaオプションを使用する場合、以下の挙動となります。
新しいカラムがデータソースに存在するが、対象のテーブルにカラムが存在しない場合
→ 新しいカラムが対象のテーブルに追加される。既存のレコードの新しいカラムにはnullが設定される。
データソースにカラムが存在しないが、対象のテーブルにはカラムが存在する場合
→ 新しいレコードを追加・レコードを更新すると、対象のカラムにnullが設定される。
mergeSchemaオプションを使用するには、対象のテーブルのプロパティ設定に 'write.spark.accept-any-schema'='true'
を追加する必要があります。
ALTER TABLE test_database.test_table SET TBLPROPERTIES ( 'write.spark.accept-any-schema'='true' )
上記のプロパティ追加のクエリをAthena経由で実行すると、サポートされていないプロパティエラーがでます。
Unsupported table property key: write.spark.accept-any-schema
Athenaでは変更可能なicebergテーブルのプロパティに制限があります。
Create Iceberg tables - Amazon Athena
そのため、上記のプロパティ変更をAthena経由では行えません。
AWSが出してるicebergテーブルに関する記事で、'write.spark.accept-any-schema'='true'
のプロパティ設定をspark経由で実行してるのを確認できます。
Modernize your legacy databases with AWS data lakes, Part 2: Build a data lake using AWS DMS data on Apache Iceberg | AWS Big Data Blog
本記事ではこの方法に習い、spark経由で必要なプロパティ設定を行います。
既存テーブルへのデータ追加の実装を以下のように変更しました。
sql = f"ALTER TABLE {table_path} SET TBLPROPERTIES ('write.spark.accept-any-schema' = 'true')" glue_context.spark_session.sql(sql) df.writeTo(table_path).option("mergeSchema","true").append()
新しいカラムを持つcsvデータを読み込むGlueJobを実行します
$ aws glue start-job-run \ --job-name update_iceberg_table_schema \ --arguments '{"--file_name": "test_data_new_column.csv"}'
しかし、上記の設定のGlueJobは失敗します。
原因はcol1, col2のカラムの順番がテーブルスキーマと元データで異なるためです。
2024-12-09 01:21:18,419 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(77)): Error from Python:Traceback (most recent call last): File "/tmp/update_iceberg_table_schema.py", line 74, in <module> main(args) File "/tmp/update_iceberg_table_schema.py", line 63, in main df.writeTo(table_path).option("mergeSchema","true").append() File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1460, in append self._jwriter.append() File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__ return_value = get_return_value( File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco raise converted from None pyspark.sql.utils.IllegalArgumentException: Cannot write incompatible dataset to table with schema: table { 1: col1: optional string 2: col2: optional string 3: col3: optional string } Provided schema: table { 2: col2: optional string 1: col1: optional string 3: col3: optional string } Problems: * col2 is out of order, before col1
icebergのspark optionには check-ordering
という設定があります。
これは入力スキーマとテーブルスキーマが同じかチェックする設定で、デフォルトではTrueとなります。
icebergのGitHubレポジトリのmergeSchemaに関するissueで議論されているように、スキーマ変更するためにcheck-ordering
をFalseにする必要があります。
Adding new columns (mergeSchema) · Issue #8908 · apache/iceberg · GitHub
GlueJobのリソースを作成するterraform設定に、以下のconfigを追加します。
--conf spark.sql.iceberg.check-ordering=false
GlueJobの変更を反映します
$ terraform apply
新しいカラムを持つcsvデータを読み込むGlue Jobを再び実行します。今度はJobが成功します。
$ aws glue start-job-run \ --job-name update_iceberg_table_schema \ --arguments '{"--file_name": "test_data_new_column.csv"}'
Glue Catalogのスキーマを確認すると、スキーマが変更されcol3が追加されているのを確認できます。
test_tableの中身を確認してみると、既存のレコードには新しく追加されたcol3のカラムにnullが設定されています。
GlueJobから動的にiceberg tableのスキーマを変更できることができるようになりました。