肉球でキーボード

MLエンジニアの技術ブログです

GlueJobのicebergテーブル処理テストをローカルで実行する

AWS Glue Jobによるicebergテーブル操作

AWS Glueではicebergテーブルフォーマットがサポートされています。
AWS Glue での Iceberg フレームワークの使用 - AWS Glue

iceberg形式は大規模なデータセットを効率的に処理を行うことができる、データレイクに保存されてるデータ処理に向いたテーブル形式です。

iceberg形式の主要な特徴として以下が挙げられます
Apache Iceberg - Apache Iceberg™

  • SQL操作
    データの追加・更新・削除をSQLコマンドで実行可能

  • スキーマ進化
    テーブルスキーマの柔軟な変更が可能

  • 隠れパーティショニング
    人手管理が不要なパーティションを自動で作成

  • タイムトラベルとロールバック
    特定時刻のテーブル状態の保持と切り替え

  • データ圧縮

architecture
引用: Spec - Apache Iceberg™
icebergテーブルは主に3つの要素で構成されます

Glue Jobでiceberg形式のテーブルへの処理を行う際、実際のAWSリソースへのアクセスを行わずに動作確認したい場合は2つの方法が検討できます

  • LocalStackのAWS Glueを使用 (Pro版のみ使用可 Glue | Docs)
  • ローカル環境に作成したicebergテーブルを使用

今回は2つ目のローカル環境で作成したicebergテーブルを使用する方法を試してみようと思います。

本文中コード
github.com

テスト対象のGlue Job

import sys
from typing import Dict

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

from pyspark.sql import DataFrame

S3_ENDPOINT_URL = "http://s3.dev:4566"
S3_BUCKET = "test-job-bucket"
TABLE_NAME = "test_table"
DATABASE_NAME = "test_database"

def get_dynamic_frame_from_s3(glue_context: GlueContext, source_s3_path: str) -> DynamicFrame:
    dyf = glue_context.create_dynamic_frame.from_options(
        format_options={
            "quoteChar": '"',
            "withHeader": True,
            "separator": ",",
        },
        connection_type="s3",
        format="csv",
        connection_options={
            "paths": [source_s3_path],
            "recurse": True,
        },
    )
    return dyf

def check_table_in_database(glue_context: GlueContext, database_name: str, table_name: str) -> bool:
    tables_collection = glue_context.spark_session.catalog.listTables(database_name)
    return table_name in [table.name for table in tables_collection]

def append_iceberg_table(df: DataFrame, table_name: str, table_location) -> None:
    df.writeTo(table_name).tableProperty("format-version", "2").tableProperty("location", table_location).append()

def create_iceberg_table(df: DataFrame, table_name: str, table_location) -> None:
    df.writeTo(table_name).tableProperty("format-version", "2").tableProperty("location", table_location).create()

def main(args: Dict[str, str]) -> None:
    sc = SparkContext()
    glue_context = GlueContext(sc)

    job = Job(glue_context)
    job.init(args["JOB_NAME"], args)

    dyf = get_dynamic_frame_from_s3(glue_context=glue_context, source_s3_path=f"s3://{S3_BUCKET}/input")
    df = dyf.toDF()
    is_exist = check_table_in_database(glue_context=glue_context, database_name=DATABASE_NAME, table_name=TABLE_NAME)
    if is_exist:
        append_iceberg_table(df, TABLE_NAME, f"s3://{S3_BUCKET}/output")
    else:
        create_iceberg_table(df, TABLE_NAME, f"s3://{S3_BUCKET}/output")

    job.commit()

if __name__ == "__main__":
    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
    main(args)

以下の処理を実行するGlueJobのスクリプトです。

  • S3からcsvデータを読み込み
  • テーブルが存在する場合→既存テーブルにデータを追加
  • テーブルが存在しない場合→新規テーブルを作成

ローカルのS3リソースを利用してテストする方法をこちらの記事で紹介してるので、S3からのデータ読み込み部分は本記事では割愛します。

LocalStackのS3環境を利用したAWS Glue Jobローカル実行・テスト方法 - 肉球でキーボード

テスト用Glue Contextの設定

from awsglue.context import GlueContext
from pyspark.sql import SparkSession

WAREHOUSE_PATH = "./spark-warehouse"

@pytest.fixture(scope="session")
def glue_context() -> GlueContext:
    spark = (
        SparkSession.builder.master("local[1]")
        # Configure for testing iceberg table operation
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.local.type", "hadoop")
        .config("spark.sql.catalog.local.warehouse", WAREHOUSE_PATH)
        .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.0")
        .config("spark.sql.catalog.local.default.write.metadata-flush-after-create", "true")
        .config("spark.sql.defaultCatalog", "local")
        # Configure for testing local hive metastore
        .config("spark.sql.hive.metastore.jars", "builtin")
        .config("spark.sql.hive.metastore.version", "2.3.9")
        .config("spark.sql.catalogImplementation", "in-memory")
        .getOrCreate()
    )

    yield GlueContext(spark.sparkContext)
    spark.stop()

ローカル環境で作成したicebergテーブルにアクセスする設定を追加してます。

設定はこちらの記事を参考にしています。

Navigating the Iceberg: unit testing iceberg tables with Pyspark | by Hannes De Smet | datamindedbe | Medium

ローカル開発用のGlueの公式imageには以下の内容の/home/glue_user/spark/conf/hive-site.xml が存在してます。

調べてみるとhive-site.xmlはDBの接続情報を管理するファイルらしく、デフォルトではこちらの設定を読み込んで、AWS Glue Catalogのメタデータにアクセスする挙動となります。

<configuration>
    <property>
        <name>hive.metastore.connect.retries</name>
        <value>15</value>
    </property>
    <property>
        <name>hive.metastore.client.factory.class</name>
        <value>com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory</value>
    </property>
</configuration>

pytestコード

@pytest.fixture(scope="session")
def cleanup_warehouse() -> None:
    yield
    if os.path.exists(WAREHOUSE_PATH):
        shutil.rmtree(WAREHOUSE_PATH)

@pytest.fixture(scope="module")
def test_table(glue_context: GlueContext, sample_dataframe: DataFrame) -> str:
    spark = glue_context.spark_session
    table_name = "test_table"
    try:
        sample_dataframe.writeTo(f"local.{table_name}").create()
        yield table_name
    finally:
        spark.sql(f"DROP TABLE IF EXISTS local.{table_name}")

def test_create_iceberg_table(glue_context: GlueContext, cleanup_warehouse: None, sample_dataframe: DataFrame) -> None:
    spark = glue_context.spark_session

    # new table setting
    table_name = "test_new_table"
    table_full_name = f"local.{table_name}"
    table_location = f"{WAREHOUSE_PATH}/{table_name}"

    create_iceberg_table(df=sample_dataframe, table_name=table_full_name, table_location=table_location)

    result_df = spark.table(table_full_name)
    assert result_df.collect() == sample_dataframe.collect()

def test_append_iceberg_table(
    glue_context: GlueContext, cleanup_warehouse: None, sample_dataframe: DataFrame, test_table: str
) -> None:
    spark = glue_context.spark_session

    table_full_name = f"local.{test_table}"
    table_location = f"{WAREHOUSE_PATH}/{test_table}"

    append_data = [
        ("val4", 4, "2000/01/04 04:00:00"),
        ("val5", 5, "2000/01/05 05:00:00"),
    ]
    append_df = spark.createDataFrame(append_data, sample_dataframe.schema)

    append_iceberg_table(df=append_df, table_name=table_full_name, table_location=table_location)

    result_df = spark.table(table_full_name)
    new_df = result_df.filter(~col("col1").isin(["val1", "val2", "val3"]))
    assert new_df.collect() == append_df.collect()
    assert result_df.count() == 5

以下のフィクスチャを利用しています。

  • cleanup_warehouse
    • ローカルのテーブルを永続化するフォルダの作成・削除
  • test_table
    • テスト用テーブルの作成・削除

外部のデータストアにアクセスせず、テーブルの新規作成・データ追加のテストができているのを確認できます。

参考