こんにちは、開発本部 開発2部 RetailHUB NetSuperグループに所属するホーク🦅アイ👁️です。🎋七夕🎋が近いですが皆さまいかがお過ごしでしょうか。
背景
最近我々のチームで管理しているAWSサービスにおいて本番環境のS3サービスが毎月右肩上がりにコスト上昇していることがわかりました。aws s3api
コマンドや、python SDKの get_cost_and_usage_with_resources
APIを駆使してコスト増の原因を調査した結果、非常に多くのGetObjectが呼ばれていることがわかりBIツールで日々分析データを出すクエリにてパーティションが効いてない状態であると推定し、クエリ見直しを行っている最中です。
ところで、先月開催されたAWS Summit 2025にて「Amazon S3 によるデータレイク構築と最適化」という興味深いセッションがありました(参考資料)。Athenaで発行するクエリの最適化においてパーティションの課題解決策としてS3 Tablesを選択するという話が特に注目ポイントでした。
現状運用中のアーキテクチャはS3にそのまま保存しているデータをAthena経由で検索しているのですが、そのストレージをS3 Tablesに移行すればパーティションを気にすることなくBIツールでの重いクエリ発行を抑えることができ、S3コストを結果的に減らせるのではないか、という期待からS3 Tablesに即座にかつ簡単に移行できるのか実際に調査してみることにしました。
- 通常S3運用に関する過去記事:tech.every.tv
前提条件
- 現在運用中の通常S3を使用したデータレイクアーキテクチャを崩さない移行を考える
要件
- Amazon S3 Tablesを使うこと
- Amazon Data Firehoseを使ってS3 TablesにINSERTできること
- Amazon Athenaを使ってS3 TablesにSELECTできること
- INSERTするデータは既存のS3データと同じであること
S3 Tablesを導入したトランザクショナルデータレイクアーキテクチャの構築
データ集約管理において、通常のS3を使用したアーキテクチャをデータレイクアーキテクチャと呼びます。一方で、Apache IcebergサポートのS3 Tablesの特徴であるトランザクショナルを採用するアーキテクチャをトランザクショナルデータレイクアーキテクチャと呼びます(Amazon Redshiftのような従来のデータウェアハウスとの統合も兼ねる場合はさらにレイクハウスアーキテクチャという)。
今回の調査では、Terraformを使わずAWSコンソールだけで構築を試みました。手順自体は概ね以下のリンク記事を参照したり、公式DOCにある手順を参照にするだけで問題なく設定はできました。
あとは、この参照記事が2025年3月時点をベースにしていて5月14日にData Firehoseがresource linkを使わなくても直接S3 Tables用のGlue CatalogをDestinationに指定できるようになったのでその部分は省略できます。また、参照記事のCLIベースの手順部分も全てコンソール上で行えるようになっています。
ただし、1点注意があって2025年7月現在、リソースリンクを使わない場合、新規作成はできますが、ConfigurationのEditをするとSaveするときになぜかdefaultのGlue Catalogを見てしまうバグがあり、設定変更を完遂できませんので設定変更をする可能性を残しておきたい場合は、依然としてリソースリンクを使った方法で作成してください。
# Saveしようとしたら出てきたエラーメッセージ Your Firehose stream was not updated Unable to update the Firehose stream XXXXX. Wait a few minutes and try again. If the problem persists, go to AWS Support Center . API response CatalogConfiguration cannot be updated.
既存のデータを移行する方法を模索
問題発生
既存のS3ファイルをGetしてファイルを読み込んでS3 TablesにINSERTできるように成形し直してData Firehoseに渡すスクリプトをpython SDK put_record_batch
で実装して試してみたところ、CLIでテストしたときにBase64してからJSON形式にしてリクエストして成功したので同じリクエストデータを成形して実行するとエラーでコケてしまいました。
# 成功したときのCLIコマンド例 aws firehose put-record \ --delivery-stream-name "$DELIVERY_STREAM_NAME" \ --record "{\"Data\":\"$BASE64_DATA\"}"
# Destination error logs details "errorCode":"Iceberg.InvalidPayload","errorMessage":"Delivery failed due to invalid input format. Make sure that the payload provided is in JSON Format."
解決策
試行錯誤した結果、BASE64は不要のようでそのままPlainなJSON文字列を送ると上記のエラーが出なくなり、うまくINSERTされてAthenaでSELECTできました。
余談ですが、当初はData Firehoseに渡す時点でテーブルスキーマに存在しないカラム名(JSONオブジェクトキー名)が一つでもあるとうまくINSERTされなかったので使わないKey-Valueセットをリクエストデータから省く前処理を追加していたのですが、2025年7月現在は未定義のKey-Valueセットが付随しているJSONデータでも問題なくINSERTできていました。
これで後は移行タイミングで対象prefixの全オブジェクトに対してpythonスクリプトを実行すれば過去データの全移行も可能かと思います。
並行運用の場合
次に、既存の運用も継続したまま並行してS3 Tablesも導入して徐々に移行していくというのを想定してみます。導入後の新ログだけS3 TablesにもINSERTすることでそれ以降のログは見れるようにするという場合、先ほどのpythonスクリプトをLambdaにして指定のS3バケットのprefixにPUTされたタイミングをトリガーにセットしてオブジェクトをGetするようにすればうまくいきました。
新規データを直接配信する方法
まずは、単純にCloudWatch LogsのSubscription filtersのDestination ARNをS3 Tables用のData Firehose ARNに切り替えてパイプライン接続してみました。すると、当然のように以下のエラーでINSERT失敗してました。
# Destination error logs details "errorCode":"Iceberg.InvalidPayload","errorMessage":"Delivery failed due to invalid input format. Make sure that the payload provided is in JSON Format."
そこで、次にConfigurationのTransform recordsでLambdaで生ログを加工してデータを与える必要があるのでその設定を編集してSaveしようとしたら今度は以下のエラーが出てしまいました。
2 validation errors detected: Value at 'icebergDestinationUpdate.catalogConfiguration.catalogARN' failed to satisfy constraint: Member must have length greater than or equal to 1; Value at 'icebergDestinationUpdate.catalogConfiguration.catalogARN' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:.*:glue:.*:\d{12}:catalog(?:(/[a-z0-9_-]+){1,2})?
どうやら既存のData Firehoseをコンソール上から設定追加するのは現状だと不可能のようです。。
次に新しくData Firehoseを作成し直してそこであらかじめ用意したLambda Functionをセットしておきました。すると、今後はLambdaにInvokeFunctionの許可を設定していないというエラーが出てしまいました。
"errorCode":"Lambda.InvokeAccessDenied","errorMessage":"Access was denied. Ensure that the access policy allows access to the Lambda function."
そこでData Firehoseに割り当てているIAMポリシーを編集してLambdaのInvoke権限を追加しました(前述の公式DOC参照)。そうするとlambda実行自体は成功したのですが、今度はLambdaの処理がエラーになってしまいました。
"errorCode":"Lambda.FunctionError","errorMessage":"Check your function and make sure the output is in required format. In addition to that, make sure the processed records contain valid result status of Dropped, Ok, or ProcessingFailed"
試行錯誤の結果、Data Firehoseにreturnで返す時のJSON文字列にする場合は、またdataキーの値がBase64エンコーディングされていないといけないということなのでそれを対応してようやくINSERTでき、AthenaでもSELECTできました!
まとめ
コンソール上での環境構築は完成できたのでS3からS3 Tablesへの移行はできることがわかりました。今後は、移行後に実際どれだけBIツールでのクエリ発行によるコスト削減に貢献したかを検討予定です。また、Terraformを使って新規プロダクト開発環境での構築を目指す予定です。
ちなみに、今回の調査で実際に半月ほどS3 Tablesにデータを流し続けていましたがBillsを見てみるとCompaction関連(コンパクションを行ったオブジェクトの数に対してのコストと圧縮処理を実行する(裏で密かにLambdaが走る?)コストの2種類)のコストも掛かっているのに気づきました。PUTのコストは通常のS3と同様でしたが、追加でCompaction関連のコストも嵩むのは注意が必要かもです。
最後に
エブリーでは、ともに働く仲間を募集しています。
テックブログを読んで少しでもエブリーに興味を持っていただけた方は、ぜひ一度カジュアル面談にお越しください!