肉球でキーボード

MLエンジニアの技術ブログです

Amazon Data Firehoseを用いたApache Icebergテーブルの更新

本記事のコードです。
github.com

Amazon Data Firehoseとは

概要

Amazon Data Firehoseはストリーミングデータの取得・変換・保存を行うマネージドサービスです。
例えば、Eコマースのシステムのデータ基盤を開発する場合、商品のクリックや購入のイベントログを、リアルタイムでデータ基盤に反映する要件が発生します。
このような要件を満たすには、アプリケーションのイベントデータを収集し、データ形式に合わせた変換処理を行い、分析テーブルへの保存をリアルタイムで行うデータパイプラインが必要となります。
アプリケーションのイベントデータのような、継続的にリアルタイムで生成・送信されるデータをストリーミングデータと呼びます。
ストリーミングデータとは - ストリーミングデータの説明 - AWS

様々なストリーミングデータを扱う必要がある場合、リアルタイムで処理を行うデータパイプラインを自前で実装するのは考慮すべき要素が多く開発・運用コストが高いです。
例えば、データ処理が詰まらないためのバッファリング機能や、データ処理エラー時のバックアップ機能、データ保存先が複数ある場合のルーティング機能などを考慮する必要があります。
Amazon Data Firehoseを利用することで、このようなストリーミングデータの取得・変換・保存に関わる一連の機能をAWSが提供してくれます。
Amazon Data Firehose とは何ですか? - Amazon Data Firehose
ストリーミングデータパイプライン – Amazon Data Firehose (Firehose) – AWS

Apache Icebergテーブルへのデータ配信

Amazon Data Firehoseは様々なデータ送信先をサポートしています。
例えば、Amazon S3Amazon RedshiftのようなAWSサービスから、Datadog、SnowflakeのようなSaaSサービスに対しても、Amazon Data Firehoseからデータを送信することができます。
これらのデータ送信先の中から、Amazon Data Firehoseは Apache Icebergテーブルをサポートしています。
AWSでデータレイクを開発する場合、Apache Iceberg形式で保存するのが候補として上がります。
Apache Iceberg とは? – Iceberg テーブルの説明 – AWS
Apache Iceberg - Apache Iceberg™

AWSApache Icebergテーブルを管理する場合、Amazon S3 Tablesで管理する方法と、セルフマネージド管理の2つの方法があります。
Amazon Data Firehoseはこれら2つの方法で管理されたApache Icebergテーブルへのデータ転送をサポートしています。
Amazon Data Firehose を使用して Apache Iceberg テーブルにデータを配信する - Amazon Data Firehose

また、Amazon Data Firehoseが取得したデータに応じて、動的にデータ保存先テーブルを変更することができます。

lambda関数によるデータ変換処理

Amazon Data Firehoseはストリーミングデータの取得・変換・保存を行うマネージドサービスです。
Amazon Data Firehoseは、データ変換にあたる処理をユーザーが実装したlambda関数で行うことができます。

Amazon Data Firehose でソースデータを変換する - Amazon Data Firehose

lambda関数のreturnデータ内容に応じて、動的にデータ保存先テーブルを変更できます。
以下のように、metadataセクションを追懐することで保存先テーブルを指定することができます。

 {
    "recordId": "49655962066601463032522589543535113056108699331451682818000000",
    "result": "Ok",
    "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1",
    "metadata":{
    "otfMetadata":{
            "destinationTableName":"Device1234",
            "destinationDatabaseName":"IoTevents",
            "operation":"insert"
        }
    }
  }

Route incoming records to different Iceberg tables - Amazon Data Firehose

構成

全体像

Architecture

本記事で作成するシステムの構成図です。
以下の処理を行います。

  • (データ取得) Amazon Data FirehoseにAmazon SESのイベントデータを送信
  • (データ変換) AWS Lambdaで保存先テーブルに合わせてイベントデータをデータ変換
  • (データ保存) AWS Glue Data Catalogに保存された Apache Icebergテーブルにレコードを追加

使用データ

本記事ではAWSが提供するメール配信サービスのAmazon SESのイベントデータを使用します。
Amazon SES が Firehose に発行するイベントデータの例 - Amazon Simple Email Service
3つのメール配信イベントのデータを使用します。

  • Send: メール送信
  • Open: メール開封
  • Click: メールクリック

Sendイベントログ例

{
  "eventType": "Send",
  "mail": {
    "timestamp": "2016-10-14T05:02:16.645Z",
    "source": "sender@example.com",
    "sourceArn": "arn:aws:ses:us-east-1:123456789012:identity/sender@example.com",
    "sendingAccountId": "123456789012",
    "messageId": "EXAMPLE7c191be45-e9aedb9a-02f9-4d12-a87d-dd0099a07f8a-000000",
    "destination": [
      "recipient@example.com"
    ],
    "headersTruncated": false,
    "headers": [
      {
        "name": "From",
        "value": "sender@example.com"
      },
      {
        "name": "To",
        "value": "recipient@example.com"
      },
      {
        "name": "Subject",
        "value": "Message sent from Amazon SES"
      },
      {
        "name": "MIME-Version",
        "value": "1.0"
      },
      {
        "name": "Content-Type",
        "value": "multipart/mixed;  boundary=\"----=_Part_0_716996660.1476421336341\""
      },
      {
        "name": "X-SES-MESSAGE-TAGS",
        "value": "myCustomTag1=myCustomTagValue1, myCustomTag2=myCustomTagValue2"
      }
    ],
    "commonHeaders": {
      "from": [
        "sender@example.com"
      ],
      "to": [
        "recipient@example.com"
      ],
      "messageId": "EXAMPLE7c191be45-e9aedb9a-02f9-4d12-a87d-dd0099a07f8a-000000",
      "subject": "Message sent from Amazon SES"
    },
    "tags": {
      "ses:configuration-set": [
        "ConfigSet"
      ],
      "ses:source-ip": [
        "192.0.2.0"
      ],
      "ses:from-domain": [
        "example.com"
      ],
      "ses:caller-identity": [
        "ses_user"
      ],
      "myCustomTag1": [
        "myCustomTagValue1"
      ],
      "myCustomTag2": [
        "myCustomTagValue2"
      ]
    }
  },
  "send": {}
}

それぞれのイベントデータを保存する、send_log, open_log, click_logという名前のApache Icebergテーブルを作成します。

実装

Apache Icebergテーブル作成

本記事では、AWS Glue Data Catalogでテーブル情報を管理して Apache Icebergテーブルを作成します。
以下は、AWS Glue Data Catalogのデータベースと、Apache Icebergテーブルファイルを保存するS3バケットを作成するTerraformコードです。

resource "aws_s3_bucket" "glue_data" {
  bucket_prefix = "glue-data-"
}

resource "aws_glue_catalog_database" "email_event" {
  name         = "email_event"
  location_uri = "s3a://${aws_s3_bucket.glue_data.bucket}/email_event/"
}

Amazon AthenaでApache Iceberg形式でテーブルを作成するSQLを実行します。テーブルファイルの場所を、作成したS3バケットを指定します。
以下は send_logを作成するSQLです。カラムはSESのSendイベントが持つ値を部分的に選択しています。

CREATE TABLE email_event.send_log (
    event_type string,
    message_id string,
    from_address string,
    to_address string,
    timestamp timestamp,
    tags string
)
LOCATION 's3://glue-data-20250924084554937700000001/email_event/send_log/'
TBLPROPERTIES (
    'table_type' = 'ICEBERG',
    'format' = 'parquet',
    'write_compression' = 'gzip'
);

AWSマネジメントコンソールのAWS Glueのサービスページから、send_log, open_log, click_logが作成されていることを確認します。

AWS Glue Data Catalog

データ処理用lambda関数作成

Amazon SESのSend, Open, Clickイベントデータを受け取り、Apache Icebergテーブルのレコードに変換するlambda関数の実装です。
公式ドキュメントに記載されている、Amazon Data Firehoseが取得したデータをlambda関数内で処理するサンプルコードを参考にしています。
Route incoming records to different Iceberg tables - Amazon Data Firehose

イベントデータに応じて、送信先テーブルを変更するようにしています。

import base64
import json

def lambda_handler(firehose_records_input, context):
    print(
        "Received records for processing from DeliveryStream: "
        + firehose_records_input["deliveryStreamArn"]
    )

    firehose_records_output = {}
    firehose_records_output["records"] = []

    for firehose_record_input in firehose_records_input["records"]:
        payload_bytes = base64.b64decode(firehose_record_input["data"]).decode("utf-8")
        payload_dict = json.loads(payload_bytes)

        converted_data_dict = {}
        destination_table_name = None

        # 共通イベントデータ
        event_type = payload_dict["eventType"]
        converted_data_dict["event_type"] = event_type
        converted_data_dict["message_id"] = payload_dict["mail"]["messageId"]
        converted_data_dict["tags"] = payload_dict["mail"]["tags"]
        headers = payload_dict["mail"]["headers"]
        for header in headers:
            match header["name"]:
                case "From":
                    converted_data_dict["from_address"] = header["value"]
                case "To":
                    converted_data_dict["to_address"] = header["value"]

        # Sendイベントデータ用処理
        if event_type == "Send":
            converted_data_dict["timestamp"] = payload_dict["mail"]["timestamp"]
            destination_table_name = "send_log"

        # Openイベントデータ用処理
        if event_type == "Open":
            converted_data_dict["ip_address"] = payload_dict["open"]["ipAddress"]
            converted_data_dict["user_agent"] = payload_dict["open"]["userAgent"]
            converted_data_dict["timestamp"] = payload_dict["open"]["timestamp"]
            destination_table_name = "open_log"

        # Clickイベントデータ用処理
        if event_type == "Click":
            converted_data_dict["ip_address"] = payload_dict["click"]["ipAddress"]
            converted_data_dict["link"] = payload_dict["click"]["link"]
            converted_data_dict["link_tags"] = payload_dict["click"]["linkTags"]
            converted_data_dict["user_agent"] = payload_dict["click"]["userAgent"]
            converted_data_dict["timestamp"] = payload_dict["click"]["timestamp"]
            destination_table_name = "click_log"

        converted_data_str = json.dumps(converted_data_dict)
        print("converted_data_str:", converted_data_str)

        # イベントごとに送信先テーブルを指定
        firehose_record_output = {}
        firehose_record_output["data"] = base64.b64encode(
            converted_data_str.encode("utf-8")
        )
        firehose_record_output["recordId"] = firehose_record_input["recordId"]
        firehose_record_output["result"] = "Ok"
        firehose_record_output["metadata"] = {
            "otfMetadata": {
                "destinationDatabaseName": "email_event",
                "destinationTableName": destination_table_name,
                "operation": "insert",
            }
        }
        firehose_records_output["records"].append(firehose_record_output)
        print(firehose_record_output)

    return firehose_records_output

データ変換用lambda関数のIAM Roleを作成するTerraformコードです。
lambda関数のログをCloud Watchに書き込むための権限を付与します。

resource "aws_cloudwatch_log_group" "firehose_processor_lambda" {
  name              = "/aws/lambda/firehose-processor"
  retention_in_days = 365
}

resource "aws_iam_role" "firehose_processor_lambda" {
  name = "firehose_processor_lambda_role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_policy" "firehose_processor_lambda" {
  name        = "firehose_processor_lambda_policy"
  description = "Policy for Firehose processor Lambda role"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = [
          "arn:aws:logs:${local.aws_region}:${local.aws_account_id}:log-group:${aws_cloudwatch_log_group.firehose_processor_lambda.name}:*"
        ]
      }
    ]
  })
}

lambda関数用スクリプトをzip化した後、lambda関数を作成するTerraformコードです。

data "archive_file" "firehose_processor_lambda" {
  type        = "zip"
  source_dir  = "../lambda/"
  output_path = "../lambda/firehose_processor.zip"
}

resource "aws_lambda_function" "firehose_processor_lambda" {
  function_name    = "firehose-processor-handler"
  role             = aws_iam_role.firehose_processor_lambda.arn
  filename         = data.archive_file.firehose_processor_lambda.output_path
  handler          = "firehose_processor.lambda_handler"
  runtime          = "python3.12"
  timeout          = 70
  source_code_hash = filebase64sha256(data.archive_file.firehose_processor_lambda.output_path)

  logging_config {
    log_group  = aws_cloudwatch_log_group.firehose_processor_lambda.name
    log_format = "Text"
  }

  depends_on = [
    aws_cloudwatch_log_group.firehose_processor_lambda,
    data.archive_file.firehose_processor_lambda,
  ]
}

resource "aws_lambda_alias" "firehose_processor_lambda_default" {
  name             = "default"
  description      = "Default version."
  function_name    = aws_lambda_function.firehose_processor_lambda.arn
  function_version = "1"
  lifecycle {
    ignore_changes = [function_version]
  }
}

Apache Icebergテーブルへのデータ送信用Amazon Data Firehose作成

Amazon Data Firehoseの IAM Roleを作成するTerraformコードです。
Lambda関数を起動した後、AWS Glue Data Catalogに登録したApache Icebergテーブルに書き込みを行うための権限を付与しています。

resource "aws_cloudwatch_log_group" "firehose_load_iceberg" {
  name              = "/aws/firehose/firehose-load-iceberg"
  retention_in_days = 365
}

resource "aws_cloudwatch_log_stream" "firehose_load_iceberg" {
  name           = "firehose-load-iceberg-log-stream"
  log_group_name = aws_cloudwatch_log_group.firehose_load_iceberg.name
}

resource "aws_iam_role" "firehose_load_iceberg" {
  name = "firehose_load_iceberg_role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "firehose.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_policy" "firehose_load_iceberg" {
  name        = "firehose_load_iceberg_policy"
  description = "Policy for Firehose load iceberg role"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:AbortMultipartUpload",
          "s3:GetBucketLocation",
          "s3:GetObject",
          "s3:ListBucket",
          "s3:ListBucketMultipartUploads",
          "s3:PutObject",
          "s3:DeleteObject"
        ]
        Resource = [
          "arn:aws:s3:::*",
          "arn:aws:s3:::*/*"
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = [
          "arn:aws:logs:${local.aws_region}:${local.aws_account_id}:log-group:${aws_cloudwatch_log_group.firehose_load_iceberg.name}:*",
          aws_cloudwatch_log_stream.firehose_load_iceberg.arn
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "glue:GetTable",
          "glue:GetDatabase",
          "glue:UpdateTable"
        ]
        Resource = [
          "arn:aws:glue:*:*:catalog",
          "arn:aws:glue:*:*:database/*",
          "arn:aws:glue:*:*:table/*/*"
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "kinesis:DescribeStream",
          "kinesis:GetShardIterator",
          "kinesis:GetRecords",
          "kinesis:ListShards"
        ]
        Resource = [
          "arn:aws:kinesis:*:*:stream/*"
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "lambda:InvokeFunction",
          "lambda:GetFunctionConfiguration"
        ]
        Resource = [
          "arn:aws:lambda:*:*:function:*:*"
        ]
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "firehose_load_iceberg" {
  role       = aws_iam_role.firehose_load_iceberg.name
  policy_arn = aws_iam_policy.firehose_load_iceberg.arn
}

エラーデータ保存用のS3バケットと、Amazon Data Firehoseを作成するTerraformコードです。
aws_kinesis_firehose_delivery_stream リソースでAmazon Data Firehoseを作成します。
Apache Icebergテーブルをデータ送信先とするため、iceberg_configurationの設定を行います。また、Amazon Data Firehoseで実行するlambda関数を紐づけています。

Terraform Registry

resource "aws_s3_bucket" "firehose_load_iceberg" {
  bucket_prefix = "firehose-load-iceberg-"
}

resource "aws_kinesis_firehose_delivery_stream" "firehose_load_iceberg" {
  name        = "firehose-load-iceberg"
  destination = "iceberg"

  iceberg_configuration {
    role_arn           = aws_iam_role.firehose_load_iceberg.arn
    catalog_arn        = "arn:aws:glue:${local.aws_region}:${local.aws_account_id}:catalog"
    buffering_size     = 5
    buffering_interval = 10

    s3_configuration {
      role_arn   = aws_iam_role.firehose_load_iceberg.arn
      bucket_arn = aws_s3_bucket.firehose_load_iceberg.arn
    }

    processing_configuration {
      enabled = "true"

      processors {
        type = "Lambda"

        parameters {
          parameter_name  = "LambdaArn"
          parameter_value = "${aws_lambda_function.firehose_processor_lambda.arn}:${aws_lambda_alias.firehose_processor_lambda_default.name}"
        }
        parameters {
          parameter_name  = "BufferIntervalInSeconds"
          parameter_value = 10
        }
      }
    }

    cloudwatch_logging_options {
      enabled         = true
      log_group_name  = aws_cloudwatch_log_group.firehose_load_iceberg.name
      log_stream_name = aws_cloudwatch_log_stream.firehose_load_iceberg.name
    }
  }
}

動作確認

Amazon Data Firehoseへデータ送信

JSONファイルとして保存したAmazon SESのイベントデータを読み込み、Amazon Data Firehoseにデータを送信するPyhtonスクリプトです。

import json

import boto3

DELIVERY_STREAM_NAME = "firehose-load-iceberg"
JSON_FILE_PATH = "event_data/open.json"

def main():
    with open(JSON_FILE_PATH, "r", encoding="utf-8") as file:
        data = json.load(file)

    firehose_client = boto3.client("firehose", region_name="ap-northeast-1")
    response = firehose_client.put_record(
        DeliveryStreamName=DELIVERY_STREAM_NAME,
        Record={"Data": json.dumps(data, ensure_ascii=False).encode("utf-8")},
    )
    print(f"send data to firehose success: RecordId={response['RecordId']}")

if __name__ == "__main__":
    main()

Amazon Athenaから、Apache Icebergテーブルにデータが追加されていることを確認できます。

Amazon Athena

エラーデータを保存するS3バケットに、processing-failedが自動的に作成され、以下のようなファイルが保存されます。

エラーデータ保存用S3 Bucket

エラーデータのファイルには以下のような取得データと、エラーメッセージが含まれます。

{"rawData":"eyJldmVudFR5cGUiOiAiU2VuZCIsICJtYWlsIjogeyJ0aW1lc3RhbXAiOiAiMjAxNi0xMC0xNFQwNTowMjoxNi42NDVaIiwgInNvdXJjZSI6ICJzZW5kZXJAZXhhbXBsZS5jb20iLCAic291cmNlQXJuIjogImFybjphd3M6c2VzOnVzLWVhc3QtMToxMjM0NTY3ODkwMTI6aWRlbnRpdHkvc2VuZGVyQGV4YW1wbGUuY29tIiwgInNlbmRpbmdBY2NvdW50SWQiOiAiMTIzNDU2Nzg5MDEyIiwgIm1lc3NhZ2VJZCI6ICJFWEFNUExFN2MxOTFiZTQ1LWU5YWVkYjlhLTAyZjktNGQxMi1hODdkLWRkMDA5OWEwN2Y4YS0wMDAwMDAiLCAiZGVzdGluYXRpb24iOiBbInJlY2lwaWVudEBleGFtcGxlLmNvbSJdLCAiaGVhZGVyc1RydW5jYXRlZCI6IGZhbHNlLCAiaGVhZGVycyI6IFt7Im5hbWUiOiAiRnJvbSIsICJ2YWx1ZSI6ICJzZW5kZXJAZXhhbXBsZS5jb20ifSwgeyJuYW1lIjogIlRvIiwgInZhbHVlIjogInJlY2lwaWVudEBleGFtcGxlLmNvbSJ9LCB7Im5hbWUiOiAiU3ViamVjdCIsICJ2YWx1ZSI6ICJNZXNzYWdlIHNlbnQgZnJvbSBBbWF6b24gU0VTIn0sIHsibmFtZSI6ICJNSU1FLVZlcnNpb24iLCAidmFsdWUiOiAiMS4wIn0sIHsibmFtZSI6ICJDb250ZW50LVR5cGUiLCAidmFsdWUiOiAibXVsdGlwYXJ0L21peGVkOyAgYm91bmRhcnk9XCItLS0tPV9QYXJ0XzBfNzE2OTk2NjYwLjE0NzY0MjEzMzYzNDFcIiJ9LCB7Im5hbWUiOiAiWC1TRVMtTUVTU0FHRS1UQUdTIiwgInZhbHVlIjogIm15Q3VzdG9tVGFnMT1teUN1c3RvbVRhZ1ZhbHVlMSwgbXlDdXN0b21UYWcyPW15Q3VzdG9tVGFnVmFsdWUyIn1dLCAiY29tbW9uSGVhZGVycyI6IHsiZnJvbSI6IFsic2VuZGVyQGV4YW1wbGUuY29tIl0sICJ0byI6IFsicmVjaXBpZW50QGV4YW1wbGUuY29tIl0sICJtZXNzYWdlSWQiOiAiRVhBTVBMRTdjMTkxYmU0NS1lOWFlZGI5YS0wMmY5LTRkMTItYTg3ZC1kZDAwOTlhMDdmOGEtMDAwMDAwIiwgInN1YmplY3QiOiAiTWVzc2FnZSBzZW50IGZyb20gQW1hem9uIFNFUyJ9LCAidGFncyI6IHsic2VzOmNvbmZpZ3VyYXRpb24tc2V0IjogWyJDb25maWdTZXQiXSwgInNlczpzb3VyY2UtaXAiOiBbIjE5Mi4wLjIuMCJdLCAic2VzOmZyb20tZG9tYWluIjogWyJleGFtcGxlLmNvbSJdLCAic2VzOmNhbGxlci1pZGVudGl0eSI6IFsic2VzX3VzZXIiXSwgIm15Q3VzdG9tVGFnMSI6IFsibXlDdXN0b21UYWdWYWx1ZTEiXSwgIm15Q3VzdG9tVGFnMiI6IFsibXlDdXN0b21UYWdWYWx1ZTIiXX19LCAic2VuZCI6IHt9fQo=","errorCode":"Lambda.InvokeAccessDenied","errorMessage":"Access was denied. Ensure that the access policy allows access to the Lambda function.","attemptsMade":4,"arrivalTimestamp":1758706488335,"attemptEndingTimestamp":1758706512945,"lambdaARN":"arn:aws:lambda:ap-northeast-1:559802578070:function:firehose-processor-handler"}

参考