肉球でキーボード

MLエンジニアの技術ブログです

特徴量ストアを拡張した特徴量プラットフォームについて調べてみた

この記事はMLOps(LLMOps、生成AIOps、AI AgentOps) Advent Calendar 2025にリンクしてます。

本記事のまとめ

  • 特徴量ストアを拡張した特徴量プラットフォームという概念が登場
  • 特徴量ストアは特徴量プラットフォームの機能の一部という位置付け
  • 特徴量プラットフォームは5つの要素で構成される
    • 特徴量ロジックSQL、PySpark、Pythonなどによる特徴量計算ロジックの記述
    • 特徴量レポジトリ:Gitなどのレポジトリで特徴量定義をファイルとして管理
    • 特徴量エンジン:バッチ・ストリーミング・リアルタイムデータの処理
    • 特徴量ストア:オフライン・オンラインで一貫した特徴量を保存・提供
    • 特徴量管理:エンドツーエンドの特徴量パイプラインの発見、使用、監視、ガバナンス
  • 特徴量プラットフォームにおける特徴量ストアの目的は2つ
    • 特徴量の保存:オフライン学習環境とオンライン推論環境で一貫性のある特徴量を保存
    • 特徴量の提供:オフライン学習環境とオンライン推論環境で一貫性のある特徴量を提供

特徴量ストア

特徴量ストアの責務が曖昧になっている

特徴量ストアは2017年のUberのブログ記事で初めて登場して以降、MLOps実践のための機能の一つとして注目され、特徴量ストアという名前がつく様々なサービスが登場しています。

Feature Store For ML

様々な特徴量ストアのサービスが、モニタリング機能やストリーミング処理機能などをサポートし、幅広いユーザー課題に応えてくれる存在になっています。

一方で各サービスの「特徴量ストア」が持つ機能が増え、特徴量ストアの責務が曖昧になっていると感じます。

特徴量ストア登場時の機能と、現在の機能を整理します。

2017年の登場時の機能

「特徴量ストア」という言葉は2017年に公開された、Uberが利用している機械学習プラットフォームのMichelangeloについてのブログ記事で初めて登場しました。

Meet Michelangelo: Uber's Machine Learning Platform | Uber Blog

このブログ記事内では、特徴量ストアを、「高品質にキュレーション※された特徴量のセットを共有、発見、使用できるようにするもの」と説明しています。

※キュレーション: 情報を収集・選択し、取りまとめて共有すること

特徴量ストアの導入背景にある課題として、Uber社内の様々な機械学習モデルの開発過程で、全く同じか似たような特徴量を使用していたことを指摘しています。このような状況から、Uberではプロジェクト間・チーム間で特徴量を共有できる仕組みを提供することで、大きな価値を生まれると述べています。

In addition, we added a layer of data management, a feature store that allows teams to share, discover, and use a highly curated set of features for their machine learning problems. We found that many modeling problems at Uber use identical or similar features, and there is substantial value in enabling teams to share features between their own projects and for teams in different organizations to share features with each other.

特徴量ストアを導入したことで、以下2つの成果があったと述べています。

  • 個人やプロジェクト固有の用途で必要な情報に加え、わずかな追加メタデータを付与するだけで特徴量を簡単に特徴量ストアに追加できる
  • モデル設定内の特徴量名を参照するだけで、特徴量ストアの特徴量をオンライン・オフラインで簡単に利用できる

下図は、Michelangeloを利用した機械学習プロジェクトのワークフロー全体の中での特徴量ストアの役割を表した図です。

オンライン・オフラインの特徴量ストアが存在します。オンラインではリアルタイム推論サービスへの特徴量の提供、オフラインではモデル学習やバッチ推論への特徴量の提供を行っています。

feature store in whole ML workflow

このように、特徴量ストアは、オンライン・オフラインでの特徴量の保存と提供によって、Uber特徴量管理の課題を解決する存在でした。

現在の「特徴量ストア」サービスの機能

AWSでは、機械学習マネージドサービスである Amazon SageMaker の機能の一部として、Amazon SageMaker Feature Store が提供されています。

特徴量ストアを使用して特徴量を作成、保存、共有する - Amazon SageMaker AI

Amazon SageMaker Feature Storeでは以下の機能が含まれます。

  • 特徴量グループ
    • 特徴量スキーマ管理
    • 特徴量メタデータ管理
    • 特徴量の検索
    • アカウントごとのアクセス制御
    • AWS KMSと連携したデータ暗号化
    • Cloud Trailと連携した特徴量ストアの操作ログ記録
  • オンラインストア
  • オフラインストア
    • Data Wranglerと連携した、特徴量のクエリ、探索、可視化
  • AWSサービスと連携した、ストリーミング・バッチデータ取り込み
  • 特徴量計算
    • 特徴量生成パイプラインの作成
  • 耐障害性
    • 複数AZに配置

オフライン・オンラインでの特徴量の保存と提供に加え、特徴量処理や、アクセス制御、データ取り込み機能などをAmazon SageMaker Feature Storeではサポートしています。

このように、2017年の登場時の特徴量ストアの機能と比較して、現在の特徴量ストアサービスの機能は拡大しています。

特徴量プラットフォーム

2023年にChip Huyen氏が公開したブログ記事「Self-serve feature platforms: architectures and APIs」で特徴量プラットフォームの概念が紹介されました。

Self-serve feature platforms: architectures and APIs

従来の特徴量ストアが特徴量の保存と提供を責務とする一方、特徴量プラットフォームは特徴量計算も責務に含めています。

特徴量プラットフォームの概念が出た背景として、機械学習の主なユースケースがバッチ推論からオンライン推論に移行したことを指摘しています。オンライン推論のための特徴量処理はレイテンシの問題があります。そのため、特徴量処理に最適化されたツールをモデル開発者に提供することは、大規模なオンライン推論にスケールさせるために必要であると述べています。

以下の図は特徴量プラットフォームを提供するTecton※のブログ記事から引用した、特徴量プラットフォームを説明する図です。

※ 2025年8月にDatabricksがTectonを買収したため、現在はDatabricksの機能の一部という表現が正確かもしれません。

What Is a Feature Platform for Machine Learning? | Databricks

Feature Platform

特徴量プラットフォームは5つの要素で構成されます。

  • 特徴量ロジックSQL、PySpark、Pythonなどによる特徴量計算ロジックの記述
  • 特徴量レポジトリ:Gitなどのレポジトリで特徴量定義をファイルとして管理
  • 特徴量エンジン:バッチ・ストリーミング・リアルタイムデータの処理
  • 特徴量ストア:オフライン・オンラインで一貫した特徴量を保存・提供
  • 特徴量管理:エンドツーエンドの特徴量パイプラインの発見、使用、監視、ガバナンス

特徴量プラットフォームと特徴量ストアの関係

特徴量プラットフォームでは特徴量ストアは機能の一部という位置付けです。Tectonのブログ記事では、特徴量プラットフォームにおける特徴量ストアの目的は2つと述べられています。

  • 特徴量の保存:オフライン学習環境とオンライン推論環境で一貫性のある特徴量を保存
  • 特徴量の提供:オフライン学習環境とオンライン推論環境で一貫性のある特徴量を提供

現在の「特徴量ストア」の名前がつくサービスは、2017年登場時の特徴量ストアの概念を拡大させた、特徴量プラットフォームの機能をサポートしているものが多いです。

特徴量ストアと特徴量プラットフォームの背景にある課題感が異なるため、自分たちが抱える課題がどちらのツールを必要とするものか区別する必要があります。

余談: 特徴量ストア(プラットフォーム)は本当に必要か

自分たちの課題に対して、世の中の特徴量ストア(プラットフォーム)が必ずしもマッチしないことについて言及したブログ記事が面白かったので紹介します。

Building a feature store - nlathia.github.io

特徴量プラットフォームでは特徴量計算に関わる機能を一元管理します。特徴量プラットフォームを支持する人は2つの利点を強調すると述べています。

  • 「同じものを2度構築する」無駄をなくせる
  • 異なる問題領域間での再利用可能な特徴量の発見

この特徴量プラットフォーム支持者の考えに対して、懐疑的な意見を述べています。

「コードを一度だけ書く」は全てのユースケースに対して機能するわけではない

the “only code things once” pitch doesn’t actually work for all use cases.

データの一元管理を進めると、次第にそのデータが何を意味し、どのように使用すべきかという文脈が失われていく傾向がある。数千もの特徴量が存在するようになると、それらを一覧化した中央リストの管理は困難を極める。

At some point, centralising all of the data would come hand-in-hand with a loss of context about what that data means, and how it should be used. Once you have 1000s of features, a central list of them becomes difficult to navigate.

また、特徴量プラットフォームのサービスページを見ると、特徴量プラットフォームが解決できる5 ~ 10個の課題が列挙されていて、情報量に圧倒されると指摘しています。

ブログ記事内のチームでは、特徴量プラットフォームが既存システムの代替になったり、小規模なチームが巨大なシステムを管理する状態を避け、特徴量プラットフォームが解決する問題領域を絞り込んでいます。

I didn’t want the feature store to become a replacement for things that already existed, or a behemoth that is owned by my (small) team. Instead, I learned about what we needed by looking for patterns in the machine learning models that we were shipping.

まとめ

  • 特徴量ストアを拡張した特徴量プラットフォームという概念が登場
  • 特徴量ストアは特徴量プラットフォームの機能の一部という位置付け
  • 特徴量プラットフォームは5つの要素で構成される
    • 特徴量ロジックSQL、PySpark、Pythonなどによる特徴量計算ロジックの記述
    • 特徴量レポジトリ:Gitなどのレポジトリで特徴量定義をファイルとして管理
    • 特徴量エンジン:バッチ・ストリーミング・リアルタイムデータの処理
    • 特徴量ストア:オフライン・オンラインで一貫した特徴量を保存・提供
    • 特徴量管理:エンドツーエンドの特徴量パイプラインの発見、使用、監視、ガバナンス
  • 特徴量プラットフォームにおける特徴量ストアの目的は2つ
    • 特徴量の保存:オフライン学習環境とオンライン推論環境で一貫性のある特徴量を保存
    • 特徴量の提供:オフライン学習環境とオンライン推論環境で一貫性のある特徴量を提供

『実践MLOps 作って理解する機械学習システムの構築と運用』という本を書きました


2025年10月18日に刊行される『実践MLOps 作って理解する機械学習システムの構築と運用』というMLOpsに関する本を執筆しました。

本の内容の紹介をさせていただきます。

www.ohmsha.co.jp

実践MLOps 作って理解する機械学習システムの構築と運用

執筆経緯

MLOpsを実践する場合、多くの問題はソフトウェアエンジニアリングに関する問題だと気づきます。

多くの企業では、MLOpsは機械学習エンジニアやデータサイエンティストが担当する場合が多いです。彼らは役割の性質上ソフトウェアエンジニアリング業務に関わる機会が限られるため、MLOpsの概念を実践するハードルが高いと言わざるを得ません。

2025年現在、MLOpsを体系的に説明した書籍が出版されるようになりました。しかし、MLOpsの概念的な内容の整理に留まり、具体的な実践方法まで踏み込んでいる書籍はまだまだ少ない印象です。

このようなMLOpsの概念的な理解と、実際にMLOpsを実践する際に求められるソフトウェアエンジニアリングスキルとのギャップを埋められるよう、本書を執筆しました。

どんな本か

本書では、広告クリック率予測を行う機械学習システムを、AWSで構築・運用する一連の流れを通して、MLOpsを実践的に学ぶ一冊です。
本書の特徴は機械学習システムの運用にまでカバーしている点です。
本書は準備・開発・運用の3つのカテゴリの目次構成となっています。準備・開発のカテゴリでは、一般的なMLOpsの説明を行った後、機械学習パイプラインの実装を行います。運用のカテゴリではバージョン管理、CI・CD、推論サービス、継続的学習、監視のトピックをカバーします。

目次

本書ではMLOpsを「DevOpsを機械学習システムに適用した手法」と位置付けています。そのため、TerraformによるIaC (Infrastructure as Code)や、GitHub ActionsによるCI・CDのような、一般的なソフトウェア開発で扱う技術トピックが多く含まれます。

また、本書ではMLOpsを機械学習の専門家のみが扱う領域とせず、一般的なソフトウェア開発に準拠した技術選定の方針をとっています。そのため、本書では機械学習のマネージドサービスを使わずに、従来のソフトウェア開発で一般的に使用されるAWSサービスを使用します。例えば、推論サービスにはECS (Elastic Cotainer Service)を使用し、特徴量ストアやモデルレジストリのようなMLOps特有の要素をDynamoDBを使用して構築します。

本書はソフトウェア開発の初学者向けの本ではありません。本書は264ページという限られたページ数の中で、機械学習システムの開発から運用までを行うため、機械学習AWSの基礎的な説明は省略しています。また、MLOpsの要素として一般的に含まれる組織作りやチーム体制などの非技術的な側面は扱いません。

より広くMLOpsを学びたい方は、『事例でわかるMLOps』や『機械学習システムデザイン』が参考になると思います。本書はこれらの本に書かれている内容の、一つの例を実践するような立ち位置となります。

本書で利用するコードをGitHubリポジトリで公開しています。 github.com

本の内容イメージ

本書は、筆者が前職で講師を行なったMLOps研修の経験が生かされています。
そのため、本書の内容はこれらの資料をより深く掘り下げたものとイメージしてもらえると分かりやすいです。

speakerdeck.com

speakerdeck.com

ALBとNLBの違いを実際に作って理解する

本記事のコードです。
github.com

ALB, NLBの説明の具体的なイメージがわかない

AWSロードバランサーを作成する場合、ALB (Application Load Balancer)かNLB (Network Load Balancer)のどちらかを選択します。
Gateway Load BalancerやClassic Load Balancerも提供されていますが、ユースーケースが限定的なため本記事では扱いません。

以下はAWSマネジメントコンソールのロードバランサー作成画面での、各サービスの説明文です。

loadbalancer

ALBとNLBの違いについて説明した文章を読むと、ALBはアプリケーション層(7層. HTTP/HTTPS)、NLBはトランスポート層(4層. TCP/UDP)でトラフィック分割を行うという表現を見ることが多いです。
特徴 - Elastic Load Balancing | AWS

Feature of ELB

自分はこの説明を見るたびに、具体的な動作の違いをイメージできておらず、納得感を得られてなかったです。

以下はAWSマネジメントコンソールの、ALBの説明文です。

HTTP および HTTPS トラフィックを使用するウェブアプリケーション用に柔軟性の高い機能セットが必要な場合は、Application Load Balancer を選択します

自分はこの文章を読んだ時

  • HTTP/HTTPSトラフィックを使用しないNLBと挙動がどう変わる?
  • 「柔軟性が高い」とはどういう状況?

という疑問が起きました。

本記事では、ALBとNLBを実際に作成して、その動作の違いを確認し、ALB・NLBの説明の具体的なイメージを掴もうと思います。

本記事で作成する構成

Architecture

ALB・NLBのトラフィックをECS Serviceに割り当てます。
ALBとNLBをパブリックサブネットに起動し、インターネット経由でローカルPCからリクエストを送信します。

AWSリソースのデプロイ

ネットワーク

以下は、本記事のシステムで使用するネットワークを作成するTerraformです。
ALBは2つ以上のサブネットに作成する必要があるため、2つのAZにパブリックサブネットを作成します。
Application Load Balancer - エラスティックロードバランシング

静的IPをNLBに付与するために、Elastic IPを用意しています。

# VPC
resource "aws_vpc" "alb_nlb" {
  cidr_block           = "10.0.0.0/16"
  enable_dns_hostnames = true
  enable_dns_support   = true
}

# Internet Gateway
resource "aws_internet_gateway" "alb_nlb" {
  vpc_id = aws_vpc.alb_nlb.id
}

data "aws_availability_zones" "available" {
  state = "available"
}

# Public Subnets
resource "aws_subnet" "public_1" {
  vpc_id                  = aws_vpc.alb_nlb.id
  cidr_block              = "10.0.1.0/24"
  availability_zone       = data.aws_availability_zones.available.names[0]
  map_public_ip_on_launch = true

}

resource "aws_subnet" "public_2" {
  vpc_id                  = aws_vpc.alb_nlb.id
  cidr_block              = "10.0.2.0/24"
  availability_zone       = data.aws_availability_zones.available.names[1]
  map_public_ip_on_launch = true

}

resource "aws_route_table" "public" {
  vpc_id = aws_vpc.alb_nlb.id

  route {
    cidr_block = "0.0.0.0/0"
    gateway_id = aws_internet_gateway.alb_nlb.id
  }
}

resource "aws_route_table_association" "public_1" {
  subnet_id      = aws_subnet.public_1.id
  route_table_id = aws_route_table.public.id
}

resource "aws_route_table_association" "public_2" {
  subnet_id      = aws_subnet.public_2.id
  route_table_id = aws_route_table.public.id
}

# Elastic IP
resource "aws_eip" "nlb_1" {
  domain = "vpc"
}

resource "aws_eip" "nlb_2" {
  domain = "vpc"
}

ALB, NLB, ECS Serviceのセキュリティグループを作成するTerraformコードです。
2023年以前はALBのみしかセキュリティグループを設定できませんが、現在はNLBにもセキュリティグループを設定できるようです。
Network Load Balancer のセキュリティグループを更新する - エラスティックロードバランシング

# Security Group
resource "aws_security_group" "alb" {
  name        = "alb-sg"
  description = "Security group for ALB"
  vpc_id      = aws_vpc.alb_nlb.id

  ingress {
    from_port   = 80
    to_port     = 80
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

resource "aws_security_group" "nlb" {
  name        = "nlb-sg"
  description = "Security group for NLB"
  vpc_id      = aws_vpc.alb_nlb.id

  ingress {
    from_port   = 80
    to_port     = 80
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

resource "aws_security_group" "alb_ecs" {
  name        = "alb-ecs-sg"
  description = "Security group for ECS tasks behind ALB"
  vpc_id      = aws_vpc.alb_nlb.id

  ingress {
    from_port       = 8080
    to_port         = 8080
    protocol        = "tcp"
    security_groups = [aws_security_group.alb.id]
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

resource "aws_security_group" "nlb_ecs" {
  name        = "nlb-ecs-sg"
  description = "Security group for ECS tasks behind NLB"
  vpc_id      = aws_vpc.alb_nlb.id

  ingress {
    from_port       = 8080
    to_port         = 8080
    protocol        = "tcp"
    security_groups = [aws_security_group.nlb.id]
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

ECSリソース

ECS Serviceを起動するための、ECSクラスター、ECS Task実行ロール、CloudWatchグループを作成する Terraformです。

resource "aws_ecs_cluster" "alb_nlb" {
  name = "alb-nlb-cluster"

  setting {
    name  = "containerInsights"
    value = "enabled"
  }
}

resource "aws_iam_role" "ecs_task_execution" {
  name = "ecs-task-execution"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "ecs-tasks.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "ecs_task_execution" {
  role       = aws_iam_role.ecs_task_execution.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
}

resource "aws_cloudwatch_log_group" "ecs" {
  name              = "/ecs/alb-nlb"
  retention_in_days = 3
}

ALB

ALBがトラフィックを割り当てるECSサービスを作成する Terraformコードです。
Hashicorpが提供するhashicorp/http-echo イメージを使用し、簡易的なHTTPレスポンスを返すアプリケーションを作成します。
GitHub - hashicorp/http-echo: A tiny go web server that echos what you start it with!
ALBのパスルーティングの挙動を確認するために、 alb-appalb-api という2つのECSサービスを作成しています。

resource "aws_ecs_task_definition" "alb_app" {
  family                   = "alb-app"
  network_mode             = "awsvpc"
  requires_compatibilities = ["FARGATE"]
  cpu                      = "256"
  memory                   = "512"
  execution_role_arn       = aws_iam_role.ecs_task_execution.arn

  container_definitions = jsonencode([
    {
      name  = "app"
      image = "hashicorp/http-echo:latest"

      portMappings = [
        {
          containerPort = 8080
          protocol      = "tcp"
        }
      ]

      command = [
        "-text=ALB-APP-SERVICE: Successful response from ALB",
        "-listen=:8080"
      ]

      logConfiguration = {
        logDriver = "awslogs"
        options = {
          "awslogs-group"         = aws_cloudwatch_log_group.ecs.name
          "awslogs-region"        = local.aws_region
          "awslogs-stream-prefix" = "alb-app"
        }
      }
    }
  ])
}

resource "aws_ecs_task_definition" "alb_api" {
  family                   = "alb-api"
  network_mode             = "awsvpc"
  requires_compatibilities = ["FARGATE"]
  cpu                      = "256"
  memory                   = "512"
  execution_role_arn       = aws_iam_role.ecs_task_execution.arn

  container_definitions = jsonencode([
    {
      name  = "api"
      image = "hashicorp/http-echo:latest"

      portMappings = [
        {
          containerPort = 8080
          protocol      = "tcp"
        }
      ]

      command = [
        "-text=ALB-API-SERVICE: Successful response from ALB",
        "-listen=:8080"
      ]

      logConfiguration = {
        logDriver = "awslogs"
        options = {
          "awslogs-group"         = aws_cloudwatch_log_group.ecs.name
          "awslogs-region"        = local.aws_region
          "awslogs-stream-prefix" = "alb-api"
        }
      }
    }
  ])
}

resource "aws_ecs_service" "alb_app" {
  name            = "alb-app"
  cluster         = aws_ecs_cluster.alb_nlb.id
  task_definition = aws_ecs_task_definition.alb_app.arn
  desired_count   = 2
  launch_type     = "FARGATE"

  network_configuration {
    subnets          = [aws_subnet.public_1.id, aws_subnet.public_2.id]
    security_groups  = [aws_security_group.alb_ecs.id]
    assign_public_ip = true
  }

  load_balancer {
    target_group_arn = aws_lb_target_group.alb_app.arn
    container_name   = "app"
    container_port   = 8080
  }

  depends_on = [aws_lb_listener.alb_http]
}

resource "aws_ecs_service" "alb_api" {
  name            = "alb-api"
  cluster         = aws_ecs_cluster.alb_nlb.id
  task_definition = aws_ecs_task_definition.alb_api.arn
  desired_count   = 2
  launch_type     = "FARGATE"

  network_configuration {
    subnets          = [aws_subnet.public_1.id, aws_subnet.public_2.id]
    security_groups  = [aws_security_group.alb_ecs.id]
    assign_public_ip = true
  }

  load_balancer {
    target_group_arn = aws_lb_target_group.alb_api.arn
    container_name   = "api"
    container_port   = 8080
  }

  depends_on = [aws_lb_listener.alb_http]
}

以下はALBを作成するTerraformコードです。
リスナールールにパスパターンを設定しています。デフォルトのトラフィック送信先alb-app-tg ターゲットグループですが、/apiパスへのリクエストの場合、 alb-api-tg ターゲットグループとします。
また、リダイレクトと固定レスポンスの設定を行なっています。

resource "aws_lb" "alb" {
  name               = "alb-app"
  internal           = false
  load_balancer_type = "application"
  security_groups    = [aws_security_group.alb.id]
  subnets            = [aws_subnet.public_1.id, aws_subnet.public_2.id]

  enable_deletion_protection = false
  enable_http2               = true

}

resource "aws_lb_target_group" "alb_app" {
  name        = "alb-app-tg"
  port        = 8080
  protocol    = "HTTP"
  vpc_id      = aws_vpc.alb_nlb.id
  target_type = "ip"

  health_check {
    enabled             = true
    healthy_threshold   = 2
    unhealthy_threshold = 2
    timeout             = 5
    interval            = 30
    path                = "/"
  }
}

resource "aws_lb_target_group" "alb_api" {
  name        = "alb-api-tg"
  port        = 8080
  protocol    = "HTTP"
  vpc_id      = aws_vpc.alb_nlb.id
  target_type = "ip"

  health_check {
    enabled             = true
    healthy_threshold   = 2
    unhealthy_threshold = 2
    timeout             = 5
    interval            = 30
    path                = "/"
  }
}

resource "aws_lb_listener" "alb_http" {
  load_balancer_arn = aws_lb.alb.arn
  port              = "80"
  protocol          = "HTTP"

  default_action {
    type             = "forward"
    target_group_arn = aws_lb_target_group.alb_app.arn
  }
}

resource "aws_lb_listener_rule" "alb_api_path" {
  listener_arn = aws_lb_listener.alb_http.arn
  priority     = 100

  action {
    type             = "forward"
    target_group_arn = aws_lb_target_group.alb_api.arn
  }

  condition {
    path_pattern {
      values = ["/api/*"]
    }
  }
}

resource "aws_lb_listener_rule" "redirect_api" {
  listener_arn = aws_lb_listener.alb_http.arn
  priority     = 90

  action {
    type = "redirect"
    redirect {
      path        = "/api"
      status_code = "HTTP_301"
    }
  }

  condition {
    path_pattern {
      values = ["/redirect/*", "/redirect"]
    }
  }
}

resource "aws_lb_listener_rule" "forbidden" {
  listener_arn = aws_lb_listener.alb_http.arn
  priority     = 10

  action {
    type = "fixed-response"
    fixed_response {
      content_type = "text/plain"
      message_body = "Access Denied: This resource is restricted"
      status_code  = "403"
    }
  }

  condition {
    path_pattern {
      values = ["/admin", "/admin/*"]
    }
  }
}

NLB

NLBがトラフィックを割り当てるECSサービスを作成する Terraformコードです。
istio/tcp-echo-server Dockerイメージを使用し、TCPトラフィックを処理するアプリケーションを起動します。
istio/samples/tcp-echo at release-1.1 · istio/istio · GitHub

resource "aws_ecs_service" "nlb_app" {
  name            = "nlb-app"
  cluster         = aws_ecs_cluster.alb_nlb.id
  task_definition = aws_ecs_task_definition.nlb_app.arn
  desired_count   = 2
  launch_type     = "FARGATE"

  network_configuration {
    subnets          = [aws_subnet.public_1.id, aws_subnet.public_2.id]
    security_groups  = [aws_security_group.nlb_ecs.id]
    assign_public_ip = true
  }

  load_balancer {
    target_group_arn = aws_lb_target_group.nlb_tcp.arn
    container_name   = "nlb-app"
    container_port   = 8080
  }

  depends_on = [aws_lb_listener.nlb_tcp]
}

resource "aws_ecs_task_definition" "nlb_app" {
  family                   = "nlb-app"
  network_mode             = "awsvpc"
  requires_compatibilities = ["FARGATE"]
  cpu                      = "256"
  memory                   = "512"
  execution_role_arn       = aws_iam_role.ecs_task_execution.arn

  container_definitions = jsonencode([
    {
      name  = "nlb-app"
      image = "istio/tcp-echo-server:latest"

      portMappings = [
        {
          containerPort = 8080
          protocol      = "tcp"
        }
      ]

      command = ["8080", "NLB-APP-SERVICE: Successful response from NLB"]

      logConfiguration = {
        logDriver = "awslogs"
        options = {
          "awslogs-group"         = aws_cloudwatch_log_group.ecs.name
          "awslogs-region"        = local.aws_region
          "awslogs-stream-prefix" = "nlb-app"
        }
      }
    }
  ])
}

以下はNLBを作成するTerraformコードです。
静的IPを設定しています。

resource "aws_lb" "nlb" {
  name               = "nlb"
  internal           = false
  load_balancer_type = "network"
  security_groups    = [aws_security_group.nlb.id]

  subnet_mapping {
    subnet_id     = aws_subnet.public_1.id
    allocation_id = aws_eip.nlb_1.id
  }

  subnet_mapping {
    subnet_id     = aws_subnet.public_2.id
    allocation_id = aws_eip.nlb_2.id
  }

  enable_deletion_protection       = false
  enable_cross_zone_load_balancing = false
}

resource "aws_lb_target_group" "nlb_tcp" {
  name        = "nlb-tcp-tg"
  port        = 8080
  protocol    = "TCP"
  vpc_id      = aws_vpc.alb_nlb.id
  target_type = "ip"

  health_check {
    enabled             = true
    healthy_threshold   = 2
    unhealthy_threshold = 2
    protocol            = "TCP"
  }

  preserve_client_ip = true
}

resource "aws_lb_listener" "nlb_tcp" {
  load_balancer_arn = aws_lb.nlb.arn
  port              = "80"
  protocol          = "TCP"

  default_action {
    type             = "forward"
    target_group_arn = aws_lb_target_group.nlb_tcp.arn
  }
}

検証

レイヤー7とレイヤー4のトラフィック分割の違い

ALBとNLBのトラフィック分割は以下となります。

アプリケーション層、ネットワーク層OSI参照モデルのレイヤー名です。各レイヤーを担うプロトコルがあり、アプリケーション層はHTTP、トランスポート層TCPUDPが担います。
ALBとNLBではトラフィック分割で使用するプロトコルが異なることになります。

OSI reference model

引用: OSI参照モデルとTCP/IPの階層の違いとは? 【2分間Q&A(1)】|ビジネス+IT

今回作成したALBはHTTP、NLBはTCPによるリクエストを受け付けます。
HTTPとTCPによるトラフィック分割の違いを確認します。

ALBのDNSに対してHTTPリクエストを送ります。正常にレスポンスが返ってくることを確認できます。

$ (echo -e "GET / HTTP/1.1\r\nHost: test\r\n\r\n"; cat) | nc -v alb-app-1499646873.ap-northeast-1.elb.amazonaws.com 80
Connection to alb-app-1499646873.ap-northeast-1.elb.amazonaws.com port 80 [tcp/http] succeeded!
HTTP/1.1 200 OK
Date: Wed, 01 Oct 2025 00:56:32 GMT
Content-Type: text/plain; charset=utf-8
Content-Length: 46
Connection: keep-alive
X-App-Name: http-echo
X-App-Version: 1.0.0

ALB-APP-SERVICE: Successful response from ALB

次に、無効なHTTPリクエストをALBに送信します。HTTP/1.1ではHostヘッダが必須なため、ALBが400エラーを返します。
とほほのHTTP入門 - とほほのWWW入門

ALBがHTTPリクエストメッセージを見ていることが分かります。

$ (echo -e "GET / HTTP/1.1\r\n\r\n"; cat) | nc -v alb-app-1499646873.ap-northeast-1.elb.amazonaws.com 80
Connection to alb-app-1499646873.ap-northeast-1.elb.amazonaws.com port 80 [tcp/http] succeeded!
HTTP/1.1 400 Bad Request
Server: awselb/2.0
Date: Wed, 01 Oct 2025 00:58:32 GMT
Content-Type: text/html
Content-Length: 122
Connection: close

<html>
<head><title>400 Bad Request</title></head>
<body>
<center><h1>400 Bad Request</h1></center>
</body>
</html>

次に、無効なHTTPリクエストをNLBに送信します。すると、正常なレスポンスが返ってくることを確認できます。
これは、NLBがHTTPリクエストメッセージを見ず、TCPヘッダーを見てトライフィックをECSに送信してるためです。

$ (echo -e "GET / HTTP/1.1\r\n\r\n"; cat) | nc -v nlb-ebdf9de1dbb1a350.elb.ap-northeast-1.amazonaws.com 80
Connection to nlb-ebdf9de1dbb1a350.elb.ap-northeast-1.amazonaws.com port 80 [tcp/http] succeeded!
NLB-APP-SERVICE: Successful response from NLB

上記の挙動の違いから、ALBとNLBのトラフィック分割で使用するプロトコルが違うことを実際に確認できました。

ALBの「柔軟性」を確認する

AWSマネージメントコンソールのALBの説明を再掲します。

HTTP および HTTPS トラフィックを使用するウェブアプリケーション用に柔軟性の高い機能セットが必要な場合は、Application Load Balancer を選択します

ALBはHTTPトラフィックを使用することを確認できました。次は、「柔軟性の高い機能セット」の部分を確認します。
以下は、ロードバランサーの機能比較を表した表です。ALBはHTTP/HTTPSプロトコルを活用した特徴がサポートされています。

ALB feature
引用: 特徴 - Elastic Load Balancing | AWS

ALBのリソースマップから、リスナールールを確認します。
Terraformで作成したパスルーティング、リダイレクト、固定レスポンスのリスナールールを確認できます。
これらのリスナールールの挙動を確認し、ALBの「柔軟性」を確認します。

resource map

パスルーティングの挙動を確認します。
/api パスにリクエストを送信すると、alb-apiのECSサービスからのレスポンスが返ってくるのを確認できます。

$ curl alb-app-1499646873.ap-northeast-1.elb.amazonaws.com/api
ALB-API-SERVICE: Successful response from ALB

リダイレクトの挙動を確認します。
/redirectパスにリクエストを送信すると、/api パスにリダイレクトされ、alb-apiのECSサービスのレスポンスが返ってくるのを確認できます。

$ curl -L alb-app-1499646873.ap-northeast-1.elb.amazonaws.com/redirect
ALB-API-SERVICE: Successful response from ALB

固定レスポンスの挙動を確認します。
/adminパスにリクエストを送信すると、ALBで設定した固定レスポンスが返ってくることを確認できます。

$ curl alb-app-1499646873.ap-northeast-1.elb.amazonaws.com/admin
Access Denied: This resource is restricted

HTTPリクエストを用いたトラフィック分散を行う性質から、ALBでは様々なトラフィック処理を行えることが分かりました。
NLBではこのようなHTTPリクエストをもとにした処理を行うことができないため、トラフィック処理の観点で、確かにALBはNLBより「柔軟性」があると言えそうです。

ALBとNLBのレイテンシの違いを確認する

AWSマネージメントコンソールのNLBの説明に以下の文章が含まれます。

Network Load Balancer は接続レベルで動作し、非常に低いレイテンシーを維持しながら、1 秒あたり数百万のリクエストを確実に処理することができます。

NLBはALBと比較した際に、低レイテンシーという特徴が挙げられることが多いです。
NLBがALBより低レイテンシーとなるか実際に確認します。

ALBとNLBでアプリケーションの処理を揃えるために、ALBと同様の処理を行うECSサービスにNLBのトラフィックを送信するように変更します。
以下は、起動イメージを変更したECS Task定義のTerraformコードです。

resource "aws_ecs_task_definition" "nlb_app" {
  family                   = "nlb-app"
  network_mode             = "awsvpc"
  requires_compatibilities = ["FARGATE"]
  cpu                      = "256"
  memory                   = "512"
  execution_role_arn       = aws_iam_role.ecs_task_execution.arn

  container_definitions = jsonencode([
    {
      name  = "nlb-app"
      image = "hashicorp/http-echo:latest"

      portMappings = [
        {
          containerPort = 8080
          protocol      = "tcp"
        }
      ]

      command = [
        "-text=NLB-APP-SERVICE: Successful response from NLB",
        "-listen=:8080"
      ]

      logConfiguration = {
        logDriver = "awslogs"
        options = {
          "awslogs-group"         = aws_cloudwatch_log_group.ecs.name
          "awslogs-region"        = local.aws_region
          "awslogs-stream-prefix" = "nlb-app"
        }
      }
    }
  ])
}

負荷試験ツールlocustを使ってALBとNLBのレイテンシを計測します。
Locust - A modern load testing framework

ALB・NLBを起動しているパブリックサブネットにEC2インスタンスを起動し、ALB・NLBにリクエストを送信します。
以下のスクリプトをEC2で実行し、統計結果ファイルを集計します。

import gevent
from locust import HttpUser, task
from locust.env import Environment
from locust.stats import StatsCSVFileWriter

class ELBUser(HttpUser):
    host = "http://alb-app-1499646873.ap-northeast-1.elb.amazonaws.com"

    @task
    def test_request(self):
        self.client.get("/test")

env: Environment = Environment(user_classes=[ELBUser])

csv_writer: StatsCSVFileWriter = StatsCSVFileWriter(
    environment=env,
    base_filepath="./alb",
    full_history=True,
    percentiles_to_report=[0.0, 100.0],
)
gevent.spawn(csv_writer)

env.create_local_runner()
env.runner.start(1, spawn_rate=1)

gevent.spawn_later(60, lambda: env.runner.quit())
env.runner.greenlet.join()

以下はALBとNLBのレスポンス時間の比較結果です。単位はmsです。
NLBの方がALBよりも低レイテンシであることを確認できます。

Load Balancer Request Count Median Response Time Average Response Time Min Response Time Max Response Time Requests/s
ALB 13872 4.000000 4.177806 1.749528 46.194923 234.685076
NLB 45697 1.046798 1.232170 1.046798 28.522916 773.339445

参考

Amazon Data Firehoseを用いたApache Icebergテーブルの更新

本記事のコードです。
github.com

Amazon Data Firehoseとは

概要

Amazon Data Firehoseはストリーミングデータの取得・変換・保存を行うマネージドサービスです。
例えば、Eコマースのシステムのデータ基盤を開発する場合、商品のクリックや購入のイベントログを、リアルタイムでデータ基盤に反映する要件が発生します。
このような要件を満たすには、アプリケーションのイベントデータを収集し、データ形式に合わせた変換処理を行い、分析テーブルへの保存をリアルタイムで行うデータパイプラインが必要となります。
アプリケーションのイベントデータのような、継続的にリアルタイムで生成・送信されるデータをストリーミングデータと呼びます。
ストリーミングデータとは - ストリーミングデータの説明 - AWS

様々なストリーミングデータを扱う必要がある場合、リアルタイムで処理を行うデータパイプラインを自前で実装するのは考慮すべき要素が多く開発・運用コストが高いです。
例えば、データ処理が詰まらないためのバッファリング機能や、データ処理エラー時のバックアップ機能、データ保存先が複数ある場合のルーティング機能などを考慮する必要があります。
Amazon Data Firehoseを利用することで、このようなストリーミングデータの取得・変換・保存に関わる一連の機能をAWSが提供してくれます。
Amazon Data Firehose とは何ですか? - Amazon Data Firehose
ストリーミングデータパイプライン – Amazon Data Firehose (Firehose) – AWS

Apache Icebergテーブルへのデータ配信

Amazon Data Firehoseは様々なデータ送信先をサポートしています。
例えば、Amazon S3Amazon RedshiftのようなAWSサービスから、Datadog、SnowflakeのようなSaaSサービスに対しても、Amazon Data Firehoseからデータを送信することができます。
これらのデータ送信先の中から、Amazon Data Firehoseは Apache Icebergテーブルをサポートしています。
AWSでデータレイクを開発する場合、Apache Iceberg形式で保存するのが候補として上がります。
Apache Iceberg とは? – Iceberg テーブルの説明 – AWS
Apache Iceberg - Apache Iceberg™

AWSApache Icebergテーブルを管理する場合、Amazon S3 Tablesで管理する方法と、セルフマネージド管理の2つの方法があります。
Amazon Data Firehoseはこれら2つの方法で管理されたApache Icebergテーブルへのデータ転送をサポートしています。
Amazon Data Firehose を使用して Apache Iceberg テーブルにデータを配信する - Amazon Data Firehose

また、Amazon Data Firehoseが取得したデータに応じて、動的にデータ保存先テーブルを変更することができます。

lambda関数によるデータ変換処理

Amazon Data Firehoseはストリーミングデータの取得・変換・保存を行うマネージドサービスです。
Amazon Data Firehoseは、データ変換にあたる処理をユーザーが実装したlambda関数で行うことができます。

Amazon Data Firehose でソースデータを変換する - Amazon Data Firehose

lambda関数のreturnデータ内容に応じて、動的にデータ保存先テーブルを変更できます。
以下のように、metadataセクションを追懐することで保存先テーブルを指定することができます。

 {
    "recordId": "49655962066601463032522589543535113056108699331451682818000000",
    "result": "Ok",
    "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1",
    "metadata":{
    "otfMetadata":{
            "destinationTableName":"Device1234",
            "destinationDatabaseName":"IoTevents",
            "operation":"insert"
        }
    }
  }

Route incoming records to different Iceberg tables - Amazon Data Firehose

構成

全体像

Architecture

本記事で作成するシステムの構成図です。
以下の処理を行います。

  • (データ取得) Amazon Data FirehoseにAmazon SESのイベントデータを送信
  • (データ変換) AWS Lambdaで保存先テーブルに合わせてイベントデータをデータ変換
  • (データ保存) AWS Glue Data Catalogに保存された Apache Icebergテーブルにレコードを追加

使用データ

本記事ではAWSが提供するメール配信サービスのAmazon SESのイベントデータを使用します。
Amazon SES が Firehose に発行するイベントデータの例 - Amazon Simple Email Service
3つのメール配信イベントのデータを使用します。

  • Send: メール送信
  • Open: メール開封
  • Click: メールクリック

Sendイベントログ例

{
  "eventType": "Send",
  "mail": {
    "timestamp": "2016-10-14T05:02:16.645Z",
    "source": "sender@example.com",
    "sourceArn": "arn:aws:ses:us-east-1:123456789012:identity/sender@example.com",
    "sendingAccountId": "123456789012",
    "messageId": "EXAMPLE7c191be45-e9aedb9a-02f9-4d12-a87d-dd0099a07f8a-000000",
    "destination": [
      "recipient@example.com"
    ],
    "headersTruncated": false,
    "headers": [
      {
        "name": "From",
        "value": "sender@example.com"
      },
      {
        "name": "To",
        "value": "recipient@example.com"
      },
      {
        "name": "Subject",
        "value": "Message sent from Amazon SES"
      },
      {
        "name": "MIME-Version",
        "value": "1.0"
      },
      {
        "name": "Content-Type",
        "value": "multipart/mixed;  boundary=\"----=_Part_0_716996660.1476421336341\""
      },
      {
        "name": "X-SES-MESSAGE-TAGS",
        "value": "myCustomTag1=myCustomTagValue1, myCustomTag2=myCustomTagValue2"
      }
    ],
    "commonHeaders": {
      "from": [
        "sender@example.com"
      ],
      "to": [
        "recipient@example.com"
      ],
      "messageId": "EXAMPLE7c191be45-e9aedb9a-02f9-4d12-a87d-dd0099a07f8a-000000",
      "subject": "Message sent from Amazon SES"
    },
    "tags": {
      "ses:configuration-set": [
        "ConfigSet"
      ],
      "ses:source-ip": [
        "192.0.2.0"
      ],
      "ses:from-domain": [
        "example.com"
      ],
      "ses:caller-identity": [
        "ses_user"
      ],
      "myCustomTag1": [
        "myCustomTagValue1"
      ],
      "myCustomTag2": [
        "myCustomTagValue2"
      ]
    }
  },
  "send": {}
}

それぞれのイベントデータを保存する、send_log, open_log, click_logという名前のApache Icebergテーブルを作成します。

実装

Apache Icebergテーブル作成

本記事では、AWS Glue Data Catalogでテーブル情報を管理して Apache Icebergテーブルを作成します。
以下は、AWS Glue Data Catalogのデータベースと、Apache Icebergテーブルファイルを保存するS3バケットを作成するTerraformコードです。

resource "aws_s3_bucket" "glue_data" {
  bucket_prefix = "glue-data-"
}

resource "aws_glue_catalog_database" "email_event" {
  name         = "email_event"
  location_uri = "s3a://${aws_s3_bucket.glue_data.bucket}/email_event/"
}

Amazon AthenaでApache Iceberg形式でテーブルを作成するSQLを実行します。テーブルファイルの場所を、作成したS3バケットを指定します。
以下は send_logを作成するSQLです。カラムはSESのSendイベントが持つ値を部分的に選択しています。

CREATE TABLE email_event.send_log (
    event_type string,
    message_id string,
    from_address string,
    to_address string,
    timestamp timestamp,
    tags string
)
LOCATION 's3://glue-data-20250924084554937700000001/email_event/send_log/'
TBLPROPERTIES (
    'table_type' = 'ICEBERG',
    'format' = 'parquet',
    'write_compression' = 'gzip'
);

AWSマネジメントコンソールのAWS Glueのサービスページから、send_log, open_log, click_logが作成されていることを確認します。

AWS Glue Data Catalog

データ処理用lambda関数作成

Amazon SESのSend, Open, Clickイベントデータを受け取り、Apache Icebergテーブルのレコードに変換するlambda関数の実装です。
公式ドキュメントに記載されている、Amazon Data Firehoseが取得したデータをlambda関数内で処理するサンプルコードを参考にしています。
Route incoming records to different Iceberg tables - Amazon Data Firehose

イベントデータに応じて、送信先テーブルを変更するようにしています。

import base64
import json

def lambda_handler(firehose_records_input, context):
    print(
        "Received records for processing from DeliveryStream: "
        + firehose_records_input["deliveryStreamArn"]
    )

    firehose_records_output = {}
    firehose_records_output["records"] = []

    for firehose_record_input in firehose_records_input["records"]:
        payload_bytes = base64.b64decode(firehose_record_input["data"]).decode("utf-8")
        payload_dict = json.loads(payload_bytes)

        converted_data_dict = {}
        destination_table_name = None

        # 共通イベントデータ
        event_type = payload_dict["eventType"]
        converted_data_dict["event_type"] = event_type
        converted_data_dict["message_id"] = payload_dict["mail"]["messageId"]
        converted_data_dict["tags"] = payload_dict["mail"]["tags"]
        headers = payload_dict["mail"]["headers"]
        for header in headers:
            match header["name"]:
                case "From":
                    converted_data_dict["from_address"] = header["value"]
                case "To":
                    converted_data_dict["to_address"] = header["value"]

        # Sendイベントデータ用処理
        if event_type == "Send":
            converted_data_dict["timestamp"] = payload_dict["mail"]["timestamp"]
            destination_table_name = "send_log"

        # Openイベントデータ用処理
        if event_type == "Open":
            converted_data_dict["ip_address"] = payload_dict["open"]["ipAddress"]
            converted_data_dict["user_agent"] = payload_dict["open"]["userAgent"]
            converted_data_dict["timestamp"] = payload_dict["open"]["timestamp"]
            destination_table_name = "open_log"

        # Clickイベントデータ用処理
        if event_type == "Click":
            converted_data_dict["ip_address"] = payload_dict["click"]["ipAddress"]
            converted_data_dict["link"] = payload_dict["click"]["link"]
            converted_data_dict["link_tags"] = payload_dict["click"]["linkTags"]
            converted_data_dict["user_agent"] = payload_dict["click"]["userAgent"]
            converted_data_dict["timestamp"] = payload_dict["click"]["timestamp"]
            destination_table_name = "click_log"

        converted_data_str = json.dumps(converted_data_dict)
        print("converted_data_str:", converted_data_str)

        # イベントごとに送信先テーブルを指定
        firehose_record_output = {}
        firehose_record_output["data"] = base64.b64encode(
            converted_data_str.encode("utf-8")
        )
        firehose_record_output["recordId"] = firehose_record_input["recordId"]
        firehose_record_output["result"] = "Ok"
        firehose_record_output["metadata"] = {
            "otfMetadata": {
                "destinationDatabaseName": "email_event",
                "destinationTableName": destination_table_name,
                "operation": "insert",
            }
        }
        firehose_records_output["records"].append(firehose_record_output)
        print(firehose_record_output)

    return firehose_records_output

データ変換用lambda関数のIAM Roleを作成するTerraformコードです。
lambda関数のログをCloud Watchに書き込むための権限を付与します。

resource "aws_cloudwatch_log_group" "firehose_processor_lambda" {
  name              = "/aws/lambda/firehose-processor"
  retention_in_days = 365
}

resource "aws_iam_role" "firehose_processor_lambda" {
  name = "firehose_processor_lambda_role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_policy" "firehose_processor_lambda" {
  name        = "firehose_processor_lambda_policy"
  description = "Policy for Firehose processor Lambda role"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = [
          "arn:aws:logs:${local.aws_region}:${local.aws_account_id}:log-group:${aws_cloudwatch_log_group.firehose_processor_lambda.name}:*"
        ]
      }
    ]
  })
}

lambda関数用スクリプトをzip化した後、lambda関数を作成するTerraformコードです。

data "archive_file" "firehose_processor_lambda" {
  type        = "zip"
  source_dir  = "../lambda/"
  output_path = "../lambda/firehose_processor.zip"
}

resource "aws_lambda_function" "firehose_processor_lambda" {
  function_name    = "firehose-processor-handler"
  role             = aws_iam_role.firehose_processor_lambda.arn
  filename         = data.archive_file.firehose_processor_lambda.output_path
  handler          = "firehose_processor.lambda_handler"
  runtime          = "python3.12"
  timeout          = 70
  source_code_hash = filebase64sha256(data.archive_file.firehose_processor_lambda.output_path)

  logging_config {
    log_group  = aws_cloudwatch_log_group.firehose_processor_lambda.name
    log_format = "Text"
  }

  depends_on = [
    aws_cloudwatch_log_group.firehose_processor_lambda,
    data.archive_file.firehose_processor_lambda,
  ]
}

resource "aws_lambda_alias" "firehose_processor_lambda_default" {
  name             = "default"
  description      = "Default version."
  function_name    = aws_lambda_function.firehose_processor_lambda.arn
  function_version = "1"
  lifecycle {
    ignore_changes = [function_version]
  }
}

Apache Icebergテーブルへのデータ送信用Amazon Data Firehose作成

Amazon Data Firehoseの IAM Roleを作成するTerraformコードです。
Lambda関数を起動した後、AWS Glue Data Catalogに登録したApache Icebergテーブルに書き込みを行うための権限を付与しています。

resource "aws_cloudwatch_log_group" "firehose_load_iceberg" {
  name              = "/aws/firehose/firehose-load-iceberg"
  retention_in_days = 365
}

resource "aws_cloudwatch_log_stream" "firehose_load_iceberg" {
  name           = "firehose-load-iceberg-log-stream"
  log_group_name = aws_cloudwatch_log_group.firehose_load_iceberg.name
}

resource "aws_iam_role" "firehose_load_iceberg" {
  name = "firehose_load_iceberg_role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "firehose.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_policy" "firehose_load_iceberg" {
  name        = "firehose_load_iceberg_policy"
  description = "Policy for Firehose load iceberg role"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:AbortMultipartUpload",
          "s3:GetBucketLocation",
          "s3:GetObject",
          "s3:ListBucket",
          "s3:ListBucketMultipartUploads",
          "s3:PutObject",
          "s3:DeleteObject"
        ]
        Resource = [
          "arn:aws:s3:::*",
          "arn:aws:s3:::*/*"
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = [
          "arn:aws:logs:${local.aws_region}:${local.aws_account_id}:log-group:${aws_cloudwatch_log_group.firehose_load_iceberg.name}:*",
          aws_cloudwatch_log_stream.firehose_load_iceberg.arn
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "glue:GetTable",
          "glue:GetDatabase",
          "glue:UpdateTable"
        ]
        Resource = [
          "arn:aws:glue:*:*:catalog",
          "arn:aws:glue:*:*:database/*",
          "arn:aws:glue:*:*:table/*/*"
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "kinesis:DescribeStream",
          "kinesis:GetShardIterator",
          "kinesis:GetRecords",
          "kinesis:ListShards"
        ]
        Resource = [
          "arn:aws:kinesis:*:*:stream/*"
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "lambda:InvokeFunction",
          "lambda:GetFunctionConfiguration"
        ]
        Resource = [
          "arn:aws:lambda:*:*:function:*:*"
        ]
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "firehose_load_iceberg" {
  role       = aws_iam_role.firehose_load_iceberg.name
  policy_arn = aws_iam_policy.firehose_load_iceberg.arn
}

エラーデータ保存用のS3バケットと、Amazon Data Firehoseを作成するTerraformコードです。
aws_kinesis_firehose_delivery_stream リソースでAmazon Data Firehoseを作成します。
Apache Icebergテーブルをデータ送信先とするため、iceberg_configurationの設定を行います。また、Amazon Data Firehoseで実行するlambda関数を紐づけています。

Terraform Registry

resource "aws_s3_bucket" "firehose_load_iceberg" {
  bucket_prefix = "firehose-load-iceberg-"
}

resource "aws_kinesis_firehose_delivery_stream" "firehose_load_iceberg" {
  name        = "firehose-load-iceberg"
  destination = "iceberg"

  iceberg_configuration {
    role_arn           = aws_iam_role.firehose_load_iceberg.arn
    catalog_arn        = "arn:aws:glue:${local.aws_region}:${local.aws_account_id}:catalog"
    buffering_size     = 5
    buffering_interval = 10

    s3_configuration {
      role_arn   = aws_iam_role.firehose_load_iceberg.arn
      bucket_arn = aws_s3_bucket.firehose_load_iceberg.arn
    }

    processing_configuration {
      enabled = "true"

      processors {
        type = "Lambda"

        parameters {
          parameter_name  = "LambdaArn"
          parameter_value = "${aws_lambda_function.firehose_processor_lambda.arn}:${aws_lambda_alias.firehose_processor_lambda_default.name}"
        }
        parameters {
          parameter_name  = "BufferIntervalInSeconds"
          parameter_value = 10
        }
      }
    }

    cloudwatch_logging_options {
      enabled         = true
      log_group_name  = aws_cloudwatch_log_group.firehose_load_iceberg.name
      log_stream_name = aws_cloudwatch_log_stream.firehose_load_iceberg.name
    }
  }
}

動作確認

Amazon Data Firehoseへデータ送信

JSONファイルとして保存したAmazon SESのイベントデータを読み込み、Amazon Data Firehoseにデータを送信するPyhtonスクリプトです。

import json

import boto3

DELIVERY_STREAM_NAME = "firehose-load-iceberg"
JSON_FILE_PATH = "event_data/open.json"

def main():
    with open(JSON_FILE_PATH, "r", encoding="utf-8") as file:
        data = json.load(file)

    firehose_client = boto3.client("firehose", region_name="ap-northeast-1")
    response = firehose_client.put_record(
        DeliveryStreamName=DELIVERY_STREAM_NAME,
        Record={"Data": json.dumps(data, ensure_ascii=False).encode("utf-8")},
    )
    print(f"send data to firehose success: RecordId={response['RecordId']}")

if __name__ == "__main__":
    main()

Amazon Athenaから、Apache Icebergテーブルにデータが追加されていることを確認できます。

Amazon Athena

エラーデータを保存するS3バケットに、processing-failedが自動的に作成され、以下のようなファイルが保存されます。

エラーデータ保存用S3 Bucket

エラーデータのファイルには以下のような取得データと、エラーメッセージが含まれます。

{"rawData":"eyJldmVudFR5cGUiOiAiU2VuZCIsICJtYWlsIjogeyJ0aW1lc3RhbXAiOiAiMjAxNi0xMC0xNFQwNTowMjoxNi42NDVaIiwgInNvdXJjZSI6ICJzZW5kZXJAZXhhbXBsZS5jb20iLCAic291cmNlQXJuIjogImFybjphd3M6c2VzOnVzLWVhc3QtMToxMjM0NTY3ODkwMTI6aWRlbnRpdHkvc2VuZGVyQGV4YW1wbGUuY29tIiwgInNlbmRpbmdBY2NvdW50SWQiOiAiMTIzNDU2Nzg5MDEyIiwgIm1lc3NhZ2VJZCI6ICJFWEFNUExFN2MxOTFiZTQ1LWU5YWVkYjlhLTAyZjktNGQxMi1hODdkLWRkMDA5OWEwN2Y4YS0wMDAwMDAiLCAiZGVzdGluYXRpb24iOiBbInJlY2lwaWVudEBleGFtcGxlLmNvbSJdLCAiaGVhZGVyc1RydW5jYXRlZCI6IGZhbHNlLCAiaGVhZGVycyI6IFt7Im5hbWUiOiAiRnJvbSIsICJ2YWx1ZSI6ICJzZW5kZXJAZXhhbXBsZS5jb20ifSwgeyJuYW1lIjogIlRvIiwgInZhbHVlIjogInJlY2lwaWVudEBleGFtcGxlLmNvbSJ9LCB7Im5hbWUiOiAiU3ViamVjdCIsICJ2YWx1ZSI6ICJNZXNzYWdlIHNlbnQgZnJvbSBBbWF6b24gU0VTIn0sIHsibmFtZSI6ICJNSU1FLVZlcnNpb24iLCAidmFsdWUiOiAiMS4wIn0sIHsibmFtZSI6ICJDb250ZW50LVR5cGUiLCAidmFsdWUiOiAibXVsdGlwYXJ0L21peGVkOyAgYm91bmRhcnk9XCItLS0tPV9QYXJ0XzBfNzE2OTk2NjYwLjE0NzY0MjEzMzYzNDFcIiJ9LCB7Im5hbWUiOiAiWC1TRVMtTUVTU0FHRS1UQUdTIiwgInZhbHVlIjogIm15Q3VzdG9tVGFnMT1teUN1c3RvbVRhZ1ZhbHVlMSwgbXlDdXN0b21UYWcyPW15Q3VzdG9tVGFnVmFsdWUyIn1dLCAiY29tbW9uSGVhZGVycyI6IHsiZnJvbSI6IFsic2VuZGVyQGV4YW1wbGUuY29tIl0sICJ0byI6IFsicmVjaXBpZW50QGV4YW1wbGUuY29tIl0sICJtZXNzYWdlSWQiOiAiRVhBTVBMRTdjMTkxYmU0NS1lOWFlZGI5YS0wMmY5LTRkMTItYTg3ZC1kZDAwOTlhMDdmOGEtMDAwMDAwIiwgInN1YmplY3QiOiAiTWVzc2FnZSBzZW50IGZyb20gQW1hem9uIFNFUyJ9LCAidGFncyI6IHsic2VzOmNvbmZpZ3VyYXRpb24tc2V0IjogWyJDb25maWdTZXQiXSwgInNlczpzb3VyY2UtaXAiOiBbIjE5Mi4wLjIuMCJdLCAic2VzOmZyb20tZG9tYWluIjogWyJleGFtcGxlLmNvbSJdLCAic2VzOmNhbGxlci1pZGVudGl0eSI6IFsic2VzX3VzZXIiXSwgIm15Q3VzdG9tVGFnMSI6IFsibXlDdXN0b21UYWdWYWx1ZTEiXSwgIm15Q3VzdG9tVGFnMiI6IFsibXlDdXN0b21UYWdWYWx1ZTIiXX19LCAic2VuZCI6IHt9fQo=","errorCode":"Lambda.InvokeAccessDenied","errorMessage":"Access was denied. Ensure that the access policy allows access to the Lambda function.","attemptsMade":4,"arrivalTimestamp":1758706488335,"attemptEndingTimestamp":1758706512945,"lambdaARN":"arn:aws:lambda:ap-northeast-1:559802578070:function:firehose-processor-handler"}

参考

プライベートサブネット内のlambdaからVPC Endpoints経由でSESを実行する

構成

プライベートサブネット内で起動したlambda関数から、インターネットを経由せずSESを呼び出そうと思います。
本記事では以下の構成でlambda関数からSESを呼び出します。

acrhitecture

本文中コード.
github.com

lambdaからSESにアクセスする手段

プライベートサブネット内のlambdaからSESにメールを送信する場合、2通りのインフラ構成が考えれられます。
参考: [アップデート] Amazon SES が SMTP エンドポイントの VPC エンドポイントをサポートしました | DevelopersIO

また、lambdaようなアプリケーションからSESにリクエストを送る場合、SES APISMTPインターフェイスの2通りの方法があります。
Set up email sending with Amazon SES - Amazon Simple Email Service

インターネット経由でSESにアクセス(SES API / SMTPインターフェイス)

1つ目はNAT GatewayとInternet Gatewayを設置し、インターネット経由でSESを呼び出す方法です。
この方法ではSES APISMTPインターフェイス両方の方法でSESにアクセスすることができます。
今回はインターネット接続を行わない方法を検討するので、この構成は採用しません。

Use Nat Gateway

VPC Endpoints経由でSESにアクセス(SMTPインターフェイス)

VPC Endpoints経由でプライベートサブネットのクライアントから直接SESへアクセスできます。
注意点として、VPC Endpointsを利用したSESへのアクセスはSMTPインターフェイスのみサポートされています
AWSサポートに問い合わせてくれている方がいます。
VPC内のLambdaからはSES向けのVPCエンドポイントを使用してもBoto3ではメールを送信できない #Python - Qiita

インターネットを経由しないため、今回はこの構成を採用します。

Use VPC Endpoints

VPC Endpoints経由でSTMPインターフェイスでメール送信する

Terraform

AWSリソースを作成するTerraformを用意しました

github.com

$ git clone https://github.com/nsakki55/lambda-ses-vpc-endpoints
$ cd lambda-ses-vpc-endpoints

AWSアカウントとSESで送るメールの値を環境変数に設定します。

$ export AWS_ACCOUNT_ID=<aws_account_id> 
$ export AWS_REGION=<aws_region>
$ export DOMAIN=<domain>
$ export FROM_ADDRESS=<from_address>
$ export TO_ADDRESS=<to_address>

今回のインフラ構成をterraformで作成します。

$ cd terraform
$ terraform init
$ terraform apply -var="aws_account_id=$AWS_ACCOUNT_ID" -var="aws_region=$AWS_REGION" -var="domain=$DOMAIN" 
$ cd ..

lambda関数のDocker imageをビルドしECRにデプロイします

$ make build
$ make push

SMTP認証情報の作成

SMTPインターフェイスでSESにアクセスする場合、SMTP認証情報を取得する必要があります。
AWSコンソールからSMTP認証ユーザーを作成する手順が公式ドキュメントで紹介されています。
Obtaining Amazon SES SMTP credentials - Amazon Simple Email Service.

IAMユーザーを作成し、SMTP認証に必要な2つの値をSecretManagerに保存しています。

  • アクセスキー
  • シークレットアクセスキーから作成したSMTPパスワード

シークレットアクセスキーとSMTPパスワードは異なる値ですので注意してください。

SMTP認証用のIAMユーザーに、SESでメールを送信するためのポリシーをアタッチしています。

resource "aws_iam_user" "smtp_user" {
  name = "smtp-user"
}

resource "aws_iam_access_key" "smtp_user_key" {
  user = aws_iam_user.smtp_user.name
}

resource "aws_secretsmanager_secret" "smtp_credential" {
  name = "smtp-credential"
}

resource "aws_secretsmanager_secret_version" "smtp_credentials" {
 secret_id = aws_secretsmanager_secret.smtp_credential.id
 secret_string = jsonencode({
   username = aws_iam_access_key.smtp_user_key.id
   password = aws_iam_access_key.smtp_user_key.ses_smtp_password_v4
 })
}

resource "aws_iam_user_policy" "smtp_user_policy" {
  name = "smtp-user-policy"
  user = aws_iam_user.smtp_user.name

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "ses:SendEmail",
          "ses:SendRawEmail"
        ]
        Resource = "*"
      }
    ]
  })
}

SMTPインターフェイスVPC Endpointsの作成

SMTPインターフェイスでSESにアクセスするための、VPC Endpoints、セキュリティグループを作成します。

com.amazonaws.ap-northeast-1.email-smtp がSES用のVPC Endpointsサービスです。

SMTPプロトコルで通信するために、587ポートのインバウンドルールをVPC Endpointsのセキュリティグループに設定します。

resource "aws_vpc" "main" {
  cidr_block           = "10.1.0.0/16"
  enable_dns_support   = true
  enable_dns_hostnames = true
}

resource "aws_subnet" "private" {
  vpc_id            = aws_vpc.main.id
  availability_zone = "${var.aws_region}a"
  cidr_block        = "10.1.10.0/24"
}

locals {
  aws_services = [
    "com.amazonaws.${var.aws_region}.email-smtp",
    "com.amazonaws.${var.aws_region}.secretsmanager",
  ]
}

resource "aws_vpc_endpoint" "aws_services_interface_type" {
  for_each            = toset(local.aws_services)
  vpc_id              = aws_vpc.main.id
  service_name        = each.value
  vpc_endpoint_type   = "Interface"
  private_dns_enabled = true
  subnet_ids          = [aws_subnet.private.id]
  security_group_ids  = [aws_security_group.aws_services_interface_endpoints.id]
  tags = {
    Name = "Invoke SES via SMTP"
  }
}

resource "aws_security_group" "aws_services_interface_endpoints" {
  name   = "aws-services-interface-endpoints-sg"
  vpc_id = aws_vpc.main.id
}

resource "aws_security_group" "invoke_ses" {
  name   = "invoke-ses-sg"
  vpc_id = aws_vpc.main.id
}

resource "aws_vpc_security_group_egress_rule" "invoke_ses" {
  security_group_id = aws_security_group.invoke_ses.id
  cidr_ipv4         = "0.0.0.0/0"
  ip_protocol       = "-1"
  from_port         = -1
  to_port           = -1
}

resource "aws_vpc_security_group_ingress_rule" "allows_access_to_interface_endpoints" {
  security_group_id            = aws_security_group.aws_services_interface_endpoints.id
  referenced_security_group_id = aws_security_group.invoke_ses.id
  from_port                    = 443
  to_port                      = 443
  ip_protocol                  = "tcp"
}

resource "aws_vpc_security_group_ingress_rule" "allows_smtp_access_to_interface_endpoints" {
  security_group_id            = aws_security_group.aws_services_interface_endpoints.id
  referenced_security_group_id = aws_security_group.invoke_ses.id
  from_port                    = 587
  to_port                      = 587
  ip_protocol                  = "tcp"
}

SMTPインターフェイスでSESにアクセスするlambda関数の作成

SecretManagerからSMTP認証情報を取得し、SMTPインターフェイスでSESにアクセスする処理を記述したlambda関数です。

SMTPサーバーにはSESのVPC EndpointsのプライベートDNSemail-smtp.ap-northeast-1.amazonaws.com を指定します。

import os
import json
import boto3
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

def get_smtp_credential() -> tuple[str, str]:
    session = boto3.session.Session()
    client = session.client(service_name="secretsmanager", region_name="ap-northeast-1")
    response = client.get_secret_value(SecretId="smtp-credential")
    secret = json.loads(response["SecretString"])

    smtp_username = secret["username"]
    smtp_password = secret["password"]
    return smtp_username, smtp_password

def handler(event, context) -> None:
    # メールの作成
    msg = MIMEMultipart()
    msg["From"] = os.environ['FROM_ADDRESS']
    msg["To"] = os.environ['TO_ADDRESS']
    msg["Subject"] = "test title"
    email_body = "This is test email via smtp."
    msg.attach(MIMEText(email_body, "plain"))

    # SMTPサーバーにメール送信
    smtp_server = "email-smtp.ap-northeast-1.amazonaws.com"
    smtp_port = 587
    smtp_username, smtp_password = get_smtp_credential()
    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.starttls()
        server.login(smtp_username, smtp_password)
        server.send_message(msg)

    return {"statusCode": 200, "body": json.dumps("Email sent successfully")}

動作確認

lambda関数をテスト実行すると、実行が成功します。

lambda test

設定した送信元メールアドレスから、メールが届いてることを確認できます。

mail from ses

参考

AWS Glueでicebergテーブルのスキーマを動的に変更する


本記事は datatech-jp Advent Calendar 2024 の13日目の記事です。

はじめに

Glue Catalogに登録したiceberg tableにAWS GlueJobでデータの書き込みを行うケースは多いかと思います。

通常、Glue Catalogのスキーマに登録されていないカラムを持つデータをGlue Jobで書き込もうとするとエラーとなります。

そのため、新しいカラムを持つデータを書き込む前に alter table add columns をAthenaなどで実行し、iceberg tableのスキーマ変更を行う必要があります。
ALTER TABLE ADD COLUMNS - Amazon Athena

Glue Jobの処理で新しくカラムを追加した場合や、元データにカラムが追加された場合に毎回上記の対応を行うのは運用コストが高く、操作ミスの危険もあります。

本記事では、扱うデータのカラムに応じて動的にiceberg tableのカラムを追加する方法をまとめます。

Glue Job実行のためのAWSリソース準備

AWS GlueJobを実行するための、S3 Bucket・IAM Role・Glue Database・Glue Jobを作成するTerraformを作成しました。

code-for-blogpost/glue_job_iceberg_schema_change/terraform at main · nsakki55/code-for-blogpost · GitHub

$ cd terraform 
$ terraform init
$ terraform apply

AWS Glue jobでiceberg tableに対する操作を有効化する場合、Glue JobのSpark Configurationを適切に設定する必要があります。
AWS Glue での Iceberg フレームワークの使用 - AWS Glue.

terraformのコードからglue jobのリソースを作成する部分を抜粋すると、以下のようになります。

resource "aws_glue_job" "update_iceberg_table_schema" {
  name              = "update_iceberg_table_schema"
  role_arn          = aws_iam_role.glue_job_role.arn
  glue_version      = "4.0"
  worker_type       = "G.1X"
  number_of_workers = 2
  max_retries       = 0
  execution_property {
    max_concurrent_runs = 10
  }

  command {
    script_location = "s3://${aws_s3_bucket.glue_job.bucket}/scripts/update_iceberg_table_schema.py"
  }

  default_arguments = {
    "--enable-glue-datacatalog"          = "true"
    "--TempDir"                          = "s3://${aws_s3_bucket.glue_job.bucket}/temporary/"
    "--spark-event-logs-path"            = "s3://${aws_s3_bucket.glue_job.bucket}/sparkHistoryLogs/"
    "--enable-job-insights"              = "false"
    "--enable-continuous-cloudwatch-log" = "true"
    "--datalake-formats"                 = "iceberg"
    # conf to enable iceberg format. ref: https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-programming-etl-format-iceberg.html
    "--conf" = "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.warehouse=file:///tmp/spark-warehouse"
  }

Glue Jobで取り込んでicebergテーブルに書き込むためのサンプルデータを2つ用意しました。

https://github.com/nsakki55/code-for-blogpost/tree/main/glue_job_iceberg_schema_change/data

test_data.csv

col1 col2
aaa 1
bbb 2
ccc 3

test_data_new_column.csv

col2 col1 col3
444 ddd XXX
555 eee YYY
666 fff ZZZ

2つのテストデータの差は以下です

  • col1, col2のカラムの順序が異なる
  • col3という新しいカラムが追加されている

今回はS3 Bucketに2つのサンプルデータを配置して、Glue Jobで読み込む元データとします。

アップロード先のS3 Bucket名をterraform applyで生成したS3 Bucket名に変更して下さい。

$ aws s3 cp ./data/test_data.csv s3://schema-change-data-20241208085330834400000002/input/
$ aws s3 cp ./data/test_data_new_column.csv s3://schema-change-data-20241208085330834400000002/input/

icebergテーブルにデータを追加するGlueJobの作成

S3からcsvデータを読み込んだ後、以下の処理を行うGlue Jobを作成しました。

  • テーブルが存在しない場合
    • テーブルを作成
  • テーブルが存在する場合
    • テーブルにデータを追加
import sys
from typing import Dict

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

S3_BUCKET = "schema-change-data-20241208085330834400000002"
TABLE_NAME = "test_table"
DATABASE_NAME = "test_database"
CATALOG_NAME = "glue_catalog"

def get_dynamic_frame_from_s3(glue_context: GlueContext, source_s3_path: str) -> DynamicFrame:
    print(f"Start get dynamic frame from S3. {source_s3_path=}")
    dyf = glue_context.create_dynamic_frame.from_options(
        format_options={
            "withHeader": True,
            "separator": ",",
        },
        connection_type="s3",
        format="csv",
        connection_options={
            "paths": [source_s3_path],
        },
    )
    print(f"Finished get dynamic frame from S3. {dyf.count()=}")
    return dyf

def check_table_in_database(glue_context: GlueContext, database_name: str, table_name: str) -> bool:
    print(f"Start check table in database. {database_name=}, {table_name=}")
    tables = glue_context.spark_session.catalog.listTables(database_name)
    is_exist = table_name in [table.name for table in tables]
    print(f"Finished check table in database. {is_exist=}")
    return is_exist

def main(args: Dict[str, str]) -> None:
    sc = SparkContext()
    glue_context = GlueContext(sc)
    job = Job(glue_context)
    job.init(args["JOB_NAME"], args)
    print(f"Start update iceberg table schema. {args=}")

    dyf = get_dynamic_frame_from_s3(
        glue_context=glue_context,
        source_s3_path=f"s3://{S3_BUCKET}/input/{args['file_name']}",
    )
    df = dyf.toDF()
    df.printSchema()

    is_exist = check_table_in_database(glue_context=glue_context, database_name=DATABASE_NAME, table_name=TABLE_NAME)

    table_path = f"{CATALOG_NAME}.{DATABASE_NAME}.{TABLE_NAME}"
    if is_exist:
        df.writeTo(table_path).append()
    else:
        df.writeTo(table_path).tableProperty("format-version", "2").tableProperty("location", f"s3://{S3_BUCKET}/output").create()

    print(f"Finished update iceberg table schema. {args=}")
    job.commit()

if __name__ == "__main__":

    args = getResolvedOptions(sys.argv, ["JOB_NAME", "file_name"])
    main(args)

Glue Jobの実行スクリプトをS3にアップロードします。S3 Bucketをterraform applyで作成したS3 Bucket名に変更して下さい。

$ aws s3 cp ./src/update_iceberg_table_schema.py s3://glue-job-20241208085330834300000001/scripts/ 

1つ目のサンプルデータであるtest_data.csvを読み込むGlue Jobを実行します。

$ aws glue start-job-run \
    --job-name update_iceberg_table_schema \
    --arguments '{"--file_name": "test_data.csv"}'

Glue Catalogのスキーマを見ると、col1, col2が追加されています

glue catalog

icebergテーブルのスキーマを変更する

icebergでは動的にスキーマ変化できる、mergeSchemaというオプションが提供されています。
Writes - Apache Iceberg™.

mergeSchemaオプションを使用する場合、以下の挙動となります。

  • 新しいカラムがデータソースに存在するが、対象のテーブルにカラムが存在しない場合

    → 新しいカラムが対象のテーブルに追加される。既存のレコードの新しいカラムにはnullが設定される。

  • データソースにカラムが存在しないが、対象のテーブルにはカラムが存在する場合

    → 新しいレコードを追加・レコードを更新すると、対象のカラムにnullが設定される。

mergeSchemaオプションを使用するには、対象のテーブルのプロパティ設定に 'write.spark.accept-any-schema'='true' を追加する必要があります。

ALTER TABLE test_database.test_table SET TBLPROPERTIES (
  'write.spark.accept-any-schema'='true'
)

上記のプロパティ追加のクエリをAthena経由で実行すると、サポートされていないプロパティエラーがでます。

Unsupported table property key: write.spark.accept-any-schema

Athenaでは変更可能なicebergテーブルのプロパティに制限があります。
Create Iceberg tables - Amazon Athena

そのため、上記のプロパティ変更をAthena経由では行えません。

AWSが出してるicebergテーブルに関する記事で、'write.spark.accept-any-schema'='true' のプロパティ設定をspark経由で実行してるのを確認できます。
Modernize your legacy databases with AWS data lakes, Part 2: Build a data lake using AWS DMS data on Apache Iceberg | AWS Big Data Blog

本記事ではこの方法に習い、spark経由で必要なプロパティ設定を行います。

既存テーブルへのデータ追加の実装を以下のように変更しました。

sql = f"ALTER TABLE {table_path} SET TBLPROPERTIES ('write.spark.accept-any-schema' = 'true')"
glue_context.spark_session.sql(sql)
df.writeTo(table_path).option("mergeSchema","true").append()

新しいカラムを持つcsvデータを読み込むGlueJobを実行します

$ aws glue start-job-run \
    --job-name update_iceberg_table_schema \
    --arguments '{"--file_name": "test_data_new_column.csv"}'

しかし、上記の設定のGlueJobは失敗します。

原因はcol1, col2のカラムの順番がテーブルスキーマと元データで異なるためです。

2024-12-09 01:21:18,419 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(77)): Error from Python:Traceback (most recent call last):
  File "/tmp/update_iceberg_table_schema.py", line 74, in <module>
    main(args)
  File "/tmp/update_iceberg_table_schema.py", line 63, in main
    df.writeTo(table_path).option("mergeSchema","true").append()
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1460, in append
    self._jwriter.append()
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
    raise converted from None
pyspark.sql.utils.IllegalArgumentException: Cannot write incompatible dataset to table with schema:
table {
  1: col1: optional string
  2: col2: optional string
  3: col3: optional string
}
Provided schema:
table {
  2: col2: optional string
  1: col1: optional string
  3: col3: optional string
}
Problems:
* col2 is out of order, before col1

icebergのspark optionには check-ordering という設定があります。

これは入力スキーマとテーブルスキーマが同じかチェックする設定で、デフォルトではTrueとなります。

icebergのGitHubレポジトリのmergeSchemaに関するissueで議論されているように、スキーマ変更するためにcheck-ordering をFalseにする必要があります。
Adding new columns (mergeSchema) · Issue #8908 · apache/iceberg · GitHub

GlueJobのリソースを作成するterraform設定に、以下のconfigを追加します。

--conf spark.sql.iceberg.check-ordering=false

GlueJobの変更を反映します

$ terraform apply

新しいカラムを持つcsvデータを読み込むGlue Jobを再び実行します。今度はJobが成功します。

$ aws glue start-job-run \
    --job-name update_iceberg_table_schema \
    --arguments '{"--file_name": "test_data_new_column.csv"}'

Glue Catalogのスキーマを確認すると、スキーマが変更されcol3が追加されているのを確認できます。

glue catalog with new column

test_tableの中身を確認してみると、既存のレコードには新しく追加されたcol3のカラムにnullが設定されています。

test_table

GlueJobから動的にiceberg tableのスキーマを変更できることができるようになりました。

参考

Pythonプロジェクトでflat layoutではなくsrc layoutが推奨される理由を理解する

本文中コード
github.com

flat layoutとsrc layoutについて

Pythonプロジェクトのディレクトリ構成について調べてたところ、flat layoutとsrc layoutという2種類のディレクトリ構成が存在することを知りました。
src レイアウト対フラットレイアウト - Python Packaging User Guide

flat layout

flat layoutはパッケージフォルダをプロジェクトのルート直下に配置するスタイルです。
flat layoutの有名なpythonプロジェクトだと、 pytorch, django, tensorflow があります。

.
├── README.md
├── pyproject.toml
└── my_package/
    ├── __init__.py
    └── module.py

src layout

一方、src layoutはsrcサブディレクトリにパッケージフォルダを配置するスタイルです。
src layoutの有名なpythonプロジェクトだと、transfomers, flask, black があります。

├── README.md
├── pyproject.toml
└── src/
     └── my_package/
        ├── __init__.py
        └── module.py

Pythonパッケージを開発する上ではsrc layoutが推奨されている

pytestの公式ドキュメントでは、src layoutが推奨されています。
Good Integration Practices - pytest documentation

Generally, but especially if you use the default import mode prepend, it is strongly suggested to use a src layout. Here, your application root package resides in a sub-directory of your root, i.e. src/mypkg/ instead of mypkg.

PyPA(Python Packaging Authority)のPython Packaging User GuideのGitHubレポジトリでも、src layoutを好むユーザーが多いことが伺えます。
https://github.com/pypa/packaging.python.org/issues/320

Python Packaging User Guideからsrc layoutとflat layoutの特徴のポイントを抜粋すると、以下のように書かれています。
src レイアウト対フラットレイアウト - Python Packaging User Guide

  • ソースコードを実行するために、src layoutはインストールステップが必要となるが、flat layoutはインストールステップが不要
  • Pythonインタープリタカレントワーキングディレクトリをインポートパスの先頭に含むため、flat layoutでは開発中のコードを使用してしまう危険があるが、src layoutではインストール済みパッケージを使用することが保証されている

自分はこれらの説明を読んだ時に、何となく雰囲気は分かるけど、自分事として理解できていないモヤモヤがありました。
実際にflat layoutとsrc layoutでパッケージ開発の流れを再現してみて、src layoutがパッケージテストの上で安全であることを理解してみようと思います。

flat layoutでパッケージ開発

パッケージ構成

code-for-blogpost/src_vs_flat_layout/flat_layout at main · nsakki55/code-for-blogpost · GitHub

.
├── mypkg_flat
│   ├── __init__.py
│   └── math.py
├── tests
│   ├── __init__.py
│   └── test_math.py
├── pyproject.toml
├── requirements-dev.txt
└── tox.ini

mypkg_flat というディレクトリをプロジェクトルート直下に作成しました。

パッケージ内のモジュールであるmath.py には、足し算と引き算を行うadd, substract関数を用意します。

def add(a: float, b: float) -> float:
    return a + b

def substract(a: float, b: float) -> float:
    return a - b

tests/test_math.py には mypkg_flat パッケージのテストを記述します。

from mypkg_flat.math import add, subtract

def test_add():
    assert add(2, 3) == 5

def test_subtract():
    assert subtract(5, 3) == 2

pyproject.tomlを使用してパッケージビルドを行います。
pyproject.tomlによるパッケージビルドの方法はnikkieさんの記事を参考にしました。
Pythonで自作ライブラリを作るとき、setup.pyに代えてpyproject.tomlを使ってみませんか? - nikkie-ftnextの日記
以下の内容のpyproject.tomlに記述します。mypkg_flat ディレクトリをビルド対象としています。

[project]
name = "mypkg-flat"
version = "0.1.0"
description = "Example package using flat layout"
requires-python = ">=3.11"
dependencies = [
    "pytest",
]

[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[tool.setuptools]
packages = ["mypkg_flat"]

mypkg_flat パッケージのビルドを行います。distフォルダ内にビルド済みのパッケージファイルが作成されます。

$ python -m build

$ ls dist/
> mypkg_flat-0.1.0-py3-none-any.whl  mypkg_flat-0.1.0.tar.gz

requirements-dev.txtに mypkg_flat を含めます。

pytest
mypkg_flat

パッケージの作成ができたので、toxで作った仮想環境にビルド済みパッケージをインストールしてテストを実行します。
tox.iniに skipdist=true を設定することで、requirements-dev.txtのインストール時にビルドが走らないようにします。
Configuration - tox

[tox]
envlist = py312
skipsdist = true

[testenv]
install_command = pip install --find-links=dist {opts} {packages}
deps = -r requirements-dev.txt
commands =
    pytest  tests

tox -r コマンドでテストを実行します。 -r オプションをつけて仮想環境を作り直してます。

$ tox -r
py312: remove tox env folder /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/flat_layout/.tox/py312
py312: install_deps> pip install --find-links=dist -r requirements-dev.txt
py312: commands[0]> pytest tests
================================================================================ test session starts ================================================================================
platform darwin -- Python 3.12.5, pytest-8.3.3, pluggy-1.5.0
cachedir: .tox/py312/.pytest_cache
rootdir: /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/flat_layout
configfile: pyproject.toml
collected 2 items

tests/test_math.py ..                                                                                                                                                         [100%]

================================================================================= 2 passed in 0.00s =================================================================================
  py312: OK (1.07=setup[0.95]+cmd[0.12] seconds)
  congratulations :) (1.09 seconds)

開発中コードがパッケージ外から直接importされるのを確認する

パッケージに含まれない開発中のコードが意図せず使用される状況を再現してみます。

math.pyモジュールに掛け算を行うmultiple関数を開発中のコードとして追加します。

def add(a: float, b: float) -> float:
    return a + b

def substract(a: float, b: float) -> float:
    return a - b

def multiple(a: float, b: float) -> float:
    return a * b

パッケージをテストする tests/test_math.pyにmultiple関数のテストを追加します。

from mypkg_flat.math import add, substract, multiple

def test_add():
    assert add(2, 3) == 5

def test_subtract():
    assert substract(5, 3) == 2
    
def test_multiple():
    assert multiple(2, 5) == 10

パッケージのビルドを行っていない状態から、 mypkg_flat をインストールしてmultipleをimportしようとすると、パッケージに含まれていない関数を読み込もうとしてるのでエラーが発生します。

$ cd dist
$ pip install mypkg_flat-0.1.0.tar.gz 
$ python
>>> from mypkg_flat.math import multiple
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ImportError: cannot import name 'multiple' from 'mypkg_flat.math'

この状態でテストを実行すると奇妙なことが起きます。
開発中コードを含めてビルドを実行してないにも関わらず、テストが通ってしまいます。

$ cd .. # プロジェクトのルートディクトリに移動
$ tox -r
py312: remove tox env folder /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/flat_layout/.tox/py312
py312: install_deps> pip install --find-links=dist -r requirements-dev.txt
py312: commands[0]> pytest tests
================================================================================ test session starts ================================================================================
platform darwin -- Python 3.12.5, pytest-8.3.3, pluggy-1.5.0
cachedir: .tox/py312/.pytest_cache
rootdir: /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/flat_layout
configfile: pyproject.toml
collected 3 items

tests/test_math.py ...                                                                                                                                                        [100%]

================================================================================= 3 passed in 0.01s =================================================================================
  py312: OK (1.32=setup[1.19]+cmd[0.13] seconds)
  congratulations :) (1.36 seconds)

何が起きてるか確認します。
toxで作成されたpython仮想環境に入って、 mypkg_flat パッケージの読み込み先を見てみると、ライブラリディレクトリ内ではなく、mypkg_flatディレクトリ中のコードを直接読みに行ってることがわかります。

$ source .tox/py312/bin/activate 
$ python
>>> import mypkg_flat
>>> mypkg_flat.__file__
'/Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/flat_layout/mypkg_flat/__init__.py'

pythonのモジュール読み込みパス一覧を取得すると、カレントディレクトリが先頭にあるのを確認できます。

>>> import sys
>>> sys.path
['', '/Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/flat_layout/.tox/py312/lib/python3.12/site-packages']

ドキュメントに記載されているように、pythonではデフォルトのPYTHONPATH設定では、モジュールの読み込みはライブラリディレクトリより、カレントディレクトリが優先されます。
6. Modules — Python 3.13.0 documentation

The directory containing the script being run is placed at the beginning of the search path, ahead of the standard library path. This means that scripts in that directory will be loaded instead of modules of the same name in the library directory.

そのため、パッケージを読み込んでるつもりが、実は開発中のコードを直接読み込んでいる状態が発生してしまいます。
この問題をsrc layoutで解決できることを確認します。

src layoutでパッケージ開発

パッケージ構成

code-for-blogpost/src_vs_flat_layout/src_layout at main · nsakki55/code-for-blogpost · GitHub

.
├── src
│   └── mypkg_src
│       ├── __init__.py
│       └── math.py
├── tests
│   ├── __init__.py
│   └── test_math.py
├── pyproject.toml
├── requirements-dev.txt
└── tox.ini

mypkg_src というディレクトリをsrcサブディレクトリ以下に作成しました。
パッケージ内のコードとテストは mypkg_flat と同じにするので省略します。

pyproject.tomlは以下の設定としました。パッケージのビルド対象をsrcディレクトリにしています。

[project]
name = "mypkg-src"
version = "0.1.0"
description = "Example package using src layout"
requires-python = ">= 3.12"
dependencies = [
    "pytest",
]

[build-system]
requires = ["setuptools >= 61.0"]
build-backend = "setuptools.build_meta"

[tool.setuptools]
package-dir = {"" = "src"}

mypkg_src パッケージのビルドを行います。

$ python -m build

$ ls dist/
mypkg_src-0.1.0-py3-none-any.whl  mypkg_src-0.1.0.tar.gz

requirements-dev.txtに mypkg_src を含めます。

pytest
mypkg_src

flat layoutの場合と同様の設定でtoxによるテストを実行します。
mypkg_src パッケージがインストールされ、pytestのコードから mypkg_src パッケージが読み込まれているを確認できます。

$ tox -r
py312: remove tox env folder /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/src_layout/.tox/py312
py312: install_deps> pip install --find-links=dist -r requirements-dev.txt
py312: commands[0]> pytest tests
================================================================================ test session starts ================================================================================
platform darwin -- Python 3.12.5, pytest-8.3.3, pluggy-1.5.0
cachedir: .tox/py312/.pytest_cache
rootdir: /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/src_layout
configfile: pyproject.toml
collected 2 items

tests/test_math.py ..                                                                                                                                                         [100%]

================================================================================= 2 passed in 0.00s =================================================================================
  py312: OK (1.62=setup[1.50]+cmd[0.12] seconds)
  congratulations :) (1.64 seconds)

開発中コードをimportできずテストが失敗することを確認する

flat layoutと同様に、開発中のコードであるmultiple関数を加えた場合の挙動を確認します。

multiple関数を含めたパッケージビルドを行う前に、テストを実行します。
パッケージに含まれていないmultiple関数の読み込みエラーがでて、開発中のコードが使われないことを確認できます。

$ tox -r
py312: remove tox env folder /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/src_layout/.tox/py312
py312: install_deps> pip install --find-links=dist -r requirements-dev.txt
py312: commands[0]> pytest tests
================================================================================ test session starts ================================================================================
platform darwin -- Python 3.12.5, pytest-8.3.3, pluggy-1.5.0
cachedir: .tox/py312/.pytest_cache
rootdir: /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/src_layout
configfile: pyproject.toml
collected 3 items

tests/test_math.py ..F                                                                                                                                                        [100%]

===================================================================================== FAILURES ======================================================================================
___________________________________________________________________________________ test_multiple ___________________________________________________________________________________

    def test_multiple():
>       assert multiple(2, 5) == 10
E       NameError: name 'multiple' is not defined

tests/test_math.py:10: NameError
============================================================================== short test summary info ==============================================================================
FAILED tests/test_math.py::test_multiple - NameError: name 'multiple' is not defined
============================================================================ 1 failed, 2 passed in 0.01s ============================================================================
py312: exit 1 (0.13 seconds) /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/src_layout> pytest tests pid=17455
  py312: FAIL code 1 (2.17=setup[2.04]+cmd[0.13] seconds)
  evaluation failed :( (2.20 seconds)

toxで作成された仮想環境に入って、 mypkg_src の読み取り先を見ると、ライブラリディレクトリ内からパッケージが読み込まれているのを確認できます。

$ source .tox/py312/bin/activate 

$ python
>>> import mypkg_src
>>> mypkg_src.__file__
'/Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/src_layout/.tox/py312/lib/python3.12/site-packages/mypkg_src/__init__.py'

パッケージをビルドし直してテストを実行すると、テストが成功します。

$ python -m build

$ tox -r
py312: remove tox env folder /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/flat_layout/.tox/py312
py312: install_deps> pip install --find-links=dist -r requirements-dev.txt
py312: commands[0]> pytest tests
================================================================================ test session starts ================================================================================
platform darwin -- Python 3.12.5, pytest-8.3.3, pluggy-1.5.0
cachedir: .tox/py312/.pytest_cache
rootdir: /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/flat_layout
configfile: pyproject.toml
collected 3 items

tests/test_math.py ...                                                                                                                                                        [100%]

================================================================================= 3 passed in 0.00s =================================================================================
  py312: OK (1.92=setup[1.54]+cmd[0.38] seconds)
  congratulations :) (1.96 seconds)

src layoutではパッケージコードを実行するためにインストールが必要となり、開発中のコードが意図せず実行されるのを防げることを確認できました。

pythonプロジェクトはsrc layoutにすべきなのか?

pythonのパッケージ開発のテスト観点からは、src layoutの方が安全であることは分かりました。
パッケージ管理ツールであるpoetryでプロジェクト作成すると、 デフォルトではflat layoutで作成される一方、src layoutで作成するオプションも提供されています。
Commands | Documentation | Poetry - Python dependency management and packaging made easy

同じくpythonのパッケージ管理ツールであるuvでプロジェクトを作成すると、アプリケーションの場合はflat layout、パッケージの場合は src layoutで作成されます。
Projects | uv

uvの思想に従うなら

  • アプリケーションの場合 : flat layout
  • パッケージの場合 : src layout

で使い分けるのが今のpython界隈のデファクトスタンダードと言えるのでしょうか?

GitHub Star数が上位のpytonプロジェクトを見てみると flat layoutの構成をとってるものも多い印象です。
ML界隈でよく使われるプロジェクトで探してみると、以下のプロジェクトはflat layoutとなっていました。

これらのプロジェクトがflat layoutを採用してる思想を自分はまだ分かってないです。

参考