
AWS Glue Jobによるicebergテーブル操作
AWS Glueではicebergテーブルフォーマットがサポートされています。
AWS Glue での Iceberg フレームワークの使用 - AWS Glue
iceberg形式は大規模なデータセットを効率的に処理を行うことができる、データレイクに保存されてるデータ処理に向いたテーブル形式です。
iceberg形式の主要な特徴として以下が挙げられます
Apache Iceberg - Apache Iceberg™

icebergテーブルは主に3つの要素で構成されます
Glue Jobでiceberg形式のテーブルへの処理を行う際、実際のAWSリソースへのアクセスを行わずに動作確認したい場合は2つの方法が検討できます
- LocalStackのAWS Glueを使用 (Pro版のみ使用可 Glue | Docs)
- ローカル環境に作成したicebergテーブルを使用
今回は2つ目のローカル環境で作成したicebergテーブルを使用する方法を試してみようと思います。
本文中コード
github.com
テスト対象のGlue Job
import sys from typing import Dict from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions from awsglue.dynamicframe import DynamicFrame from pyspark.context import SparkContext from pyspark.sql import DataFrame S3_ENDPOINT_URL = "http://s3.dev:4566" S3_BUCKET = "test-job-bucket" TABLE_NAME = "test_table" DATABASE_NAME = "test_database" def get_dynamic_frame_from_s3(glue_context: GlueContext, source_s3_path: str) -> DynamicFrame: dyf = glue_context.create_dynamic_frame.from_options( format_options={ "quoteChar": '"', "withHeader": True, "separator": ",", }, connection_type="s3", format="csv", connection_options={ "paths": [source_s3_path], "recurse": True, }, ) return dyf def check_table_in_database(glue_context: GlueContext, database_name: str, table_name: str) -> bool: tables_collection = glue_context.spark_session.catalog.listTables(database_name) return table_name in [table.name for table in tables_collection] def append_iceberg_table(df: DataFrame, table_name: str, table_location) -> None: df.writeTo(table_name).tableProperty("format-version", "2").tableProperty("location", table_location).append() def create_iceberg_table(df: DataFrame, table_name: str, table_location) -> None: df.writeTo(table_name).tableProperty("format-version", "2").tableProperty("location", table_location).create() def main(args: Dict[str, str]) -> None: sc = SparkContext() glue_context = GlueContext(sc) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = get_dynamic_frame_from_s3(glue_context=glue_context, source_s3_path=f"s3://{S3_BUCKET}/input") df = dyf.toDF() is_exist = check_table_in_database(glue_context=glue_context, database_name=DATABASE_NAME, table_name=TABLE_NAME) if is_exist: append_iceberg_table(df, TABLE_NAME, f"s3://{S3_BUCKET}/output") else: create_iceberg_table(df, TABLE_NAME, f"s3://{S3_BUCKET}/output") job.commit() if __name__ == "__main__": args = getResolvedOptions(sys.argv, ["JOB_NAME"]) main(args)
以下の処理を実行するGlueJobのスクリプトです。
- S3からcsvデータを読み込み
- テーブルが存在する場合→既存テーブルにデータを追加
- テーブルが存在しない場合→新規テーブルを作成
ローカルのS3リソースを利用してテストする方法をこちらの記事で紹介してるので、S3からのデータ読み込み部分は本記事では割愛します。
LocalStackのS3環境を利用したAWS Glue Jobローカル実行・テスト方法 - 肉球でキーボード
テスト用Glue Contextの設定
from awsglue.context import GlueContext from pyspark.sql import SparkSession WAREHOUSE_PATH = "./spark-warehouse" @pytest.fixture(scope="session") def glue_context() -> GlueContext: spark = ( SparkSession.builder.master("local[1]") # Configure for testing iceberg table operation .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") .config("spark.sql.catalog.local.type", "hadoop") .config("spark.sql.catalog.local.warehouse", WAREHOUSE_PATH) .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.0") .config("spark.sql.catalog.local.default.write.metadata-flush-after-create", "true") .config("spark.sql.defaultCatalog", "local") # Configure for testing local hive metastore .config("spark.sql.hive.metastore.jars", "builtin") .config("spark.sql.hive.metastore.version", "2.3.9") .config("spark.sql.catalogImplementation", "in-memory") .getOrCreate() ) yield GlueContext(spark.sparkContext) spark.stop()
ローカル環境で作成したicebergテーブルにアクセスする設定を追加してます。
設定はこちらの記事を参考にしています。
ローカル開発用のGlueの公式imageには以下の内容の/home/glue_user/spark/conf/hive-site.xml が存在してます。
調べてみるとhive-site.xmlはDBの接続情報を管理するファイルらしく、デフォルトではこちらの設定を読み込んで、AWS Glue Catalogのメタデータにアクセスする挙動となります。
<configuration> <property> <name>hive.metastore.connect.retries</name> <value>15</value> </property> <property> <name>hive.metastore.client.factory.class</name> <value>com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory</value> </property> </configuration>
pytestコード
@pytest.fixture(scope="session") def cleanup_warehouse() -> None: yield if os.path.exists(WAREHOUSE_PATH): shutil.rmtree(WAREHOUSE_PATH) @pytest.fixture(scope="module") def test_table(glue_context: GlueContext, sample_dataframe: DataFrame) -> str: spark = glue_context.spark_session table_name = "test_table" try: sample_dataframe.writeTo(f"local.{table_name}").create() yield table_name finally: spark.sql(f"DROP TABLE IF EXISTS local.{table_name}") def test_create_iceberg_table(glue_context: GlueContext, cleanup_warehouse: None, sample_dataframe: DataFrame) -> None: spark = glue_context.spark_session # new table setting table_name = "test_new_table" table_full_name = f"local.{table_name}" table_location = f"{WAREHOUSE_PATH}/{table_name}" create_iceberg_table(df=sample_dataframe, table_name=table_full_name, table_location=table_location) result_df = spark.table(table_full_name) assert result_df.collect() == sample_dataframe.collect() def test_append_iceberg_table( glue_context: GlueContext, cleanup_warehouse: None, sample_dataframe: DataFrame, test_table: str ) -> None: spark = glue_context.spark_session table_full_name = f"local.{test_table}" table_location = f"{WAREHOUSE_PATH}/{test_table}" append_data = [ ("val4", 4, "2000/01/04 04:00:00"), ("val5", 5, "2000/01/05 05:00:00"), ] append_df = spark.createDataFrame(append_data, sample_dataframe.schema) append_iceberg_table(df=append_df, table_name=table_full_name, table_location=table_location) result_df = spark.table(table_full_name) new_df = result_df.filter(~col("col1").isin(["val1", "val2", "val3"])) assert new_df.collect() == append_df.collect() assert result_df.count() == 5
以下のフィクスチャを利用しています。
- cleanup_warehouse
- ローカルのテーブルを永続化するフォルダの作成・削除
 
- test_table
- テスト用テーブルの作成・削除
 
外部のデータストアにアクセスせず、テーブルの新規作成・データ追加のテストができているのを確認できます。