本記事のコードです。
github.com
Amazon Data Firehoseとは
概要
Amazon Data Firehoseはストリーミングデータの取得・変換・保存を行うマネージドサービスです。
例えば、Eコマースのシステムのデータ基盤を開発する場合、商品のクリックや購入のイベントログを、リアルタイムでデータ基盤に反映する要件が発生します。
このような要件を満たすには、アプリケーションのイベントデータを収集し、データ形式に合わせた変換処理を行い、分析テーブルへの保存をリアルタイムで行うデータパイプラインが必要となります。
アプリケーションのイベントデータのような、継続的にリアルタイムで生成・送信されるデータをストリーミングデータと呼びます。
ストリーミングデータとは - ストリーミングデータの説明 - AWS
様々なストリーミングデータを扱う必要がある場合、リアルタイムで処理を行うデータパイプラインを自前で実装するのは考慮すべき要素が多く開発・運用コストが高いです。
例えば、データ処理が詰まらないためのバッファリング機能や、データ処理エラー時のバックアップ機能、データ保存先が複数ある場合のルーティング機能などを考慮する必要があります。
Amazon Data Firehoseを利用することで、このようなストリーミングデータの取得・変換・保存に関わる一連の機能をAWSが提供してくれます。
Amazon Data Firehose とは何ですか? - Amazon Data Firehose
ストリーミングデータパイプライン – Amazon Data Firehose (Firehose) – AWS
Apache Icebergテーブルへのデータ配信
Amazon Data Firehoseは様々なデータ送信先をサポートしています。
例えば、Amazon S3、Amazon RedshiftのようなAWSサービスから、Datadog、SnowflakeのようなSaaSサービスに対しても、Amazon Data Firehoseからデータを送信することができます。
これらのデータ送信先の中から、Amazon Data Firehoseは Apache Icebergテーブルをサポートしています。
AWSでデータレイクを開発する場合、Apache Iceberg形式で保存するのが候補として上がります。
Apache Iceberg とは? – Iceberg テーブルの説明 – AWS
Apache Iceberg - Apache Iceberg™
AWSでApache Icebergテーブルを管理する場合、Amazon S3 Tablesで管理する方法と、セルフマネージド管理の2つの方法があります。
Amazon Data Firehoseはこれら2つの方法で管理されたApache Icebergテーブルへのデータ転送をサポートしています。
Amazon Data Firehose を使用して Apache Iceberg テーブルにデータを配信する - Amazon Data Firehose
また、Amazon Data Firehoseが取得したデータに応じて、動的にデータ保存先テーブルを変更することができます。
lambda関数によるデータ変換処理
Amazon Data Firehoseはストリーミングデータの取得・変換・保存を行うマネージドサービスです。
Amazon Data Firehoseは、データ変換にあたる処理をユーザーが実装したlambda関数で行うことができます。
Amazon Data Firehose でソースデータを変換する - Amazon Data Firehose
lambda関数のreturnデータ内容に応じて、動的にデータ保存先テーブルを変更できます。
以下のように、metadataセクションを追懐することで保存先テーブルを指定することができます。
{ "recordId": "49655962066601463032522589543535113056108699331451682818000000", "result": "Ok", "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1", "metadata":{ "otfMetadata":{ "destinationTableName":"Device1234", "destinationDatabaseName":"IoTevents", "operation":"insert" } } }
Route incoming records to different Iceberg tables - Amazon Data Firehose
構成
全体像

本記事で作成するシステムの構成図です。
以下の処理を行います。
- (データ取得) Amazon Data FirehoseにAmazon SESのイベントデータを送信
- (データ変換) AWS Lambdaで保存先テーブルに合わせてイベントデータをデータ変換
- (データ保存) AWS Glue Data Catalogに保存された Apache Icebergテーブルにレコードを追加
使用データ
本記事ではAWSが提供するメール配信サービスのAmazon SESのイベントデータを使用します。
Amazon SES が Firehose に発行するイベントデータの例 - Amazon Simple Email Service
3つのメール配信イベントのデータを使用します。
- Send: メール送信
- Open: メール開封
- Click: メールクリック
Sendイベントログ例
{ "eventType": "Send", "mail": { "timestamp": "2016-10-14T05:02:16.645Z", "source": "sender@example.com", "sourceArn": "arn:aws:ses:us-east-1:123456789012:identity/sender@example.com", "sendingAccountId": "123456789012", "messageId": "EXAMPLE7c191be45-e9aedb9a-02f9-4d12-a87d-dd0099a07f8a-000000", "destination": [ "recipient@example.com" ], "headersTruncated": false, "headers": [ { "name": "From", "value": "sender@example.com" }, { "name": "To", "value": "recipient@example.com" }, { "name": "Subject", "value": "Message sent from Amazon SES" }, { "name": "MIME-Version", "value": "1.0" }, { "name": "Content-Type", "value": "multipart/mixed; boundary=\"----=_Part_0_716996660.1476421336341\"" }, { "name": "X-SES-MESSAGE-TAGS", "value": "myCustomTag1=myCustomTagValue1, myCustomTag2=myCustomTagValue2" } ], "commonHeaders": { "from": [ "sender@example.com" ], "to": [ "recipient@example.com" ], "messageId": "EXAMPLE7c191be45-e9aedb9a-02f9-4d12-a87d-dd0099a07f8a-000000", "subject": "Message sent from Amazon SES" }, "tags": { "ses:configuration-set": [ "ConfigSet" ], "ses:source-ip": [ "192.0.2.0" ], "ses:from-domain": [ "example.com" ], "ses:caller-identity": [ "ses_user" ], "myCustomTag1": [ "myCustomTagValue1" ], "myCustomTag2": [ "myCustomTagValue2" ] } }, "send": {} }
それぞれのイベントデータを保存する、send_log, open_log, click_logという名前のApache Icebergテーブルを作成します。
実装
Apache Icebergテーブル作成
本記事では、AWS Glue Data Catalogでテーブル情報を管理して Apache Icebergテーブルを作成します。
以下は、AWS Glue Data Catalogのデータベースと、Apache Icebergテーブルファイルを保存するS3バケットを作成するTerraformコードです。
resource "aws_s3_bucket" "glue_data" { bucket_prefix = "glue-data-" } resource "aws_glue_catalog_database" "email_event" { name = "email_event" location_uri = "s3a://${aws_s3_bucket.glue_data.bucket}/email_event/" }
Amazon AthenaでApache Iceberg形式でテーブルを作成するSQLを実行します。テーブルファイルの場所を、作成したS3バケットを指定します。
以下は send_logを作成するSQLです。カラムはSESのSendイベントが持つ値を部分的に選択しています。
CREATE TABLE email_event.send_log ( event_type string, message_id string, from_address string, to_address string, timestamp timestamp, tags string ) LOCATION 's3://glue-data-20250924084554937700000001/email_event/send_log/' TBLPROPERTIES ( 'table_type' = 'ICEBERG', 'format' = 'parquet', 'write_compression' = 'gzip' );
AWSマネジメントコンソールのAWS Glueのサービスページから、send_log, open_log, click_logが作成されていることを確認します。

データ処理用lambda関数作成
Amazon SESのSend, Open, Clickイベントデータを受け取り、Apache Icebergテーブルのレコードに変換するlambda関数の実装です。
公式ドキュメントに記載されている、Amazon Data Firehoseが取得したデータをlambda関数内で処理するサンプルコードを参考にしています。
Route incoming records to different Iceberg tables - Amazon Data Firehose
イベントデータに応じて、送信先テーブルを変更するようにしています。
import base64 import json def lambda_handler(firehose_records_input, context): print( "Received records for processing from DeliveryStream: " + firehose_records_input["deliveryStreamArn"] ) firehose_records_output = {} firehose_records_output["records"] = [] for firehose_record_input in firehose_records_input["records"]: payload_bytes = base64.b64decode(firehose_record_input["data"]).decode("utf-8") payload_dict = json.loads(payload_bytes) converted_data_dict = {} destination_table_name = None # 共通イベントデータ event_type = payload_dict["eventType"] converted_data_dict["event_type"] = event_type converted_data_dict["message_id"] = payload_dict["mail"]["messageId"] converted_data_dict["tags"] = payload_dict["mail"]["tags"] headers = payload_dict["mail"]["headers"] for header in headers: match header["name"]: case "From": converted_data_dict["from_address"] = header["value"] case "To": converted_data_dict["to_address"] = header["value"] # Sendイベントデータ用処理 if event_type == "Send": converted_data_dict["timestamp"] = payload_dict["mail"]["timestamp"] destination_table_name = "send_log" # Openイベントデータ用処理 if event_type == "Open": converted_data_dict["ip_address"] = payload_dict["open"]["ipAddress"] converted_data_dict["user_agent"] = payload_dict["open"]["userAgent"] converted_data_dict["timestamp"] = payload_dict["open"]["timestamp"] destination_table_name = "open_log" # Clickイベントデータ用処理 if event_type == "Click": converted_data_dict["ip_address"] = payload_dict["click"]["ipAddress"] converted_data_dict["link"] = payload_dict["click"]["link"] converted_data_dict["link_tags"] = payload_dict["click"]["linkTags"] converted_data_dict["user_agent"] = payload_dict["click"]["userAgent"] converted_data_dict["timestamp"] = payload_dict["click"]["timestamp"] destination_table_name = "click_log" converted_data_str = json.dumps(converted_data_dict) print("converted_data_str:", converted_data_str) # イベントごとに送信先テーブルを指定 firehose_record_output = {} firehose_record_output["data"] = base64.b64encode( converted_data_str.encode("utf-8") ) firehose_record_output["recordId"] = firehose_record_input["recordId"] firehose_record_output["result"] = "Ok" firehose_record_output["metadata"] = { "otfMetadata": { "destinationDatabaseName": "email_event", "destinationTableName": destination_table_name, "operation": "insert", } } firehose_records_output["records"].append(firehose_record_output) print(firehose_record_output) return firehose_records_output
データ変換用lambda関数のIAM Roleを作成するTerraformコードです。
lambda関数のログをCloud Watchに書き込むための権限を付与します。
resource "aws_cloudwatch_log_group" "firehose_processor_lambda" { name = "/aws/lambda/firehose-processor" retention_in_days = 365 } resource "aws_iam_role" "firehose_processor_lambda" { name = "firehose_processor_lambda_role" assume_role_policy = jsonencode({ Version = "2012-10-17" Statement = [ { Action = "sts:AssumeRole" Effect = "Allow" Principal = { Service = "lambda.amazonaws.com" } } ] }) } resource "aws_iam_policy" "firehose_processor_lambda" { name = "firehose_processor_lambda_policy" description = "Policy for Firehose processor Lambda role" policy = jsonencode({ Version = "2012-10-17" Statement = [ { Effect = "Allow" Action = [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ] Resource = [ "arn:aws:logs:${local.aws_region}:${local.aws_account_id}:log-group:${aws_cloudwatch_log_group.firehose_processor_lambda.name}:*" ] } ] }) }
lambda関数用スクリプトをzip化した後、lambda関数を作成するTerraformコードです。
data "archive_file" "firehose_processor_lambda" { type = "zip" source_dir = "../lambda/" output_path = "../lambda/firehose_processor.zip" } resource "aws_lambda_function" "firehose_processor_lambda" { function_name = "firehose-processor-handler" role = aws_iam_role.firehose_processor_lambda.arn filename = data.archive_file.firehose_processor_lambda.output_path handler = "firehose_processor.lambda_handler" runtime = "python3.12" timeout = 70 source_code_hash = filebase64sha256(data.archive_file.firehose_processor_lambda.output_path) logging_config { log_group = aws_cloudwatch_log_group.firehose_processor_lambda.name log_format = "Text" } depends_on = [ aws_cloudwatch_log_group.firehose_processor_lambda, data.archive_file.firehose_processor_lambda, ] } resource "aws_lambda_alias" "firehose_processor_lambda_default" { name = "default" description = "Default version." function_name = aws_lambda_function.firehose_processor_lambda.arn function_version = "1" lifecycle { ignore_changes = [function_version] } }
Apache Icebergテーブルへのデータ送信用Amazon Data Firehose作成
Amazon Data Firehoseの IAM Roleを作成するTerraformコードです。
Lambda関数を起動した後、AWS Glue Data Catalogに登録したApache Icebergテーブルに書き込みを行うための権限を付与しています。
resource "aws_cloudwatch_log_group" "firehose_load_iceberg" { name = "/aws/firehose/firehose-load-iceberg" retention_in_days = 365 } resource "aws_cloudwatch_log_stream" "firehose_load_iceberg" { name = "firehose-load-iceberg-log-stream" log_group_name = aws_cloudwatch_log_group.firehose_load_iceberg.name } resource "aws_iam_role" "firehose_load_iceberg" { name = "firehose_load_iceberg_role" assume_role_policy = jsonencode({ Version = "2012-10-17" Statement = [ { Action = "sts:AssumeRole" Effect = "Allow" Principal = { Service = "firehose.amazonaws.com" } } ] }) } resource "aws_iam_policy" "firehose_load_iceberg" { name = "firehose_load_iceberg_policy" description = "Policy for Firehose load iceberg role" policy = jsonencode({ Version = "2012-10-17" Statement = [ { Effect = "Allow" Action = [ "s3:AbortMultipartUpload", "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:PutObject", "s3:DeleteObject" ] Resource = [ "arn:aws:s3:::*", "arn:aws:s3:::*/*" ] }, { Effect = "Allow" Action = [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ] Resource = [ "arn:aws:logs:${local.aws_region}:${local.aws_account_id}:log-group:${aws_cloudwatch_log_group.firehose_load_iceberg.name}:*", aws_cloudwatch_log_stream.firehose_load_iceberg.arn ] }, { Effect = "Allow" Action = [ "glue:GetTable", "glue:GetDatabase", "glue:UpdateTable" ] Resource = [ "arn:aws:glue:*:*:catalog", "arn:aws:glue:*:*:database/*", "arn:aws:glue:*:*:table/*/*" ] }, { Effect = "Allow" Action = [ "kinesis:DescribeStream", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:ListShards" ] Resource = [ "arn:aws:kinesis:*:*:stream/*" ] }, { Effect = "Allow" Action = [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ] Resource = [ "arn:aws:lambda:*:*:function:*:*" ] } ] }) } resource "aws_iam_role_policy_attachment" "firehose_load_iceberg" { role = aws_iam_role.firehose_load_iceberg.name policy_arn = aws_iam_policy.firehose_load_iceberg.arn }
エラーデータ保存用のS3バケットと、Amazon Data Firehoseを作成するTerraformコードです。
aws_kinesis_firehose_delivery_stream リソースでAmazon Data Firehoseを作成します。
Apache Icebergテーブルをデータ送信先とするため、iceberg_configurationの設定を行います。また、Amazon Data Firehoseで実行するlambda関数を紐づけています。
resource "aws_s3_bucket" "firehose_load_iceberg" { bucket_prefix = "firehose-load-iceberg-" } resource "aws_kinesis_firehose_delivery_stream" "firehose_load_iceberg" { name = "firehose-load-iceberg" destination = "iceberg" iceberg_configuration { role_arn = aws_iam_role.firehose_load_iceberg.arn catalog_arn = "arn:aws:glue:${local.aws_region}:${local.aws_account_id}:catalog" buffering_size = 5 buffering_interval = 10 s3_configuration { role_arn = aws_iam_role.firehose_load_iceberg.arn bucket_arn = aws_s3_bucket.firehose_load_iceberg.arn } processing_configuration { enabled = "true" processors { type = "Lambda" parameters { parameter_name = "LambdaArn" parameter_value = "${aws_lambda_function.firehose_processor_lambda.arn}:${aws_lambda_alias.firehose_processor_lambda_default.name}" } parameters { parameter_name = "BufferIntervalInSeconds" parameter_value = 10 } } } cloudwatch_logging_options { enabled = true log_group_name = aws_cloudwatch_log_group.firehose_load_iceberg.name log_stream_name = aws_cloudwatch_log_stream.firehose_load_iceberg.name } } }
動作確認
Amazon Data Firehoseへデータ送信
JSONファイルとして保存したAmazon SESのイベントデータを読み込み、Amazon Data Firehoseにデータを送信するPyhtonスクリプトです。
import json import boto3 DELIVERY_STREAM_NAME = "firehose-load-iceberg" JSON_FILE_PATH = "event_data/open.json" def main(): with open(JSON_FILE_PATH, "r", encoding="utf-8") as file: data = json.load(file) firehose_client = boto3.client("firehose", region_name="ap-northeast-1") response = firehose_client.put_record( DeliveryStreamName=DELIVERY_STREAM_NAME, Record={"Data": json.dumps(data, ensure_ascii=False).encode("utf-8")}, ) print(f"send data to firehose success: RecordId={response['RecordId']}") if __name__ == "__main__": main()
Amazon Athenaから、Apache Icebergテーブルにデータが追加されていることを確認できます。

エラーデータを保存するS3バケットに、processing-failedが自動的に作成され、以下のようなファイルが保存されます。

エラーデータのファイルには以下のような取得データと、エラーメッセージが含まれます。
{"rawData":"eyJldmVudFR5cGUiOiAiU2VuZCIsICJtYWlsIjogeyJ0aW1lc3RhbXAiOiAiMjAxNi0xMC0xNFQwNTowMjoxNi42NDVaIiwgInNvdXJjZSI6ICJzZW5kZXJAZXhhbXBsZS5jb20iLCAic291cmNlQXJuIjogImFybjphd3M6c2VzOnVzLWVhc3QtMToxMjM0NTY3ODkwMTI6aWRlbnRpdHkvc2VuZGVyQGV4YW1wbGUuY29tIiwgInNlbmRpbmdBY2NvdW50SWQiOiAiMTIzNDU2Nzg5MDEyIiwgIm1lc3NhZ2VJZCI6ICJFWEFNUExFN2MxOTFiZTQ1LWU5YWVkYjlhLTAyZjktNGQxMi1hODdkLWRkMDA5OWEwN2Y4YS0wMDAwMDAiLCAiZGVzdGluYXRpb24iOiBbInJlY2lwaWVudEBleGFtcGxlLmNvbSJdLCAiaGVhZGVyc1RydW5jYXRlZCI6IGZhbHNlLCAiaGVhZGVycyI6IFt7Im5hbWUiOiAiRnJvbSIsICJ2YWx1ZSI6ICJzZW5kZXJAZXhhbXBsZS5jb20ifSwgeyJuYW1lIjogIlRvIiwgInZhbHVlIjogInJlY2lwaWVudEBleGFtcGxlLmNvbSJ9LCB7Im5hbWUiOiAiU3ViamVjdCIsICJ2YWx1ZSI6ICJNZXNzYWdlIHNlbnQgZnJvbSBBbWF6b24gU0VTIn0sIHsibmFtZSI6ICJNSU1FLVZlcnNpb24iLCAidmFsdWUiOiAiMS4wIn0sIHsibmFtZSI6ICJDb250ZW50LVR5cGUiLCAidmFsdWUiOiAibXVsdGlwYXJ0L21peGVkOyAgYm91bmRhcnk9XCItLS0tPV9QYXJ0XzBfNzE2OTk2NjYwLjE0NzY0MjEzMzYzNDFcIiJ9LCB7Im5hbWUiOiAiWC1TRVMtTUVTU0FHRS1UQUdTIiwgInZhbHVlIjogIm15Q3VzdG9tVGFnMT1teUN1c3RvbVRhZ1ZhbHVlMSwgbXlDdXN0b21UYWcyPW15Q3VzdG9tVGFnVmFsdWUyIn1dLCAiY29tbW9uSGVhZGVycyI6IHsiZnJvbSI6IFsic2VuZGVyQGV4YW1wbGUuY29tIl0sICJ0byI6IFsicmVjaXBpZW50QGV4YW1wbGUuY29tIl0sICJtZXNzYWdlSWQiOiAiRVhBTVBMRTdjMTkxYmU0NS1lOWFlZGI5YS0wMmY5LTRkMTItYTg3ZC1kZDAwOTlhMDdmOGEtMDAwMDAwIiwgInN1YmplY3QiOiAiTWVzc2FnZSBzZW50IGZyb20gQW1hem9uIFNFUyJ9LCAidGFncyI6IHsic2VzOmNvbmZpZ3VyYXRpb24tc2V0IjogWyJDb25maWdTZXQiXSwgInNlczpzb3VyY2UtaXAiOiBbIjE5Mi4wLjIuMCJdLCAic2VzOmZyb20tZG9tYWluIjogWyJleGFtcGxlLmNvbSJdLCAic2VzOmNhbGxlci1pZGVudGl0eSI6IFsic2VzX3VzZXIiXSwgIm15Q3VzdG9tVGFnMSI6IFsibXlDdXN0b21UYWdWYWx1ZTEiXSwgIm15Q3VzdG9tVGFnMiI6IFsibXlDdXN0b21UYWdWYWx1ZTIiXX19LCAic2VuZCI6IHt9fQo=","errorCode":"Lambda.InvokeAccessDenied","errorMessage":"Access was denied. Ensure that the access policy allows access to the Lambda function.","attemptsMade":4,"arrivalTimestamp":1758706488335,"attemptEndingTimestamp":1758706512945,"lambdaARN":"arn:aws:lambda:ap-northeast-1:559802578070:function:firehose-processor-handler"}