肉球でキーボード

MLエンジニアの技術ブログです

AWS Glueでicebergテーブルのスキーマを動的に変更する


本記事は datatech-jp Advent Calendar 2024 の13日目の記事です。

はじめに

Glue Catalogに登録したiceberg tableにAWS GlueJobでデータの書き込みを行うケースは多いかと思います。

通常、Glue Catalogのスキーマに登録されていないカラムを持つデータをGlue Jobで書き込もうとするとエラーとなります。

そのため、新しいカラムを持つデータを書き込む前に alter table add columns をAthenaなどで実行し、iceberg tableのスキーマ変更を行う必要があります。
ALTER TABLE ADD COLUMNS - Amazon Athena

Glue Jobの処理で新しくカラムを追加した場合や、元データにカラムが追加された場合に毎回上記の対応を行うのは運用コストが高く、操作ミスの危険もあります。

本記事では、扱うデータのカラムに応じて動的にiceberg tableのカラムを追加する方法をまとめます。

Glue Job実行のためのAWSリソース準備

AWS GlueJobを実行するための、S3 Bucket・IAM Role・Glue Database・Glue Jobを作成するTerraformを作成しました。

code-for-blogpost/glue_job_iceberg_schema_change/terraform at main · nsakki55/code-for-blogpost · GitHub

$ cd terraform 
$ terraform init
$ terraform apply

AWS Glue jobでiceberg tableに対する操作を有効化する場合、Glue JobのSpark Configurationを適切に設定する必要があります。
AWS Glue での Iceberg フレームワークの使用 - AWS Glue.

terraformのコードからglue jobのリソースを作成する部分を抜粋すると、以下のようになります。

resource "aws_glue_job" "update_iceberg_table_schema" {
  name              = "update_iceberg_table_schema"
  role_arn          = aws_iam_role.glue_job_role.arn
  glue_version      = "4.0"
  worker_type       = "G.1X"
  number_of_workers = 2
  max_retries       = 0
  execution_property {
    max_concurrent_runs = 10
  }

  command {
    script_location = "s3://${aws_s3_bucket.glue_job.bucket}/scripts/update_iceberg_table_schema.py"
  }

  default_arguments = {
    "--enable-glue-datacatalog"          = "true"
    "--TempDir"                          = "s3://${aws_s3_bucket.glue_job.bucket}/temporary/"
    "--spark-event-logs-path"            = "s3://${aws_s3_bucket.glue_job.bucket}/sparkHistoryLogs/"
    "--enable-job-insights"              = "false"
    "--enable-continuous-cloudwatch-log" = "true"
    "--datalake-formats"                 = "iceberg"
    # conf to enable iceberg format. ref: https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-programming-etl-format-iceberg.html
    "--conf" = "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.warehouse=file:///tmp/spark-warehouse"
  }

Glue Jobで取り込んでicebergテーブルに書き込むためのサンプルデータを2つ用意しました。

https://github.com/nsakki55/code-for-blogpost/tree/main/glue_job_iceberg_schema_change/data

test_data.csv

col1 col2
aaa 1
bbb 2
ccc 3

test_data_new_column.csv

col2 col1 col3
444 ddd XXX
555 eee YYY
666 fff ZZZ

2つのテストデータの差は以下です

  • col1, col2のカラムの順序が異なる
  • col3という新しいカラムが追加されている

今回はS3 Bucketに2つのサンプルデータを配置して、Glue Jobで読み込む元データとします。

アップロード先のS3 Bucket名をterraform applyで生成したS3 Bucket名に変更して下さい。

$ aws s3 cp ./data/test_data.csv s3://schema-change-data-20241208085330834400000002/input/
$ aws s3 cp ./data/test_data_new_column.csv s3://schema-change-data-20241208085330834400000002/input/

icebergテーブルにデータを追加するGlueJobの作成

S3からcsvデータを読み込んだ後、以下の処理を行うGlue Jobを作成しました。

  • テーブルが存在しない場合
    • テーブルを作成
  • テーブルが存在する場合
    • テーブルにデータを追加
import sys
from typing import Dict

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

S3_BUCKET = "schema-change-data-20241208085330834400000002"
TABLE_NAME = "test_table"
DATABASE_NAME = "test_database"
CATALOG_NAME = "glue_catalog"

def get_dynamic_frame_from_s3(glue_context: GlueContext, source_s3_path: str) -> DynamicFrame:
    print(f"Start get dynamic frame from S3. {source_s3_path=}")
    dyf = glue_context.create_dynamic_frame.from_options(
        format_options={
            "withHeader": True,
            "separator": ",",
        },
        connection_type="s3",
        format="csv",
        connection_options={
            "paths": [source_s3_path],
        },
    )
    print(f"Finished get dynamic frame from S3. {dyf.count()=}")
    return dyf

def check_table_in_database(glue_context: GlueContext, database_name: str, table_name: str) -> bool:
    print(f"Start check table in database. {database_name=}, {table_name=}")
    tables = glue_context.spark_session.catalog.listTables(database_name)
    is_exist = table_name in [table.name for table in tables]
    print(f"Finished check table in database. {is_exist=}")
    return is_exist

def main(args: Dict[str, str]) -> None:
    sc = SparkContext()
    glue_context = GlueContext(sc)
    job = Job(glue_context)
    job.init(args["JOB_NAME"], args)
    print(f"Start update iceberg table schema. {args=}")

    dyf = get_dynamic_frame_from_s3(
        glue_context=glue_context,
        source_s3_path=f"s3://{S3_BUCKET}/input/{args['file_name']}",
    )
    df = dyf.toDF()
    df.printSchema()

    is_exist = check_table_in_database(glue_context=glue_context, database_name=DATABASE_NAME, table_name=TABLE_NAME)

    table_path = f"{CATALOG_NAME}.{DATABASE_NAME}.{TABLE_NAME}"
    if is_exist:
        df.writeTo(table_path).append()
    else:
        df.writeTo(table_path).tableProperty("format-version", "2").tableProperty("location", f"s3://{S3_BUCKET}/output").create()

    print(f"Finished update iceberg table schema. {args=}")
    job.commit()

if __name__ == "__main__":

    args = getResolvedOptions(sys.argv, ["JOB_NAME", "file_name"])
    main(args)

Glue Jobの実行スクリプトをS3にアップロードします。S3 Bucketをterraform applyで作成したS3 Bucket名に変更して下さい。

$ aws s3 cp ./src/update_iceberg_table_schema.py s3://glue-job-20241208085330834300000001/scripts/ 

1つ目のサンプルデータであるtest_data.csvを読み込むGlue Jobを実行します。

$ aws glue start-job-run \
    --job-name update_iceberg_table_schema \
    --arguments '{"--file_name": "test_data.csv"}'

Glue Catalogのスキーマを見ると、col1, col2が追加されています

glue catalog

icebergテーブルのスキーマを変更する

icebergでは動的にスキーマ変化できる、mergeSchemaというオプションが提供されています。
Writes - Apache Iceberg™.

mergeSchemaオプションを使用する場合、以下の挙動となります。

  • 新しいカラムがデータソースに存在するが、対象のテーブルにカラムが存在しない場合

    → 新しいカラムが対象のテーブルに追加される。既存のレコードの新しいカラムにはnullが設定される。

  • データソースにカラムが存在しないが、対象のテーブルにはカラムが存在する場合

    → 新しいレコードを追加・レコードを更新すると、対象のカラムにnullが設定される。

mergeSchemaオプションを使用するには、対象のテーブルのプロパティ設定に 'write.spark.accept-any-schema'='true' を追加する必要があります。

ALTER TABLE test_database.test_table SET TBLPROPERTIES (
  'write.spark.accept-any-schema'='true'
)

上記のプロパティ追加のクエリをAthena経由で実行すると、サポートされていないプロパティエラーがでます。

Unsupported table property key: write.spark.accept-any-schema

Athenaでは変更可能なicebergテーブルのプロパティに制限があります。
Create Iceberg tables - Amazon Athena

そのため、上記のプロパティ変更をAthena経由では行えません。

AWSが出してるicebergテーブルに関する記事で、'write.spark.accept-any-schema'='true' のプロパティ設定をspark経由で実行してるのを確認できます。
Modernize your legacy databases with AWS data lakes, Part 2: Build a data lake using AWS DMS data on Apache Iceberg | AWS Big Data Blog

本記事ではこの方法に習い、spark経由で必要なプロパティ設定を行います。

既存テーブルへのデータ追加の実装を以下のように変更しました。

sql = f"ALTER TABLE {table_path} SET TBLPROPERTIES ('write.spark.accept-any-schema' = 'true')"
glue_context.spark_session.sql(sql)
df.writeTo(table_path).option("mergeSchema","true").append()

新しいカラムを持つcsvデータを読み込むGlueJobを実行します

$ aws glue start-job-run \
    --job-name update_iceberg_table_schema \
    --arguments '{"--file_name": "test_data_new_column.csv"}'

しかし、上記の設定のGlueJobは失敗します。

原因はcol1, col2のカラムの順番がテーブルスキーマと元データで異なるためです。

2024-12-09 01:21:18,419 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(77)): Error from Python:Traceback (most recent call last):
  File "/tmp/update_iceberg_table_schema.py", line 74, in <module>
    main(args)
  File "/tmp/update_iceberg_table_schema.py", line 63, in main
    df.writeTo(table_path).option("mergeSchema","true").append()
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1460, in append
    self._jwriter.append()
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
    raise converted from None
pyspark.sql.utils.IllegalArgumentException: Cannot write incompatible dataset to table with schema:
table {
  1: col1: optional string
  2: col2: optional string
  3: col3: optional string
}
Provided schema:
table {
  2: col2: optional string
  1: col1: optional string
  3: col3: optional string
}
Problems:
* col2 is out of order, before col1

icebergのspark optionには check-ordering という設定があります。

これは入力スキーマとテーブルスキーマが同じかチェックする設定で、デフォルトではTrueとなります。

icebergのGitHubレポジトリのmergeSchemaに関するissueで議論されているように、スキーマ変更するためにcheck-ordering をFalseにする必要があります。
Adding new columns (mergeSchema) · Issue #8908 · apache/iceberg · GitHub

GlueJobのリソースを作成するterraform設定に、以下のconfigを追加します。

--conf spark.sql.iceberg.check-ordering=false

GlueJobの変更を反映します

$ terraform apply

新しいカラムを持つcsvデータを読み込むGlue Jobを再び実行します。今度はJobが成功します。

$ aws glue start-job-run \
    --job-name update_iceberg_table_schema \
    --arguments '{"--file_name": "test_data_new_column.csv"}'

Glue Catalogのスキーマを確認すると、スキーマが変更されcol3が追加されているのを確認できます。

glue catalog with new column

test_tableの中身を確認してみると、既存のレコードには新しく追加されたcol3のカラムにnullが設定されています。

test_table

GlueJobから動的にiceberg tableのスキーマを変更できることができるようになりました。

参考