背景
こんにちは、開発本部 DELISH KITCHEN Retail HUB NetSuperグループに所属するフルスタックエンジニアをやらせていただいています、ホーク🦅アイ👁️です。2024/2/9、Amazon Kinesis Data Firehose から Amazon Data Firehoseに名称変更されてから半年ほど経過しておりますが最新の設定情報などが公開されていることが少ないと感じたため今回記事を書くに至りました。
前提
今回の記事を書くにあたっての前提条件は以下のようになっております。
- ECS Fargate上にWEBアプリケーションコンテナが存在し、そのコンテナは標準出力にアクセスログを出力している
アクセスログは、JSON形式ログとスペース区切り形式ログが混在している
# JSON形式ログ例 {"message": "access", "key": "value"} # スペース区切り形式ログ例 127.0.0.1 - - [09/Sep/2024:01:54:49 +0000] "GET /healthcheck HTTP/1.1" 200 236 "-" "curl/7.74.0"
- TaskDefinitionで
logDriver
をawslogs
に設定しその標準出力はCloudWatch Logsに常に転送されている - 主な作業を調査・実施した年月日は2024年7月12日
要件
要件1
- WEBアプリケーションログはJSON形式ログとスペース区切り形式ログが混在しているのでそれぞれを別のS3プレフィックスに保存したい
要件2
- JSON形式ログは{unique_code}/日付でパーティションをして1行ずつ改行されるようにS3保存させたい
- スペース区切り形式ログはノンパーティションで1行ずつ改行されるようにS3保存させたい
- 但し、これらは自前のLambdaを使わずに行うこと!
実装手段
要件1に対する実装
CloudWatch Logsのサブスクリプションフィルターを使うとログの種類を2つに分けることができます。
コンソール上にも注意書きがありますがサブスクリプションフィルターはLog Groupごとに2個までしか設定できないので注意してください。以下、Terraformで3つ目を適用した時のエラーです。
Error: putting CloudWatch Logs Subscription Filter (XXXX): operation error CloudWatch Logs: PutSubscriptionFilter, exceeded maximum number of attempts, 25, https response error StatusCode: 400, RequestID: XXXX, LimitExceededException: Resource limit exceeded.
Terraformを使って反映
# pointだけ絞ってピックアップ resource "aws_cloudwatch_log_subscription_filter" "application_log" { name = "application-log" role_arn = var.subscription_filter_webserver_arn log_group_name = aws_cloudwatch_log_group.webserver.name destination_arn = var.kinesis_firehose_stream_applicationlog_arn filter_pattern = "{$.message = \"access\"}" } resource "aws_cloudwatch_log_subscription_filter" "access_log" { name = "access-log" role_arn = var.subscription_filter_webserver_arn log_group_name = aws_cloudwatch_log_group.webserver.name destination_arn = var.kinesis_firehose_stream_accesslog_arn filter_pattern = "\" \"" }
- スペース区切り形式ログのfilter_patternは半角スペース1文字で可能でTerraformで記述するときはダブルクォーテーションで括る必要があります。
要件2に対する実装
- Terraformを使って反映
# pointだけ絞ってピックアップ dynamic_partitioning_configuration { enabled = "true" } processing_configuration { enabled = "true" processors { type = "RecordDeAggregation" parameters { parameter_name = "SubRecordType" parameter_value = "JSON" } } processors { type = "MetadataExtraction" parameters { parameter_name = "JsonParsingEngine" parameter_value = "JQ-1.6" } parameters { parameter_name = "MetadataExtractionQuery" parameter_value = "if .context.unique_code then {unique_code: .context.unique_code} else {unique_code: \"NONE\"} end" } } processors { type = "Decompression" parameters { parameter_name = "CompressionFormat" parameter_value = "GZIP" } } processors { type = "CloudWatchLogProcessing" parameters { parameter_name = "DataMessageExtraction" parameter_value = "true" } } processors { type = "AppendDelimiterToRecord" } }
- JSON形式ログの方は、Dynamic Partitioning(動的パーティショニング)を使えばJSONパースが行われてS3のプレフィックスにパーティションされます。この際、1ログごとに付与される改行コードがなくなってしまいます。そこで改めてDestination(送信先)データに改行コードを付与するためにProcessorsのAppendDelimiterToRecord typeを設定する必要があります。Terraformで適用するとコンソール上では「
New line delimiter(改行の区切り文字)
」がenabled(有効)になります。一方で、スペース区切り形式ログの方はDynamic Partitioningを使う必要がないのでTransform and convert records(レコードを変換および転換)機能を行うだけで改行コードが付与されているので改めてAppendDelimiterToRecordを設定する必要はありません。
問題
問題1
Dynamic PartitioningのJSONパースにおいて指定したパーティションキーにNULL値があるときパーティション作成に失敗していた
"errorCode":"DynamicPartitioning.MetadataExtractionFailed","errorMessage":"partitionKeys values must not be null or empty"
問題2
Firehose自体のエラーログに以下のエラーメッセージが出てS3保存に失敗していた
errorCode":"DynamicPartitioning.MetadataExtractionFailed","errorMessage":"Non UTF-8 record provided. Only UTF-8 encoded data supported"
問題3
terraform applyを実行すると以下のエラーメッセージが出て適用に失敗した
operation error Firehose: UpdateDestination, https response error StatusCode: 400, RequestID: d7b3d246-ecb3-a51f-88ba-1557ca6eae2a, InvalidArgumentException: Enabling source decompression is not supported for existing stream with no Lambda function attached.
- Terraform provider hashicorpのバージョンを5.44->5.58(当時の最新版)に上げるも変化なし
問題4
- Athenaでクエリ発行するときに、0件ヒットになってしまう
- JSONコンテンツのみデータ取得できていない感じ
- 結論、S3に保存しているデータはGZIP圧縮されているが拡張子が.jsonなのでそれを認識できていないため解凍せずにそのまま読み込もうとしている様子
- 拡張子を.gzにしたら読み込めた
- プレーンテキスト状態の.jsonファイルと圧縮状態の.gzファイルを混在しても両方同時に読み込めていた
- JSONコンテンツのみデータ取得できていない感じ
解決策
問題1に対する解決策
- JQのif-then-else制御構文を使うことでNULL値を特定の固定文字列に置換してしまえば解決できました。以下のようにTerraformでparameter_value値に記述して適用します。尚、この値は長文なので実際にコンソール上で設定画面のDynamic partitioning keys表示を見ても文字列全文は出てこないですが正しい挙動になるので問題ありません。
parameter_value = "if .context.unique_code then {unique_code: .context.unique_code} else {unique_code: \"NONE\"} end"
問題2に対する解決策
原因調査をするにあたって、ChatGPTに質問してその情報を足がかりに進めることにしました。
質問スクリプト
"errorCode":"Lambda.ProcessingFailedStatus","errorMessage":"ProcessingFailed status set for record" というエラーが出て先ほど教えたJSONデータログをutf-8変換に失敗しました。なぜでしょうか?
回答
・Firehoseに渡す前にLambda関数でutf-8に変換しておく
- ChatGPTの回答を受けて「結局Lambdaを間に挟まないとダメなのか?!」と疑問視しつつ、該当するBluePrintのLambda関数をReadingするとそもそもCloudWatch LogsからFirehoseに渡る過程でサブスクリプションフィルターを利用するとデータ構造が生ログではなくなっていることが判明しました。つまり、Firehoseに渡るデータは何もしないと圧縮データかつJSON構造変更、そして元ログデータ自体がBASE64エンコード化していました。故に、解凍、パース、デコード処理が必要になるということでした。
{ "messageType": "DATA_MESSAGE", "owner": "123456789012", "logGroup": "log_group_name", "logStream": "log_stream_name", "subscriptionFilters": [ "subscription_filter_name" ], "logEvents": [ { "id": "01234567890123456789012345678901234567890123456789012345", "timestamp": 1510109208016, "message": "log message 1" }, { "id": "01234567890123456789012345678901234567890123456789012345", "timestamp": 1510109208017, "message": "log message 2" } ... ] }
- 参考)Transform source data in Amazon Data Firehose - Amazon Data Firehose
- 便利なサブスクリプションフィルターを使わない選択肢はないので、これを使いつつパースするFirehoseの新たな機能が2024年2月27日にリリースされたようでその機能を使うことで自前でLambda関数を用意せずにFirehoseの機能だけで解決させることができました。
- 具体的には、コンソールで言うところの特定Data Firehoseのconfigurationページを開いて「Transform and convert records」→「Decompress source records from Amazon CloudWatch Logs」と「Extract message fields only from log events」をONにします。
- ちなみに、 Firehose新機能の内部構造は結局Lambda関数を使っているようです(メッセージ抽出機能自体は追加料金はありませんがおそらくCloudWatch Logsからのソースレコード解凍機能部における追加料金はLambda関数料金分なのではないかと推測)。
問題3に対する解決策
- 結論としては、Terraformでresourceのattributeであるprocessing_configuration内でDecompressionとCloudWatchLogProcessingの2つのtypeのみを設定すれば解決策2の設定が適用されます。しかし原因はよくわからないですがTerraformでDecompressionとCloudWatchLogProcessingを設定せずにFirehoseリソースを新規作成した後に設定変更という形で前述の2つのtype設定をTerraformで更新適用しようとするとエラーになった(plan時はエラーは出ない)ので途中で変更する場合は工夫が必要ということが判明しました。以下、具体的な工夫手順です。
- まず適当に既存のLambda関数を用意した上でそのLambdaを使って変換処理を行う記述も追記して適用
- その後、Lambda関数による変換記述をコメントアウトなどしてそのprocessors部分だけ削除更新するterraform applyを実施することが可能なのでそれを適用
processors { type = "Lambda" parameters { parameter_name = "LambdaArn" parameter_value = "arn:aws:lambda:${region}:${account_id}:function:${parse_func_name}:$LATEST" } }
問題4に対する解決策
- FirehoseでS3に転送する時に再圧縮して保存させているのでそのファイルが圧縮されたファイルであることをAthenaに認識させるためにはContent-Typeではなく拡張子を.gzにすることであると判明したため、その設定をTerraformに施し適用します。
resource "aws_kinesis_firehose_delivery_stream" "app_server_log" { name = "app-server-log" destination = "extended_s3" extended_s3_configuration { bucket_arn = var.access_logs_arn role_arn = var.iam_role_kinesis_stream_app_server_arn buffering_interval = 300 buffering_size = 64 compression_format = "GZIP" custom_time_zone = "Asia/Tokyo" file_extension = ".json.gz" # <= この部分!
- ここで、元々の経緯は、圧縮しているにもかかわらず。.json拡張子にしていた理由としてChromeブラウザでS3コンソールからダウンロードを実行した時、自動で圧縮ファイルを認識して解凍してローカルディスクに保存する。その時に拡張子.gzのままだとファイルを開いたときにエラーが出て開けないのでファイル名変更で拡張子を手動で.gz部分を削除して.json拡張子にさせてから開かないといけないという手間が発生することに起因します。
総括
結論
- Amazon Data Firehose解凍機能のみを使ってCloudWatch LogsのログメッセージをS3に転送すること自体は他の記事でもありました。しかし、本記事のようにJSON形式ログの変換を追ってTerraformを使ってまとめている記事はなかったように思います。
- Amazon Data Firehose解凍機能を使うべき理由は、公式DOCによるLambdaで提供しているblueprints関数テンプレートが非推奨(deprecated)となっていることにあります。ただし、2024年9月執筆時点では、未だLambda関数を作成するコンソールで該当のblueprintsを選択および作成できます。
- Dynamic Partitioningを使う際に、JQのif-then構文を駆使してパーティションキーを柔軟に設定できました。
- Amazon Data Firehose解凍機能をTerraformで適用する場合、注意点があることがわかりました。
現状の課題
- Amazon Data Firehose解凍機能が有償である点ですが、blueprintsのLambdaを設置した時の料金と比較しても大差がないくらい安い点で現状は採用しております。もし完全に無償でこの辺りを構築する場合は、そもそもECS FargateでFluentdなどの外部ログエージェントを使って直接S3に転送するアーキテクチャを採用することになると思います。
みなさまの快適なログライフを!
参考
改行系記事
- https://dev.classmethod.jp/articles/amazon-kinesis-data-firehose-transform-source-records-with-aws-lambda/
- https://dev.classmethod.jp/articles/kinesis-data-firehose-dynamic-partitioning-json-parse/
- https://dev.classmethod.jp/articles/the-idea-of-automatically-inserting-newline-codes-between-records-without-using-a-lambda-processor-in-amazon-kinesis-data-firehose-aws-cdk/
- [アップデート] Amazon Data Firehose に CloudWatch Logs ログイベントからメッセージデータのみを抽出出来るオプションが追加されたので有効にしてみた | DevelopersIO
- インフラエンジニアが生成AIを活用してログ解析してみた
公式DOC
Terraform関連