はじめに
こんにちは。 開発本部 開発1部 デリッシュリサーチチームでデータエンジニアをしている吉田です。
エブリーではサイネージ端末を利用した広告配信サービスを提供しており、サイネージ端末からのログを収集しています。
従来はTreasureData-SDKを利用して端末からTreasureData上のテーブルにログを送信していましたが、ログ収集基盤を移行することになりました。
今回、AWSのマネージドサービスを活用して、スケーラブルなログ収集基盤を構築したので、その内容を紹介します。
背景
従来のログ収集基盤では、TreasureData-SDKを利用して端末からTreasureData上のテーブルにログを送信していました。
これはコストや処理の面で非常に便利でしたが、ログの送信がSDKに依存しているため、仮にTreasureData以外のログ収集基盤へ移行する場合には、アプリケーションのコードを変更する必要がありました。
サイネージ端末の都合上、アプリケーションのアップデートを極力行わずにログ収集基盤を変更できるようにしたい、という要望がありました。
そこで、アプリケーションの改修をすることなく、バックエンドをいつでも自由に変更できる、より柔軟なログ収集基盤の構築を目指しました。
アーキテクチャ
新しく構築したログ収集基盤は以下のコンポーネントで構成されています。
- API Gateway: アプリケーションからのログ送信用エンドポイント
- SQS: ログメッセージをバッファリング
- Lambda: SQSからメッセージを取得し、処理を行う
- Firehose: 処理されたログをS3に配信
- S3: ログの長期保存
この構成により、以下のメリットを実現しています:
- スケーラビリティ: 突発的な大量のログにも対応可能
- 信頼性: SQS によるメッセージの永続化とデッドレターキューによる失敗処理
- 運用負荷の軽減: マネージドサービスの活用
全体のアーキテクチャは以下のようになります。
コスト面では、リクエスト数に応じて課金されるAPI Gatewayの料金が、この基盤全体の大部分を占めることになります。
しかし今回は、そのコストよりも、将来的にログ収集基盤を柔軟に変更できるというメリットを重視し、この構成を採用しました。
詳細
API Gateway
API Gatewayはアプリケーションからログを受け取るエンドポイントを提供するとともに、SQSにメッセージを送信する役割を担います。
API Gateway から SQS へのインテグレーションでは、マッピングテンプレートを使用して、受信したJSONペイロードを SQS メッセージ形式に変換します。
terraformのコードは以下のようになります。
resource "aws_api_gateway_integration" "logs_post_integration" { rest_api_id = aws_api_gateway_rest_api.this.id resource_id = aws_api_gateway_resource.logs.id http_method = aws_api_gateway_method.logs_post.http_method integration_http_method = "POST" credentials = aws_iam_role.api_gateway.arn type = "AWS" uri = "arn:aws:apigateway:${var.aws_region}:sqs:path/${var.aws_account_id}/${aws_sqs_queue.primary.name}" request_parameters = { "integration.request.header.Content-Type" = "'application/x-www-form-urlencoded'" } request_templates = { "application/json" = "Action=SendMessage&MessageBody=$util.urlEncode($input.body)" } passthrough_behavior = "WHEN_NO_TEMPLATES" }
SQS
SQSはログメッセージをバッファリングし、Lambda関数による処理を待ちます。
API GatewayとLambdaの間にSQSを挟むことで、Lambdaの処理が失敗した場合でもメッセージを保持し、後で再処理できるようになります。
また、SQSのデッドレターキューを設定し、指定回数処理に失敗したメッセージを別のキューへ送信して処理させることで、エラーの確認を容易にするとともに、ログの喪失を防ぎます。
terraformのコードは以下のようになります。
resource "aws_sqs_queue" "dead_letter" {} resource "aws_sqs_queue" "primary" { name = "${var.prefix}-primary-queue" delay_seconds = 0 # 配信遅延秒 visibility_timeout_seconds = 30 # メッセージの可視性タイムアウト秒 max_message_size = 262144 # 256KB message_retention_seconds = 3 * 24 * 60 * 60 # メッセージの保持期間 (最大14日) receive_wait_time_seconds = 20 # メッセージ受信待機時間秒 sqs_managed_sse_enabled = true redrive_policy = jsonencode({ deadLetterTargetArn = aws_sqs_queue.dead_letter.arn maxReceiveCount = 3 # メッセージがDLQに移動するまでの最大受信回数 }) } resource "aws_sqs_queue_redrive_allow_policy" "this" { queue_url = aws_sqs_queue.dead_letter.id redrive_allow_policy = jsonencode({ redrivePermission = "byQueue", sourceQueueArns = [aws_sqs_queue.primary.arn] }) }
redrive_policyは、メッセージがデッドレターキューに移動する条件を設定します。
今回は最大3回の受信失敗でデッドレターキューに移動するように設定しています。
Lambda
Lambda関数はSQSからメッセージを取得し、処理を行います。
トリガーとしてSQSを設定し、メッセージがキューに追加されると自動的に起動します。
terraformのコードは以下のようになります。
resource "aws_lambda_function" "primary" { function_name = "${var.prefix}-log-sender" role = aws_iam_role.lambda.arn filename = data.archive_file.dummy.output_path architectures = ["arm64"] handler = "main.lambda_handler" runtime = "python3.13" memory_size = 128 # MB timeout = 5 # 秒 logging_config { log_format = "JSON" system_log_level = "WARN" application_log_level = "INFO" } lifecycle { # 環境変数の変更を無視する # 別リポジトリで管理 ignore_changes = [ environment, ] } } resource "aws_lambda_event_source_mapping" "primary" { event_source_arn = aws_sqs_queue.primary.arn function_name = aws_lambda_function.primary.arn enabled = false batch_size = 1 maximum_batching_window_in_seconds = 0 function_response_types = ["ReportBatchItemFailures"] scaling_config { maximum_concurrency = 500 } lifecycle { ignore_changes = [ # enabledを無視する # コードデプロイ後に手動で有効化する enabled, ] } }
aws_lambda_event_source_mappingのfunction_response_typesでReportBatchItemFailures
を指定しているため、Lambda関数で処理に失敗したメッセージだけがSQSに戻されます。
上記のTerraformではbatch_size=1と設定しているため、この設定は現状では効果がありませんが、将来的にbatch_sizeを変更する可能性を考慮して、このように設定しています。
Data Firehose
FirehoseはLambdaからデータを受け取り、S3に配信します。
Lambdaから直接S3に書き込む場合、S3のAPI呼び出しコストやマイクロファイル(小さなファイル)が多数生成されるといった問題が発生します。
特に、マイクロファイルが大量に存在すると、後段でデータ処理をする際のパフォーマンスが著しく低下する原因となります。
Firehoseを利用するとバッファリング、データの圧縮、パーティショニング、S3への書き込みなどを自動で行ってくれるため、これらの問題を解決できます。
terraformのコードは以下のようになります。
resource "aws_kinesis_firehose_delivery_stream" "primary" { name = "${var.prefix}-primary-stream" destination = "extended_s3" extended_s3_configuration { bucket_arn = aws_s3_bucket.this.arn role_arn = aws_iam_role.firehose.arn buffering_size = 128 # MB buffering_interval = 300 # 秒 file_extension = ".json" # プレフィックスの設定 prefix = "logs/!{partitionKeyFromQuery:event_name}/!{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/!{partitionKeyFromQuery:day}/!{partitionKeyFromQuery:hour}/" error_output_prefix = "errors/!{firehose:error-output-type}/" dynamic_partitioning_configuration { enabled = true } processing_configuration { enabled = true processors { type = "MetadataExtraction" parameters { parameter_name = "JsonParsingEngine" parameter_value = "JQ-1.6" } parameters { parameter_name = "MetadataExtractionQuery" parameter_value = "{year:.time | gmtime | .[0] | tostring,month:.time | gmtime | (.[1] + 1) | (. | if . < 10 then \"0\" + tostring else tostring end),day:.time | gmtime | .[2] | (. | if . < 10 then \"0\" + tostring else tostring end),hour:.time | gmtime | .[3] | (. | if . < 10 then \"0\" + tostring else tostring end),event_name:.event_name}" } } } cloudwatch_logging_options { enabled = true log_group_name = aws_cloudwatch_log_group.firehose_primary.name log_stream_name = aws_cloudwatch_log_stream.firehose_primary.name } s3_backup_mode = "Disabled" } server_side_encryption { enabled = true key_type = "AWS_OWNED_CMK" } depends_on = [ aws_iam_role_policy_attachment.firehose ] }
動的パーティショニングの設定により、ログ中に含まれるイベント名と時間を元にS3のプレフィックスを動的に生成します。
MetadataExtractionQueryでは、JQを利用してJSON形式のログデータからパーティション分割に必要なキー(イベント名、年、月、日、時)を抽出しています。
これにより、後続のETLなどで効率的にデータを処理できるようになります。
S3
S3はFirehoseによって配信されたログデータの長期保存先として機能します。
バージョニングを設定して意図しない削除に備えるとともに、ACLをプライベートに設定して外部からのアクセスを制限します。
Lambdaの実装
Lambda関数では、SQSから受け取ったメッセージを処理し、Firehoseに送信します。
以下に実装の一部を示します。
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: batch_item_failures: List[Dict[str, str]] = [] record: Dict[str, Any] for record in event.get("Records", []): message_id: Optional[str] = record.get("messageId") try: message_body_str: Optional[str] = record.get("body") log_data: Dict[str, Any] = json.loads(message_body_str) # Send to Firehose firehose_success: bool = send_to_firehose(log_data) # その他の送信 if not firehose_success: batch_item_failures.append({"itemIdentifier": message_id}) except json.JSONDecodeError as e: batch_item_failures.append({"itemIdentifier": message_id}) continue except Exception as e: batch_item_failures.append({"itemIdentifier": message_id}) continue if batch_item_failures: return {"batchItemFailures": batch_item_failures} else: return {"batchItemFailures": []}
前述のLambdaのterraformコードでfunction_response_types = ["ReportBatchItemFailures"]
としているため、関数の戻り値は{"batchItemFailures": [失敗したメッセージID]}
とします。
これにより、複数メッセージを受け取った際に、失敗したメッセージのみをSQSに戻せます。
エラーとなるメッセージがない場合は、{"batchItemFailures": []}
を返します。
Lambdaのデプロイ
Lambdaのコードデプロイにはlambrollを利用しています。
コードと環境変数を管理するリポジトリと、Terraformでインフラを管理するリポジトリを分離しています。
これにより、インフラの変更とアプリケーションコードの変更のライフサイクルを分け、それぞれが独立して安全にデプロイできる体制を整えています。
lambroll init --function-name {function-name} --download --profile {aws-profile}で既存のLambdaのコードと設定ファイルをダウンロードできます。
このプロジェクトでは、function.dev.jsonのようにfunction.{env}.jsonという形式で環境ごとの設定ファイルを管理しています。
コードやfunction.jsonの編集後、lambroll deploy --function=function.{env}.json --profile {aws-profile}
でデプロイを行います。
まとめ
今回、AWSのマネージドサービスを活用して、スケーラブルなログ収集基盤を構築しました。
API Gatewayを利用することで、アプリケーションのアップデートを行うことなく、ログ収集基盤を変更できる柔軟性を持たせています。