every Tech Blog

株式会社エブリーのTech Blogです。

【2025春】DynamoDB Itemの一括削除を実践

【2025春】DynamoDB Itemの一括削除を実践
【2025春】DynamoDB Itemの一括削除を実践

 こんにちは、開発本部 RetailHUB開発部 NetSuperグループに所属するフルスタックエンジニアをやらせていただいています、ホーク🦅アイ👁️です。早春、3/6に「AWS Innovate: Generative AI + Data」が開催され、今後も生成AIアプリケーションと親和性の高いDynamoDBと向き合う方々が増えていくと思い本記事を書くことにしました。

背景

 弊社ではネットスーパーシステムのオプションサービスとしてネイティブアプリも提供しております。そのインフラアーキテクチャの一部にDynamoDBがあります。主に、アプリ利用ユーザのログインセッションを管理しています。ごく最近、小売様の要望対応を行った結果、アプリにログイン中ユーザのセッションを削除する必要が出てきました。いくつかの解決策はありましたが対応の緊急性や工数の観点あるいは、一定数のユーザの再ログインをシームレスに行ってもらうUXのため、本記事にあるようにアプリケーションが読み書きしているDynamoDBのセッションテーブルのItemを直接削除する方法を選択しました。

システムの前提条件(制限事項)

 実際に一括削除プログラムを実装をするにあたり、現状の弊社運用中DynamoDBの設定やテーブル設計について以下のことを考慮しなければなりませんでした。

  • アプリケーション上でセッション管理に使っている対象テーブルは2つ(仮にA=User,B=Sessionとする)存在

  • 対象テーブル2つともPartition key(パーティションキー)はセッションIDのみ

  • 対象テーブル2つともSort key(ソートキー)は設定なし

  • 対象テーブル2つともItem総数は1万強

  • 対象テーブル2つともCapacity mode(キャパシティモード)は、On-demand(オンデマンド)で起動中

  • テーブルAにはGlobal Secondary Index (GSI) の設定はない

  • テーブルBにはGSIの設定がある。但し、今回無関係のAttribute(属性)にのみ設定済

 また、運用面での要件は以下の通りです。

  • 当日、システムメンテナンスを1時間実施。その時間内で作業完遂するスケジュール策定

  • GSI作成時にRead/Write負荷がかかり相当数の時間を要する可能性があるため普段の運用に支障をきたさないようにGSIの追加は行わない(そのための別メンテナンス時間を取る日程調整が厳しかった)

前提を要件としたコード設計

 上述の前提条件を基にどのようなコード設計が必要か列挙します。

  • パーティションキーではない属性(ユーザID)を基に対象者を抽出するためFull Scan(フルスキャン)検索をしなければならない

  • スキャンは、1MBの制限でPagination(ページネーション)が発生するため、LastEvaluatedKey要素が存在しなくなるまでページネーション単位で検索を繰り返す

  • 属性:time_to_liveをNumber型で定義しているがこの値の最新UNIX Timestamp 1件のみを抽出

  • time_to_liveはソートキーではないのでScanIndexForward=falseによるORDER BY DESCを使えないので最新かどうかはプログラム側で判定

  • boto3のリトライ機構はクライアントセッション全体のThrottling(スロットリング)には対応しているがbatch_write_item()関数のUnprocessedItemsには非対応なので自前でリトライ機構の実装が必要

  • リトライ機構として、Exponential Backoff(指数バックオフ)+Jitter(ランダム遅延)を採用

実装例

deleteUserSession.py

# usage: python deleteUserSession.py <profile> <file>
# <profile>: AWS CLIのprofile名
# <file>: 複数userIdが1行に1つずつ列挙されたファイル
import boto3
import time
import random
import traceback
import sys
from collections import defaultdict

# コマンドライン引数からprofile[第1引数]、ファイル名[第2引数]を取得
args = sys.argv

# DynamoDBクライアント
my_session = boto3.Session(profile_name=args[1])
dynamodb = my_session.client('dynamodb')

# テーブル名
table_a_name = "User"
table_b_name = "Session"

# 外部ファイル(plain text)のパス
INPUT_FILE_PATH = args[2]

# ファイルから `userId` の値を読み込む
def read_a_values_from_file():
    with open(INPUT_FILE_PATH, "r") as file:
        return [line.strip() for line in file.readlines() if line.strip()]

# 指定 `userId` の最新Itemをフルスキャン検索して取得
def scan_table_for_latest_record(a_value):
    latest_record = None
    last_evaluated_key = None  # ページネーション用

    while True:
        # Scan 実行(last_evaluated_key がある場合のみ渡す)
        scan_params = {
            "TableName": table_a_name,
            "FilterExpression": "userId = :a_value",
            "ExpressionAttributeValues": {":a_value": {"S": a_value}}
        }
        if last_evaluated_key:
            scan_params["ExclusiveStartKey"] = last_evaluated_key  # None の場合は追加しない
        response = dynamodb.scan(**scan_params)

        # 取得データの中から最新の `time_to_live` を持つItemを探す
        for item in response.get("Items", []):
            if latest_record is None or int(item["time_to_live"]["N"]) > int(latest_record["time_to_live"]["N"]):
                latest_record = item  # 最新の `time_to_live` を持つItemを更新

        # ページネーションのチェック
        last_evaluated_key = response.get("LastEvaluatedKey")
        if not last_evaluated_key:
            break  # 次のページがなければ終了

    return latest_record

# Userテーブルから `userId` の値を元に最新Itemを取得
def get_latest_records_from_table_a(a_values):
    items_by_a = {}

    for a_value in a_values:
        latest_record = scan_table_for_latest_record(a_value)
        if latest_record:
            items_by_a[a_value] = latest_record["sessionId"]["S"]  # `sessionId` の値を保存

    return list(items_by_a.values())

# Sessionテーブルの検索 & 削除
def delete_from_table_b(b_values):
    items_to_delete = []
    
    # Sessionテーブルから `sessionId` の値で検索して該当Itemを取得
    for b_value in b_values:
        response = dynamodb.query(
            TableName=table_b_name,
            KeyConditionExpression="sessionId = :b_value",
            ExpressionAttributeValues={":b_value": {"S": b_value}}
        )
        items_to_delete.extend(response['Items'])

    # 取得したItemをBatchWriteItemで削除
    deleted_count = 0
    batch_size = 25
    for i in range(0, len(items_to_delete), batch_size):
        batch = items_to_delete[i:i + batch_size]
        request_items = {
            table_b_name: [{'DeleteRequest': {'Key': {'sessionId': item['sessionId']}}} for item in batch]
        }
        response = dynamodb.batch_write_item(RequestItems=request_items)

        # 未処理のItemがあればリトライ
        retry_count = 0
        while response.get('UnprocessedItems'):
            retry_count += 1
            wait_time = min(2 ** retry_count + random.uniform(0, 1), 60)  # 指数バックオフ+ランダム遅延
            time.sleep(wait_time)
            response = dynamodb.batch_write_item(RequestItems=response['UnprocessedItems'])

        deleted_count += len(batch)
        print(f"経過:{deleted_count} 件を削除しました")

    return deleted_count

def main():

    try:
        # 外部ファイルから検索対象の `userId` のリストを取得
        a_values = read_a_values_from_file()
        if not a_values:
            print("外部ファイルに検索対象がありません")
            return

        # Userテーブルから最新の `sessionId` 値を取得
        b_values = get_latest_records_from_table_a(a_values)
        if not b_values:
            print("Userテーブルに該当するItemが見つかりませんでした")
            return
        
        # b_valuesの要素数を表示
        print(f"対象者数: {b_values.__len__()}")

        # SessionテーブルのItem一括削除
        deleted_count = 0
        deleted_count = delete_from_table_b(b_values)
        print(f"総数:{deleted_count} 件を削除しました")

    except Exception as e:
        print(traceback.format_exc())

if __name__ == "__main__":
    main()

解説

read_a_values_from_file()関数について

# ファイルから `userId` の値を読み込む
def read_a_values_from_file():
    with open(INPUT_FILE_PATH, "r") as file:
        return [line.strip() for line in file.readlines() if line.strip()]

 以下のようなフォーマットのテキストファイルを1行ずつ読み込みユーザIDのリストを返します。

userIdList.txt
100001
100019
100223
...

scan_table_for_latest_record()関数について

# 指定 `userId` の最新Itemをフルスキャン検索して取得
def scan_table_for_latest_record(a_value):
    latest_record = None
    last_evaluated_key = None  # ページネーション用

    while True:
        # Scan 実行(last_evaluated_key がある場合のみ渡す)
        scan_params = {
            "TableName": table_a_name,
            "FilterExpression": "userId = :a_value",
            "ExpressionAttributeValues": {":a_value": {"S": a_value}}
        }
        if last_evaluated_key:
            scan_params["ExclusiveStartKey"] = last_evaluated_key  # None の場合は追加しない
        response = dynamodb.scan(**scan_params)

        # 取得データの中から最新の `time_to_live` を持つItemを探す
        for item in response.get("Items", []):
            if latest_record is None or int(item["time_to_live"]["N"]) > int(latest_record["time_to_live"]["N"]):
                latest_record = item  # 最新の `time_to_live` を持つItemを更新

        # ページネーションのチェック
        last_evaluated_key = response.get("LastEvaluatedKey")
        if not last_evaluated_key:
            break  # 次のページがなければ終了

    return latest_record

 LastEvaluatedKeyが存在する限りwhileループを続けてページネーション毎にItemのtime_to_live属性値が最新(=最大)のものに更新し続けます。最後まで到達した後に最新1件のItemだけを返します。

get_latest_records_from_table_a()関数について

# Userテーブルから `userId` の値を元に最新Itemを取得
def get_latest_records_from_table_a(a_values):
    items_by_a = {}

    for a_value in a_values:
        latest_record = scan_table_for_latest_record(a_value)
        if latest_record:
            items_by_a[a_value] = latest_record["sessionId"]["S"]  # `sessionId` の値を保存

    return list(items_by_a.values())

 対象リストのユーザID1つずつスキャンして最新1件のItemをリストに格納し、ユーザID数繰り返します。最後にItemのセッションIDリストを返します。

delete_from_table_b()関数について

# Sessionテーブルの検索 & 削除
def delete_from_table_b(b_values):
    items_to_delete = []
    
    # Sessionテーブルから `sessionId` の値で検索して該当Itemを取得
    for b_value in b_values:
        response = dynamodb.query(
            TableName=table_b_name,
            KeyConditionExpression="sessionId = :b_value",
            ExpressionAttributeValues={":b_value": {"S": b_value}}
        )
        items_to_delete.extend(response['Items'])

 まず、セッションIDがSessionテーブルのパーティションキー属性になっているのでQuery(クエリ)検索を使って高速処理することができます。

# 取得したItemをBatchWriteItemで削除
deleted_count = 0
batch_size = 25
for i in range(0, len(items_to_delete), batch_size):
    batch = items_to_delete[i:i + batch_size]
    request_items = {
        table_b_name: [{'DeleteRequest': {'Key': {'sessionId': item['sessionId']}}} for item in batch]
    }
    response = dynamodb.batch_write_item(RequestItems=request_items)

 次に、全て取得した対象Itemリストをbatch_write_item()関数で25件(AWS側の仕様制限)を1単位として繰り返し一括削除していきます。

# 未処理のItemがあればリトライ
retry_count = 0
while response.get('UnprocessedItems'):
    retry_count += 1
    wait_time = min(2 ** retry_count + random.uniform(0, 1), 60)  # 指数バックオフ+ランダム遅延
    time.sleep(wait_time)
    response = dynamodb.batch_write_item(RequestItems=response['UnprocessedItems'])

deleted_count += len(batch)
print(f"経過:{deleted_count} 件を削除しました")

 その後、削除処理に失敗したUnprocessedItemsが存在する限り、削除処理をリトライし続けます。リトライ間隔を指数バックオフ+ランダム遅延により徐々に長くしていくことでリトライの成功率を上げる試みをしています。最後に、削除した件数を返します。

main()関数について

def main():

    try:
        # 外部ファイルから検索対象の `userId` のリストを取得
        a_values = read_a_values_from_file()
        if not a_values:
            print("外部ファイルに検索対象がありません")
            return

        # Userテーブルから最新の `sessionId` 値を取得
        b_values = get_latest_records_from_table_a(a_values)
        if not b_values:
            print("Userテーブルに該当するItemが見つかりませんでした")
            return
        
        # b_valuesの要素数を表示
        print(f"対象者数: {b_values.__len__()}")

        # SessionテーブルのItem一括削除
        deleted_count = 0
        deleted_count = delete_from_table_b(b_values)
        print(f"総数:{deleted_count} 件を削除しました")

    except Exception as e:
        print(traceback.format_exc())
  1. ユーザIDファイル読み込み
  2. Userテーブルから削除対象ユーザのセッションIDを検索
  3. 対象者数を出力
  4. Sessionテーブルから対象セッションを検索して削除実行
  5. 削除Item数を出力
  6. 1−5の処理途中で例外が発生した場合は、トレース出力
  7. 終了

結果

  • 検索ユーザ数:577名
  • 対象者数:139名
  • 削除Item総数:139件
Executed in usr time sys time
138.78 secs 14.20 secs 1.55 secs

総括

 本記事では、DynamoDBのセッション管理テーブルの大量のItemを一括に削除する方法としてAWS SDK for Pythonで実装したプログラムを紹介しました。本番環境の実Item群を削除処理した結果、スロットリングは発生せずに全て成功するに至りました。今回は、既存の設計に変更を加えることなく運用中のテーブルにおいてなるべく影響を及ばさないためにBDIを追加せずにフルスキャン検索しましたが、もっと大量のItemが格納されている場合には処理の高速化が死活問題になる可能性はありますので、皆様のケースに合わせて柔軟に対応いただければと思います。

 また、DynamoDBはPartiQLをサポートしています。簡単な問い合わせならば従来のSQLライクな構文で実行可能という意味で便利です。今回のケースにおいてはパーティションキー、ソートキー、インデックスが設定されていない属性に対しては制限されてしまうため使えませんでしたが、キーの設定がなされているケースであればなお有効であると思います。

 今回は一時的な対応のみでしたのでCLIとして実装してローカルマシンからAWSアカウントにログインして実行しましたが、AWS Lambda+EventBridgeを使って定期的に古い使われなくなったItemを削除するパターンにも有効かと思います。

 これにて本記事の結びとさせていただきます。

参考

最後に

エブリーでは、ともに働く仲間を募集しています。

テックブログを読んで少しでもエブリーに興味を持っていただけた方は、ぜひ一度カジュアル面談にお越しください!

corp.every.tv