every Tech Blog

株式会社エブリーのTech Blogです。

データ品質担保のために取り組んだこと

はじめに

こんにちは!
開発1部デリッシュキッチンの蜜澤です。
現在はデリッシュリサーチという、食トレンド分析ツールの開発を行っています。
本記事では、デリッシュリサーチで提供するデータの品質担保をするために行なったことを紹介させていただきます。

データ品質担保の必要性

デリッシュリサーチは食トレンドを分析するために、ダッシュボードで様々なデータを提供しています。
データが間違っていると、誤った意思決定につながるおそれがあるため、データの正確性に細心の注意を払う必要があります。

また、提供するデータが多岐にわたるため、テーブルの依存関係が複雑になっていきつつあり、放置しておくと集計のロジックを間違えてしまう可能性や、似たようなデータで整合性が保たれない可能性があります。
実際のER図を一部抜粋すると以下のようになっており、結構複雑です。

複雑な集計の中でも安心してサービスをご利用していただくために、データの品質の担保には特に注力しています。

実際に取り組んだこと

以下の2つの取り組みを行いました。

  • テーブル作成処理の単体テストの作成
  • 元となる検索ログデータの増減に対するアラート作成

1つずつ詳細を紹介させていただきます。

テーブル作成処理の単体テストの作成

前述のER図を見てわかるようにテーブル数が多く、ETLが複雑になってしまうだけではなく、各テーブル作成の処理でも複雑なことをしているため、各テーブル作成の処理ごとに単体テストを行うようにして、テーブルごとに品質を担保できるようにしました。
入力データと期待される出力データを作成し、入力データを使用して実際のテーブル作成処理を行い、出力されたデータと期待される出力データを比較し、完全に一致しているかどうかを確認します。
具体例を用いて、どのようにテストを行なったのか紹介します。
ETLの作成はdatabricksを使用しています。

処理内容

検索ログデータから指定した期間(2024-01-01~2025-12-31)のデータを抽出し、検索ワードカラムの前後のスペースを削除する。
※今回は簡単な例にしていますが、実際にはもっと複雑な処理を行なっています。

コードの実装例

実際に作成したテストのコードを簡略化したものを紹介します。

期待される出力データを作成して、データフレームに格納します。
最終的にデータフレームが一致しているかのテストを行うため、全てのカラムでソートを行なっています。

columns = ['event_date', 'user_id', 'search_word']

expected_data = [
    ('2024-01-01', 2, 'キャベツ'),
    ('2024-01-01', 5, 'キャベツ'),
    ('2024-01-01', 6, 'キャベツ'),
    ('2024-01-01', 7, 'キャベツ'),
    ('2024-01-01', 8, 'キャベツ'),
    ('2024-01-01', 9, 'キャベツ'),
    ('2024-01-01', 10, 'キャベツ'),
    ('2024-01-01', 11, 'キャベツ 豚肉'),
    ('2024-01-01', 12, 'キャベツ 豚肉'),
    ('2025-12-31', 3, 'キャベツ')
]

expected_df = pd.DataFrame(expected_data, columns=columns).sort_values(['event_date', 'user_id', 'search_word'], ascending=[True, True, True]).reset_index(drop=True)

入力データ(検索ログデータ)を作成します。
以下の項目を確認できるように作成します。

  • 2024-01-01~2025-12-31のデータのみが抽出されるか
  • 前後のスペースが削除されるか
  • 前後ではない場所にスペースが入っている場合に削除されないか
columns = ['event_date', 'user_id', 'search_word']

input_data = [
    # 期待通りの期間のデータが入るかの確認
    ('2023-12-31', 1, 'キャベツ'),
    ('2024-01-01', 2, 'キャベツ'),
    ('2025-12-31', 3, 'キャベツ'),
    ('2026-01-01', 4, 'キャベツ'),
    # スペース削除の確認
    ('2024-01-01', 5, ' キャベツ'),
    ('2024-01-01', 6, 'キャベツ '),
    ('2024-01-01', 7, ' キャベツ'),
    ('2024-01-01', 8, 'キャベツ '),
    ('2024-01-01', 9, ' キャベツ '),
    ('2024-01-01', 10, ' キャベツ '),
    ('2024-01-01', 11, 'キャベツ 豚肉'),
    ('2024-01-01', 12, ' キャベツ 豚肉 ')
]

input_df = pd.DataFrame(input_data, columns=columns)
spark_input_df = spark.createDataFrame(input_df).createOrReplaceTempView("spark_input_df")

作成した入力データを開発環境のDeltaテーブルへ書き込みます。

input_data = spark.sql((f"""
    SELECT
        event_date,
        user_id,
        search_word
    FROM
        spark_input_df
"""))

input_data\
    .write\
    .format("delta")\
    .mode("overwrite")\
    .partitionBy("event_date")\
    .save(delta_table_path/search_log)

実際のETLの処理を開発環境で実行します。
処理内容の詳細は後述します。

args = {
    "env": "dev",
    "date": "2025-12-31"
}
dbutils.notebook.run("../01_SearchLog", 0, args)

上記の処理で書き込まれたデータを読み込み、データフレームに格納します。
データフレームの完全一致比較を行うため、全てのカラムでソートしています。

output = spark.sql((f"""
    SELECT
        event_date,
        user_id,
        search_word
    FROM
        delta_table_path/search_data
    ORDER BY
        event_date,
        user_id,
        search_word
"""))

output_df = output.toPandas()

出力データと期待出力データを比較します。

def assert_output_equals_expected(output_df: pd.DataFrame, expected_df: pd.DataFrame):
    output_df = output_df.reset_index(drop=True) 
    try:
        assert_frame_equal(output_df, expected_df)
        print("データフレームが完全に一致しています。")

    except AssertionError as e:
        dbutils.notebook.exit(f"データフレームが一致しませんでした:\n{e}")

assert_output_equals_expected(output_df, expected_df)

実際に実行した01_Searchの処理の内容はこちらになります。

dbutils.widgets.text("env", "dev")
dbutils.widgets.text("date", "yyyy-MM-dd")

end_date = dbutils.widgets.get("date")
env = dbutils.widgets.get("env")

search_data = spark.sql(f""" 
    select 
        event_date,
        time,
        user_id,
        regexp_replace(search_word, '^[\\u0020\\u3000]+|[\\u0020\\u3000]+$', '') as search_word --前後の半角スペースと全角スペースを削除
    from 
        delta_table_path/search_log
    where
        event_date >= '2024-01-01'
        and event_date <= '{end_date}'
""")

search_data\
    .write\
    .format("delta")\
    .mode("overwrite")\
    .partitionBy("event_date")\
    .save(delta_table_path/search_data)

課題・改善点

1つの処理を変更したことで、他の処理にも影響が及ぶ可能性があるため、基本的には全ての処理に対してテストを実行するのですが、量が多いため、現状だと全て完了するまでに1時間半ほどかかってしまいます。
依存関係を整理して、必要最低限のもののみテストを実施するようにすれば改善できるとは思います。

元となるログデータの増減に対するアラート作成

テーブル作成処理の単体テストの作成によって、デリッシュリサーチのために作成するテーブルのデータの品質は担保されるようになりますが、元の検索ログ自体に不具合が起きた場合には対処することができません。
例えば、検索ログデータのETLが遅延して、件数が正常ではなかった場合は、今回紹介した単体テストのみでは、対処することができません。
そこで、元の検索ログデータを使用して、検索ログ数の前週比を毎日集計して、閾値を上回る増加率・減少率だった場合にアラートを出す仕組みを作成しました。
アラートはPdMの人でも触れるようにするために、redashで作成しました。
以下のようなクエリを作成して、alert_flag=1となるレコードがあった場合にアラートを出すようにしました。

アラート用のSQL

実際にはユーザー属性ごとのログ数も確認しますが、今回は全体のログ数のみを確認するクエリを紹介します。

WITH search_log AS (
  SELECT
    event_date,
    user_id,
    search_word
  FROM 
    search_log
  WHERE
    event_date >= DATE_SUB(FROM_UTC_TIMESTAMP(CURRENT_DATE(), 'Asia/Tokyo'), 8)
    AND event_date < FROM_UTC_TIMESTAMP(CURRENT_DATE(), 'Asia/Tokyo')
),
daily_count AS (
  SELECT
    event_date,
    COUNT(*) AS count
  FROM 
    search_log
  GROUP BY 
    event_date
)
growth_rate AS (
  SELECT
    event_date,
    count AS current_count,
    LAG(count, 7) OVER (ORDER BY event_date) AS prev_week_count,
    ROUND((count - LAG(count, 7) OVER (ORDER BY event_date)) * 100.0 / LAG(count, 7) OVER (ORDER BY event_date), 2) AS growth_rate
  FROM
    daily_count
)
SELECT
    event_date,
    current_count,
    prev_week_count,
    CASE
        WHEN growth_rate < -20 OR growth_rate > 20 THEN 1
        ELSE 0
    END AS alert_flag
FROM
    growth_rate

課題・改善点

現状は過去のデータを元に1年のうち年n回程度発生する増加率・減少率をalert_flagの閾値に設定していますが、このnが決め打ちになってしまうので、適切な閾値を設定するのが難しいです。
閾値を厳しくし過ぎると本当に問題が起きている時に気付けず、閾値を緩くしすぎてしまうと頻繁にアラートが鳴り信憑性がなくなってしまうので、運用していく中でちょうど良い閾値を見つけていきたいです。

まとめ

データの品質担保をするために取り組んだことについて紹介させていただきました。
課題や、レビューとテスト作成にかなりの時間がかかるといった辛い点はありますが、データの品質担保ができているため、実施してよかったと思っています!
同じような課題を持っている方の参考になれたら幸いです。
最後まで読んでいただきありがとうございました。