はじめに
こんにちは。DELISH KITCHEN開発部の村上です。
エブリーが運営しているサービスのDELISH KITCHENやトモニテではプレゼントキャンペーンが定期的に行われており、ユーザーさんは開催中の複数のキャンペーンから気になるものを選んでいくつかの設問に答えることで応募することができるようになっています。
今回はそのプレゼントキャンペーンのETL基盤をStep Functionsを利用してサーバーレスで構築した話を紹介させていただきます。
Step Functions導入の背景
当時の技術選定で意識したことは次のようなことでした。
- 運用面と費用面で低コストで実現できる
- データエンジニア以外でも開発・運用できる
- 将来的なデータの増加や実行頻度の変更に対応してスケールできる
特にこの基盤においては初めは少人数で運用コストをいかに下げてPoCしていけるかが重要だったので運用コストは重要な判断基準になっていました。その中でAWSではワークフローエンジンの選択肢としてGlue WorkflowやMWAAもありましたが、AWSサービスとの連携も豊富でjobの実行基盤が縛られず、当時はデータ量や処理負荷からしてjobとしてlambdaを採用することでコストも最小限にできるStep Functionsを選びました。また、Step FunctionsはGlue jobとも接続可能なので、データ量の増加により処理負荷が高くなってくれば、差し替えも検討でき、柔軟性が高いことも決め手となりました。
ステートマシンの構成
Step Functionsのワークフローはステートマシンと呼ばれています。ここからはエブリーで使っているステートマシンの構成とStep Functionsの機能を使ってどういうふうにETLを実現しているかを説明させていただきます。
全体
全体としては現状は大きく分けて、データ変換とカタログ更新の二つに分けれており、ステートマシンの実行はEventBridgeからスケジュールして定期的に行っています。オンタイムでの別データ基盤への転送をする場合はこのステートマシンを拡張することによって依存関係を制御することができます。次にそれぞれの処理の詳細を説明します。
データ変換
プレゼントキャンペーンの要件として、それぞれのキャンペーンは独自のデータを持っていて、そのデータ変換の要件もキャンペーンやその実施期間によって異なります。また、開催されるキャンペーン数も次第に多くなっていくことが想定されていたのでStep Functionsで提供されているMapステートを用いた動的並列化をしてそれぞれの変換処理からS3の格納までを行なっており、ある程度データ量が多くなってきた時にでもlambdaの実行時間の制限を回避できるようにしています。Mapステートに対してあらかじめ登録してあるキャンペーン内容を以下のような構造で渡して並列実行を行なっており、自身で指定することで特定のキャンペーンの再実行も可能です。
{ "campaigns": [ { "platform": "service1", "campaign": "campaign1" }, { "platform": "service2", "campaign": "campaign2" } ] }
カタログ更新
前のフローで求められた要件でのデータ変換までは満たせていますが、このままだと他の基盤へのデータ転送や分析が困難なため、Athenaで抽出できるようにしています。ただ、先ほど説明させていただいた通り、キャンペーンはそれぞれ独自のスキーマを持っており、自前でやろうとすると毎回自分でデータカタログのスキーマ定義をしないといけません。そこでその運用コストを省くためにGlue Crawlerで変換したS3のデータを読み込ませて自動でカタログが作られるようにしています。Step Functionsには非同期な処理をWaitステートとChoiceステートを活用して待てるようになっており、実行結果が返ってきたらChoiceステートを使ってステートマシンを終了できるようになっています。
活用する上でのTips
このようなステートマシンを構築していく中でStep Functionsを活用する上でのTipsがいくつかあったので紹介させていただきます。
エラー通知
Step Functionsはステートマシン自体がエラーになった時を発火タイミングとしてEventBridgeと連携することができるようになっているのでそれを組み合わせることで簡単に通知自体はできてしまいますが、EventBridgeが受け取れる情報には制約があり、ステートマシンの何がどんな理由で失敗したのかがコンソールに行くまでわかりません。また後述しますが、一部処理が失敗したときにステートマシン自体を失敗とできないケースもあるのでEventBridgeだけでは満たせないこともあり、EventBridgeに加えて他の通知手段を用意しないといけない状況でした。
そこで実行エラーをハンドリングするCatchフィールドを使って、エラー情報を取得し、エラー通知を送るlambdaを独自で用意しています。後続のタスクに以下の情報をlambdaであればeventとして受け取れるのでそれを通知したい内容によって加工することで素早くエラー内容を把握できる体制を作ることができます。
{ "FunctionName": "step functionsのarn", "Payload": { "Execution": { "Id": "実行ID", "Input": { // それぞれの環境でのjobの発火内容の詳細が入る }, "StartTime": "実行開始時間", "Name": "実行名", "RoleArn": "ロールのarn" }, "param": { "Error": "Error", "Cause": "エラー内容" } } }
並列実行時のジョブキャンセル回避
先ほどのデータ変換のフェーズでMapステートによる動的並列処理を行なっていると説明しましたが、実はMapステートを使っただけだと並列する処理の一部が失敗した時に後続の処理は全て停止してしまいます。これは求められる要件にもよると思いますが、ある一部の処理が失敗してもそのエラーを通知しつつ、それ以外の処理は続けて欲しいケースはあると思います。
そういった場合に並列処理の中にエラー時の例外処理を追加し、エラー通知を行った上で正常終了させることによって実現可能です。これにより他の処理は後続のタスクに移行させ、影響を最小限にとどめることができます。
ペイロードサイズの制限回避
Step Functionsでは各処理で実行した結果を後続のタスクに渡すためにデータの受け渡しを行うシーンが多いかと思います。ただ、ペイロードサイズには制限があり、遷移時に渡せるデータは256KBまでです。例えばあるMapステートでの並列処理の返り値がサイズを超える場合、以下のようなエラーが出て処理は中断されてしまいます。
The state/task 'Map' returned a result with a size exceeding the maximum number of bytes service limit.
これを防ぐためにステートマシンを作る上で意識したい2つのポイントがあります。
大きいペイロードはS3や外部ストレージを使う
あらかじめ大きくなると予想されるデータは後続のタスクに渡す前にデータをS3やDBに保存して、その格納先の情報を後続に渡すことにより、データサイズを小さくすることができます。 また、同じ問題は並列処理を実行するMapステートに渡すJSON配列で起きる可能性もあります。現在ではStep FunctionsにDistributed MapステートがMapステートとは別で提供されており、S3内の大規模データをもとに並列処理が実行できるようになっているので、並列数や並列時に渡したいデータ量が増える場合には利用を検討すると良いと思います。
出力をフィルタリングして必要なデータに絞って渡す
lambdaなどで挙動を制御できる場合にはその処理の中で出力を最小限にすると思いますが、AWSのサービスとノーコードで連携する場合にその処理が出力するデータ自体は制御することができません。例えば、ステートマシンの中でathenaのクエリ結果を取得しようとすると以下のようなJSONが出力されます。
{ "QueryExecution": { "EngineVersion": { "EffectiveEngineVersion": "Athena engine version 2", "SelectedEngineVersion": "Athena engine version 2" }, "Query": "発行したクエリ", "QueryExecutionContext": { "Database": "データベース" }, "QueryExecutionId": "実行ID", "ResultConfiguration": { "OutputLocation": "s3のロケーション" }, "ResultReuseConfiguration": { "ResultReuseByAgeConfiguration": { "Enabled": false } }, "StatementType": "DML", "Statistics": { "DataScannedInBytes": 191831, "EngineExecutionTimeInMillis": 1704, "QueryPlanningTimeInMillis": 796, "QueryQueueTimeInMillis": 647, "ResultReuseInformation": { "ReusedPreviousResult": false }, "ServiceProcessingTimeInMillis": 85, "TotalExecutionTimeInMillis": 2436 }, "Status": { "CompletionDateTime": 1693159279869, "State": "SUCCEEDED", "SubmissionDateTime": 1693159277433 }, "SubstatementType": "SELECT", "WorkGroup": "ワークグループ名" } }
ただ、多くの場合で欲しいデータは実際にクエリ結果がどこのs3に保存されているかだと思います。その場合には次のように入出力制御のためのOutputPathパラメータを使ってフィルタリングをすることによって欲しいデータだけを後続に渡すことができます。
$.QueryExecution.ResultConfiguration
実行結果
{ "OutputLocation": "s3のロケーション" }
他にも出力制御にはResultSelectorやResultPathなど出力をコントロールするパラメータが提供されており、これらを活用することにより欲しい結果に絞ったデータを作ることができます。
現状の課題
ここまでエブリーにおけるStep Functionsの活用事例について紹介させていただきましたが、しばらく運用している中で次のような課題も出てきました。
ステートマシンのIaC化
ステートマシン自体はASL(Amazon States Language)で記述されているのでコードで管理できないわけではありません。ただ、個人的にこの記述自体の学習コストが高く、ステートマシンが複雑になればなるほどコンソールのビジュアライズされたWorkflow Studioで組んだ方が直感的なケースが多いと感じます。
今はそれほどステートマシン自体の変更を行う機会が少ないので大きな問題にはなっていませんが、今後の運用を考えるとIaC化は行なっておいた方が良いので、AWS CDKやTerraformでの管理方法などチームでPoCを行っていきたいと思っています。
各lambdaの管理
Step Functionsでステートマシンを作る場合にペイロードのちょっとした加工をして後続の処理に渡したいケースもたまにあり、提供される機能では足りない場合に徐々にlambdaの数が増えていってました。特に今回紹介しているステートマシン以外にいくつか別で新しくステートマシンを作っていくとなった時にコードを管理しているリポジトリでどのlambdaがどのステートマシンに影響しているのか見通しが悪くなってきています。
ここについては共通部分とそれぞれのステートマシン独自に使われるlambdaを階層わけして、まとめていきながら、前述したステートマシンのIaC化を推進できればその依存関係も新しいメンバーが理解しやすくなるのではと考えているので、チームで話しながらリファクタリングを進めていきたいです。
おわりに
今回のStep Functions導入により、コストを最小限まで削減しながらETL基盤構築を行うことができました。今後、運用するチーム体制や扱うデータの規模によってはGlueやMWAA、他のよりデータ処理に特化したサービスを選択した方が良い場合もあると思うので、事業の状況に合わせてシステム改善ができればと考えています。
またエブリーでは日々たくさんのバッチ処理が実行されていますが、その中では依存関係を起動タイミングで暗黙的に制御しているものもあったりするのでデータのETLだけではなく、そういったバッチ処理の実行にも今後ワークフローエンジンの導入を考えていきたいです。
ここまでお読みいただき、ありがとうございました。