every Tech Blog

株式会社エブリーのTech Blogです。

GASを使ってRedashからデータ抽出して、G Spreadsheetを作成する

はじめまして、データストラテジストのoyabuです。 RedashからCSVでデータをエクスポートして、GoogleDriveに保存、更にCSVをSpreadsheet化してようやく可視化の準備が整うの、めんどくさいですよね。それらを自動化するGASを作ったので、書きます

注意点

極限までサボりたかったのでChatGPTに聞いてツギハギして動けばヨシの精神で作りました。出来上がったものをみて、コードの整然さやエラーハンドリングについて、思うところは多々ありますがそのままにしています。社内用に展開しているものは複数クエリに対応しているのですが、1->nになると本筋と関係のない話題が増えるので今回はデータを抽出するクエリが1つに限定されたGASのコードを考えることにします

課題

まずそもそもなんでこれやったかです。以下が理由です

  • RedashのQuery API はSpreadhsheet上でIMPORTDATAできて便利だが、パラメータを使っているクエリのデータは抽出できない
  • 基本CSV->G Spreadsheetに変換したうえで加工することが多いので、施行回数が多くなるとつらい
  • using redashAPI with GASの記事はいくつかみかけるが、ポーリングをsleepなどにまかせていて、重いクエリがそもそも回せない

やったこと

上記問題を解くために、以下を実施しました

  • パラメータつかってるクエリからもAPI使ってデータ抽出できるようにする
  • 時間がかかるクエリはポーリングする(GASのtimeoutである6minに負けない)
  • CSV->スプレッドシート化まで自動化

中でも本記事で触れるのはあんまり情報として見ない(気がする)以下の項目です

  • RedashのUser API Keyの簡単な解説とGASでの使い方
  • GASのトリガーを使ったポーリング

その他についてはよく見るので、詳細については本記事では触れません

RedashのUser API Keyの簡単な解説と使い方

詳しくは公式を参照していただければと思うのですが、めんどくさいです。まずresponseが書いてありません。愚直にrequestして、responseをみる必要があります。つらいです

API

今回やりたいことを実現するうえでの主役はこいつになります /api/queries/<id>/results クエリIDとパラメータを渡してPOSTしたときにキャッシュされた結果があればそれを、なければクエリを実行するエンドポイントです

responseの中にstatusを格納したキーが無く、query_resultキーがあればデータが返ってきた。なければクエリが実行されたので、ポーリングしてデータが返ってくるまで待つ。の判断をしないといけないのでちょっとゾワゾワします

親切にやるならこっちでstatusを判断してObjectとしてラップして返しちゃう関数を作るのがよいと思いますが、今回のコンセプトは極限までサボる。です。心を鬼にしてChatGPTが出したものを正として進めていきます。

余談ですがChatGPTは一瞬で80点までは出してくれるのですが、100点までChatGPTオンリーで詰めるのはちょっとしんどいと思ってます。(百里を往くものは九十を半ばとす。なので、このあたりは自前の実装でも同じことは言えますが、一段抽象化されているのでコントロールが効きづらく、十里がより遠くなる印象)

出来上がり is belowなのですが、ここに関する白眉なコードはこんな感じです

function _getJobId(queryId, param) {
  const apiUrl = `${host}/api/queries/${queryId}/results`;
  const data = {
    parameters: param
  };
  const payload = JSON.stringify(data);
  const options = {
    'method' : 'post',
    'contentType': 'application/json',
    'headers': {
      'Authorization': 'Key ' + apiKey
    },
    'payload' : payload,
    'muteHttpExceptions': true
  };
  let results = UrlFetchApp.fetch(apiUrl, options);
  let isResult = !!JSON.parse(results).query_result;
  console.log(isResult);
  if (isResult) {
    console.log(JSON.parse(results).query_result.data.rows);
    return JSON.parse(results).query_result.data;
  }
  const jobId = JSON.parse(results).job.id;
  return jobId;
}

なんとキャッシュがあれば配列を、なければjobIdを返します。ChatGPTが言うので仕方ないですが結構しんどいです とはいえ、/api/queries/<id>/results の仕様上どこかでこんな感じの処理が必要になってきます(もうちょい考慮してwrapするべきという議論は置いておきます)

GASのトリガーを使ったポーリング

GASのtimeoutが6minなので、sleepで待ってもそもそも重いクエリの実行が無理になってしまいます。 一方でGASはトリガーが結構な量作れたりするので、これを使ってポーリングすると楽になるシーンが多いです。 変数は渡せないので、そこはspreadsheetのシート上に持たせる解き方で頑張ります。(ほんとは実行者のみ編集できるセルとかにしたほうがいいけど、サボります)

例えばこんな感じです。今回は自分を呼んでます

  data = getJobResult(jobId);
  if (!data) {
    ScriptApp.newTrigger('generateRedashFiles').timeBased().after(min * 60 * 1000).create();
    return;
  }

結果のキャッシュがなかったときは、Redash側でクエリが実行されるので、一旦ジョブIDをスプレッドシートに保存しといて、再度自分を呼び出したときに参照するようにします こんな感じでトリガーが作成されます。急いでなかったり、重めのクエリかもなー。というときは30minとかで良さそうな気もします

出来上がり

ChatGPTにいっぱい聞いてツギハギしてちょっとだけ手直しした結果がこれです。とりあえず動きます。使い方は後述します。

const host = '${redashのホスト名}';
const apiKey = '${redah user API Key}';
const urlSheetName = 'URLリスト'; // TODO: edit as each env
const ss = SpreadsheetApp.getActiveSpreadsheet();
const folderId = '${データ保存先のG DriveフォルダID}';
let min = 30;

function generateRedashFiles() {
  let sheet = ss.getSheetByName(urlSheetName);

  let data;
  let url = sheet.getRange(2, 2).getValue();
  let fileName = sheet.getRange(2, 1).getValue();
  let jobId = sheet.getRange(2, 3).getValue();

  if (!jobId || jobId == '') {
    jobId = getJobId(url);
    // jobID or data
    if (jobId instanceof Object) {
      data = jobId;
      json2Csv(fileName, data);
      return;
    } else {
      sheet.getRange(2, 3).setValue(jobId);
    }
  }

  data = getJobResult(jobId);
  if (!data) {
    ScriptApp.newTrigger('generateRedashFiles').timeBased().after(min * 60 * 1000).create();
    return;
  }
  json2Csv(fileName, data);
  importCsvFilesToSpreadsheet();
}

function getJobResult(jobId) {
  const jobStatusUri = `${host}/api/jobs/${jobId}?api_key=${apiKey}`;
  let queryResultId = null;
  const jobStatus = JSON.parse(UrlFetchApp.fetch(jobStatusUri)).job;
  const status = jobStatus.status;
  if (status === 3 || status === 4) {
      queryResultId = jobStatus.query_result_id;
  } else {
    return;
  }
  const jobResultUri = `${host}/api/query_results/${queryResultId}.json?api_key=${apiKey}`;
  results = UrlFetchApp.fetch(jobResultUri);
  return JSON.parse(results).query_result.data;
}

function getJobId(url) {
  let payload = generatePayload(url);
  let queryId = payload[0];
  let param = payload[1];
  let jobId = _getJobId(queryId, param);
  return jobId;
}

function _getJobId(queryId, param) {
  const apiUrl = `${host}/api/queries/${queryId}/results`;
  const data = {
    parameters: param
  };
  const payload = JSON.stringify(data);
  const options = {
    'method' : 'post',
    'contentType': 'application/json',
    'headers': {
      'Authorization': 'Key ' + apiKey
    },
    'payload' : payload,
    'muteHttpExceptions': true
  };
  let results = UrlFetchApp.fetch(apiUrl, options);
  let isResult = !!JSON.parse(results).query_result;
  console.log(isResult);
  if (isResult) {
    console.log(JSON.parse(results).query_result.data.rows);
    return JSON.parse(results).query_result.data;
  }
  const jobId = JSON.parse(results).job.id;
  return jobId;
}

function generatePayload(url) {
  url = url.split('#')[0];
  let match = url.match(/\/queries\/(\d+)\/source/);
  let queryId = match ? match[1] : null;
  console.log(queryId);
  let params = {};
  let queryString = url.split('?')[1];
  const regex = /^\d{4}-\d{2}-\d{2}--\d{4}-\d{2}-\d{2}$/;
  if (queryString) {
    let pairs = queryString.split('&');
    pairs.forEach((pair) => {
      let kv = pair.split('=');
      let key = decodeURIComponent(kv[0]).substring(2);
      let val = decodeURIComponent(kv[1] || '');
      if (regex.test(val)) {
        let dates = val.split('--');
        val = {
          'start': dates[0],
          'end': dates[1]
        }
      }
      params[key] = val;
    });
  }
  console.log(params);
  return [queryId, params];
}

function getFolderId() {
  const folderUrl = ss.getSheetByName(settingSheetName).getRange('B1').getValue();
  console.log(folderUrl);
  const matches = folderUrl.match(/[-\w]{25,}/);
  if (!matches) {
    throw new Error('Invalid folder URL');
  }
  return matches[0];
}

function moveFileToFolder(fileId) {
  const folder = DriveApp.getFolderById(folderId);
  const file = DriveApp.getFileById(fileId);
  file.moveTo(folder);
  console.log(`File "${file.getName()}" has been moved to folder "${folder.getName()}"`);
}

function importCsvFilesToSpreadsheet() {
  var folder = DriveApp.getFolderById(folderId);
  var csvFiles = folder.getFilesByType(MimeType.CSV);

  // 新しいスプレッドシートを作成し、特定のフォルダに移動
  var spreadsheet = SpreadsheetApp.create('summary');
  var spreadsheetFile = DriveApp.getFileById(spreadsheet.getId());
  folder.addFile(spreadsheetFile);
  DriveApp.getRootFolder().removeFile(spreadsheetFile);

  var firstSheet = true;

  while (csvFiles.hasNext()) {
    var file = csvFiles.next();
    var fileName = file.getName();
    var csvData = Utilities.parseCsv(file.getBlob().getDataAsString());

    if (firstSheet) {
      // 最初のCSVファイルの場合、既存のシートを使用
      var sheet = spreadsheet.getSheets()[0];
      sheet.setName(fileName);
      firstSheet = false;
    } else {
      // 2つ目以降のCSVファイルの場合、新しいシートを作成
      var sheet = spreadsheet.insertSheet(fileName);
    }

    // CSVデータをシートに書き込む
    var range = sheet.getRange(1, 1, csvData.length, csvData[0].length);
    range.setValues(csvData);
  }

  // 最後にスプレッドシートを開く
  SpreadsheetApp.getActiveSpreadsheet().toast('CSVファイルの集約が完了しました。', '完了', 5);
  var url = spreadsheet.getUrl();
  Logger.log('スプレッドシートのURL: ' + url);
}

// json to csv
function json2Csv(fileName, data) {
  // CSV文字列を生成
  let csvContent = '';

  // ヘッダーを追加
  const headers = data.columns.map(column => column.friendly_name);
  csvContent += headers.join(',') + '\n';

  // 各行のデータを追加
  data.rows.forEach(row => {
    const rowValues = data.columns.map(column => {
      const value = row[column.name];
      // CSVの規則に従って、カンマや改行を含む値をダブルクォートで囲む
      return `"${value.toString().replace(/"/g, '""')}"`;
    });
    csvContent += rowValues.join(',') + '\n';
  });

  // CSVファイルをGoogleドライブに保存
  const file = DriveApp.createFile(fileName + '.csv', csvContent, MimeType.CSV);

  // ファイルのURLをログに出力
  Logger.log('CSVファイルが作成されました: ' + file.getUrl());
  moveFileToFolder(file.getId());
}

使い方

  1. URLリスト シートを作って、こんな感じに設定します

jobID部分は後で更新されます

  1. GASに上記の出来上がりコードを貼って generateRedashFiles 関数を実行します

  1. ジョブIDが更新され、トリガーが登録されます

  1. 時間が来るとトリガーが実行され、結果がキャッシュされていれば指定のフォルダにCSVとスプレッドシートが保存されます

終わりに

ところどころ手直しはしたいですが、とりあえず動くものができました。 今回は簡単のために単一クエリに話しを限定しましたが、RedashAPIをGASで動かせることの最大の強みは、パラメータだけ変えたクエリをスプレッドシート上で大量に作れるところだと思います。 なんだかんだパラメータを設定する、、他のパラメータも別画面で設定する、、結果がでるまで待つ。。やっぱり別の設定のがよいな。。設定し直す。。結果がでるまで待つ。。みたいな作業がBIツールを使っているとどうしても発生しがちなので、そこをG Suiteにまかせて富豪的に解決出来るのは他の作業ができて個人的には便利なところかと思っています。

それではさようなら

Amazon Bedrock ワークショップ に参加しました!

はじめに

こんにちは!トモニテにて開発を行なっている吉田です。

今回は先日参加した Amazon Bedrock ワークショップに参加させいただいたのでそこで学んだことについて紹介します!

ワークショップは AWS 様からエブリー向けに開催いただきました。

Amazon Bedrock とは

Amazon Bedrock は、高性能な基盤モデル (Foundation Model) の選択肢に加え、生成 AI アプリケーションの構築に必要な幅広い機能を提供する完全マネージド型サービスです。 特徴としては以下が挙げられます。

  • ユースケースに最適な基盤モデルを簡単に試すことができる
  • 調整や検索拡張生成 (RAG) などの手法を使用してデータに合わせて非公開でカスタマイズ可能
  • サーバーレスであるため、インフラストラクチャを管理する必要がない

aws.amazon.com

ワークショップの流れ

当日の流れとしては簡単な自己紹介から始まり AWS の方に今回のテーマである Amazon Bedrock(以下、Bedrock とします) について、Bedrock を利用するにあたり必要となる知識について講義をいただきその後ワークショップ用にご準備いただいたソースを使い各自で使ってみるという流れでした。

その中で学んだ Bedrock とその周辺知識について以下に簡単に紹介します。
Bedrock は、生成 AI アプリケーションの構築に必要な幅広い機能を有していますが、そもそも AI とは、人間のように学習し、理解し、反応し、問題を解決する能力を持つ技術のことを指します。
また基盤モデルとは、大量のデータから学習し、広範な知識と能力を持つ大規模な機械学習モデルのことで、その特徴は、入力プロンプトに基づいて、さまざまな異なるタスクを高い精度で実行できる点にあります。
タスクには、自然言語処理 (NLP)、質問応答、画像分類などがあり、テキストによるセンチメントの分析、画像の分類、傾向の予測などの特定のタスクを実行する従来の機械学習モデルと比べて、基盤モデルはサイズと汎用性で差別化されています。
基盤モデルが可能とすることは以下の通りです。

言語処理

基盤モデルには、自然言語の質問に答える優れた機能があり、プロンプトに応じて短いスクリプトや記事を書く機能さえあります。また、NLP 技術を使用して言語を翻訳することもできます。

視覚的理解

基盤モデルは、特に画像や物理的な物体の識別に関して、コンピュータビジョンに適しています。これらの機能は、自動運転やロボット工学などのアプリケーションで使用される可能性があります。また、入力テキストからの画像の生成、写真やビデオの編集が可能です。

コードの生成

基盤モデルは、自然言語での入力に基づいて、さまざまなプログラミング言語のコンピュータコードを生成できます。基盤モデルを使用してコードを評価およびデバッグすることもできます。

人間中心のエンゲージメント

生成 AI モデルは、人間の入力を使用して学習し、予測を改善します。重要でありながら見過ごされがちな応用例として、これらのモデルが人間の意思決定をサポートできることが挙げられます。潜在的な用途には、臨床診断、意思決定支援システム、分析などがあります。 また、既存の基盤モデルをファインチューニングすることで、新しい AI アプリケーションを開発できます。

音声からテキストへ

基盤モデルは言語を理解するため、さまざまな言語での文字起こしやビデオキャプションなどの音声テキスト変換タスクに使用できます。

aws.amazon.com

一方で基盤モデルが苦手とすることもあります。

  • 基盤モデルは大量の GPU を消費する(=コストがかかる)
    • 適材適所の基盤モデル利用が重要
  • Hallucination
    • 嘘の情報を答えてしまう
  • プロンプトのトークン数の制限
    • プロンプトに含められるトークン数には基盤モデルによって制限がある
  • 回答に冪等性がない
    • 特定の入力に対して毎回同じ結果を返すとは限らない

(ワークショップ内資料より引用)

これら基盤モデルの苦手ポイントを解決するために有効な手法の一つとして RAG(Retrieval Augmented Generation)が挙げられます。 RAG とは外部の知識ベースから事実を検索して、最新の正確な情報に基づいて大規模言語モデル(LLM)に回答を生成させることができます。 次節では実際に RAG を用いてタスクを実行してみます!

実際に使ってみた

Amazon Bedrock の Knowledge Bases for Amazon Bedrock(以下、Knowledge Bases とします) を使用すると、Bedrock の基盤モデルを、RAG のために企業データに安全に接続することができるとのことで実際に使ってみました!
Knowledge Bases ではデータソースとしてS3を指定し埋め込みモデルを選択します。(今回はAmazon Titan Embeddingsを選択)
そしてベクトルデータベースについては新しいベクトルストアを作成するか、他で作成したベクトルストアがある場合にはそれを設定することもできます。今回は初めての利用ということで新しくベクトルストアを作成しました。

データソースには弊社のサービスであるトモニテで 2023 年 8 月に実施された「トモニテ子育て大賞 2023」と昨年実施の「MAMADAYS 総選挙 2022」の内容を保存しました。(1 つのバケットに 2 つのオブジェクトがある状態です。)

election2023.tomonite.com

tomonite.com

※2023 年 8 月にブランドリニューアルを行ったことから名称が異なっております
関連記事はこちら tech.every.tv

使ってみた結果がこちらです!

質問内容:mamadays 総選挙、トモニテ子育て大賞それぞれのデカフェ飲料部門の最優秀賞について教えてください

回答: ※Knowledge Bases for Amazon Bedrock 実行結果のスクリーンショット

実際のコンテンツ:(左: MAMADAYS総選挙 2022、右: トモニテ子育て大賞 2023) ソースを明示した上で質問に回答しており、2 つのバケットから異なる情報を引き出すことができていました!

ただ質問によっては片方の情報のみ回答したり、異なる回答をすることもありましたが今回は web ページをそのまま pdf 化して保存しただけだったのでページ内の情報を適切にテキスト化した上でソースとして保存すればより回答の正確性は向上するのかなと思いました! 現に、ほとんど文字の羅列である企画書をソースとして保存しその内容について回答を求めた場合はほぼほぼ適切な情報を返してくれていました。

ワークショップでの学び

普段は SE として業務に関わっていることもあり AI や機械学習に関わる機会は少ないですが今回のワークショップは Bedrock の深い理解を得ることができ、非常に有意義な時間となりました。学んだことを社内に持ち帰り業務に役立てていきたいと思います。

終わりに

ワークショップの開催にあたり、多くのリソースを提供していただいた AWS の皆様に心から感謝申し上げます。

ネットスーパーアプリ GraphQL から REST へ移行始めました

はじめに

こんにちは、retail HUBで Software Engineer をしているほんだです。
今回は私が現在着手している事業譲渡されたアプリを社内で持続的なプロダクト開発を行える状態にするリプレイスプロジェクトをどのように行っているか紹介しようと思います。
この記事ではリプレイスを行うにあたってどのようなことを課題に感じてその課題に対してどのような解決策をとったか主にサーバーの実装について説明しています。

ネットスーパーアプリとは

現在弊社ではネットスーパーアプリとして Web アプリとスマホアプリの二つのシステムを提供しています。
Web アプリは販促コンテンツの設定や売り上げの管理・集計を行うことが可能な管理システムと受け取り方法に応じた価格変更や送料変更にも対応し、消費者の柔軟な買い物を実現するお客様向けアプリを 17 の小売り様に、スマホアプリでは Web アプリのお客様向けアプリと同等の機能を Android と iOS のアプリとして株式会社リウボウストア様にリウボウネットスーパーとして提供しています。
こちらのサービスは以前株式会社ベクトルワン様が開発・運用していたものを事業譲渡されたものです。

リプレイス前の実装

リプレイス前の実装は上記図のようになっていました。
ネットスーパーアプリは GraphQL Mesh で作成された GraphQL Gateway Server を呼びその裏では AppSync と Lambda を用いて GraphQL が実装されていました。
GraphQL のリゾルバーに当たる Lambda は Python で書かれていました。

リプレイスの背景

課題点

既存の実装では下記のような問題があったため今回リプレイスを行うに至りました。

  • 社内に知見が少ないインフラ構成や、言語で実装されている。
  • Appsync, Lambda を用いた GraphQL の実装がチューニング不足もあるかもしれないが遅かった。
  • 重複する場合やコアとなるロジックを切り出すのに Lambda レイヤーにする必要があり管理が大変だった。

リプレイスを進めるにあたり満たしたいこと

リプレイスを進めるにあたり満たしたいこととしては下記のようなことを意識しています。

  1. このプロダクトは現状は1小売様向けとなっていますが今後小売りの拡大やバグを見つけたときに早期対応、機能の追加をできるような持続的なプロダクト開発をできるようにする。
  2. DB は既存の Web アプリのものを用いるため大量の table 、小売りごとに特定の table の有無があるものを適切扱えるようにする。
  3. サーバーの実装と同時並行でアプリの実装もすすめられるようにする。

リプレイス後の技術スタック

リプレイス後のインフラ構成は上記図のようになる予定です。
リプレイス前に用いていた GraphQL Gateway Server は GraphQL を REST に移行また今後は REST に統一していく点から導入している必要がなくなったため廃止しました。
満たしたいこと 1 にあげた持続的なプロダクト開発をできるようにすることを満たすためになるべく社内に知見があるものを選定するようにしました。
インフラに関しては社内の他のサービスでも使われていて知見が豊富な ECS を、開発に用いる言語に関しても Python から社内の知見が豊富な Go、framework は echo を採用しました。
満たしたいこと 2 DB を適切に扱えるようにすることを満たすために Go の ORM は sqlboiler を採用しました。具体的な理由については後述します。
満たしたいこと 3 サーバーとアプリの実装の最適化を満たすために OpenAPI を用いたスキーマ駆動開発を実践しています。OpenAPI を用いてエンドポイントの仕様を事前に決めておくことでサーバーとクライアントが並列に実装を行えるようにしています。
OpenAPI 定義書の作成には Stoplight Studio を用いています。
次にリプレイスにあたり特筆する点について説明していきます。

oapi-codegen

oapi-codegen は Stoplight Studio で作成した OpenAPI 定義書から Go のコードを作成するために用いています。
oapi-codegen を用いて Go のコードを作成することで Request Header の値や Query Parameter の validation を自分で実装する必要がなくなります。
また、ルーティングも任せることは可能ですがその場合全てのエンドポイントに middleware を反映することになり個別に設定することができなくなってしまうため今回は用いていません。
main 関数の実装は下記のようになります。

func main() {
    e := echo.New()

  wrapper := openapi.ServerInterfaceWrapper{
        Handler: handler.NewHandler(chainSchemaMap),
    }

  g := e.Group("")
    g.Use(echomiddleware.Recover())

  // 認可なしエンドポイント
  {
    g.GET("/policy", wrapper.GetPolicy)
  }

  // 要認可エンドポイント
  g.Use(middleware.Authorize())

  {
        g.GET("/items", wrapper.GetItems)
        g.GET("/items/:id", wrapper.GetItem)
    }

  e.Logger.Fatal(e.Start(":1323"))
}

sqlboiler

sqlboiler は toml ファイルを記述し実際に DB に接続することでその table 定義を元に Go の struct を生成することができ、既存の table の数だけ stuct として書き直す手間が省けます。
今回、小売ごとの DB の差分は特定の table の有無なため local 環境に全ての table を持つ DB を作成し、それを参照することで小売ごとのカスタマイズを含む全ての table の struct を作成できます。
Go のサーバーと DB の接続には一つのユーザーを用いているため、どの小売のアプリがどの DB にアクセスできるかは Go で map を定義することで対応しています。

Stoplight Studio

スキーマ駆動開発のための OpenAPI 定義書は Stoplight Studio を用いて記述しています。
Stoplight Studio は GUI 形式で OpenAPI 定義書を編集できるツールとなっています。
また、ツール内から Mock Server や実際のサーバーを叩くことができるので OpenAPI を書く用途だけでなく、どのようなリクエストでどのようなレスポンスが返ってくるかも確認することも容易となっています。

まとめ

まだリプレイス作業が始まったばかりでレイテンシーの改善などは具体的に測れていないの結果として捉えることは今後やっていく必要があるなと感じました。
リプレイスを行っていくにあたっても最終系から逆算し何が必要かということをまとめられていなかった点もあり適切な工数見積もりができなかったり後手になることもあったので今後はそういった点も意識していきたいです。
今回私自身実務の Python のコードに触れるのが初めてでそれを慣れ親しんだ Go に書き換えるという経験は言語の長所などを改めて捉え直す貴重な機会になったなと思います。

DELISH KITCHENのレシピのレコメンドにTwo-stage Recommender Systemsを導入するまでの道のり

こんにちは。 開発本部のデータ&AIチームでデータサイエンティストをしている古濵です。

引き続き、私がフルコミットしているDELISH KITCHENのレシピレコメンドについてまとめていきます。 前回の投稿の続きのような位置づけです。

私自身の苦悩も含めた思考過程と実際に取り組んだことについてまとめていきます。

背景

DELISH KITCHENではユーザの嗜好に寄り添ったアプリのパーソナライズに向けた開発をしています。 大きく3つの課題を解決するために、アプリのパーソナライズに注力しています。

  1. 受動的に提示するレシピのパーソナライズ不足

    サービスの成長に伴い、ユーザ数もレシピ数も増えているのに対して、アプリのロジック部分は更新されていない状態が続いています。 そのため、ユーザが好みのレシピの発見の機会を増やすために、レシピのレコメンドの開発を進めています。

  2. ロジックの癒着

    DELISH KITCHENでは、一部の機能がサーバー側の簡易な集計ロジックをもとに提供しているため、サーバー側の実装と密結合となっている部分があり、データ&AIチームが継続的にロジックの改善に集中できない状態です。 そのため、データ&AIチームがオーナーシップを持ってロジックを開発し、サーバーエンジニアがロジック改善に伴う修正を対応せずとも運用できる状態を目指しています。

  3. ML活用が部分的にしか行われていない

    ユーザの行動データやレシピの栄養素データなど多くのデータが利活用できる状態なのに対して、MLをプロダクトに活用する動きが部分的にしかできていません。 そのため、MLをプロダクトに活用する事例づくりや、ML基盤の構築が必要となっており、データ&AIチーム総手で取り組んでいます。

    直近のML事例は以下をご覧ください

レシピレコメンド開発の道のり

構成

対象面は、最近見たレシピからおすすめの枠です。

レシピレコメンドの全体構成は以下のとおりです。

Data Sourceとなるdelta lakeからデータを読み込み、ロジック用の集計をdatabricksのnotebookで実装します。 実装した結果をレコメンド結果としてdelta lakeに保存し、そのデータと同じフォーマットのデータを推論結果のデータストアとして、Delish ServerのRedis(ElastiCache for Redis)内にデプロイします。
このパイプラインをデイリーのバッチで実行し、推論結果をサーバー側で取得して、DELISH KITCHENアプリで表示できるようにしています。

最初にやったこと

ルールベースのベースライン

まず、ルールベースのベースライン作成をしました。具体的には、ルールベースロジックnotebook内で、ユーザごとに検索経由で視聴した動画の中で長く再生したレシピ順にレコメンドする集計をしました。
集計データをrule base resultとしてdelta lakeに保存します。 Redisへデプロイするためにフォーマット整形も別のnotebookで実装し、recommend resultとして保存します。

最近見たレシピからおすすめ、というタイトルともシナジーもあり、ユーザにとってもわかりやすいレコメンドになったと思います。

数年間更新されていなかった既存ロジックと、ルールベースのベースラインを比較するためにA/Bテストしました。
結果として大きく改善しましたが、以下の2点のことがわかりました。

  1. 既存ロジックとルールベースのベースラインでは、レコメンド対象のユーザ数が異なることが判明し、ルールベースのベースラインが改善したというよりも、レコメンドを展開しているユーザ規模を増やすことができたことが改善の大きな要因だということ

  2. レコメンド対象に含まれていないユーザは、アプリ上で最近見たレシピからおすすめ枠が表示されていないということ

上記のような手探りの状態から始まりました。 とはいえ、ルールベースのベースラインを作って検証し、新たな課題を得ることができたと思います。

ルールベースのベースラインの限界

ルールベースのベースラインを作成した時点で、ルールベースの限界も感じていました。理由は2つあります。

1つ目は、動画をあまり再生しないユーザも一定数いることが見えてきたからです。
レコメンドをする順序を再生秒数の多い順にしていましたが、動画をあまり再生しないユーザにとっては、嗜好に添わないレシピが上位に表示されることになります。
それは、DELISH KITCHENのユーザは、動画を見るためにアプリを使っているわけではなく、レシピを探すためにアプリを使っているからではないかと推測しています。 あくまで動画はレシピを選定するための手段にすぎず、目的はレシピを探すことであり、Youtubeなどの動画サービスとは異なる思考が必要だと感じました。
そのため、動画を再生せず、材料などが見れるレシピ詳細を見てからレシピを選定しているユーザもいると考えました。

2つ目は、実装コストに対して、大きな改善は望めないと思ったからです。
多くのルール作成したり、レコメンド順序を細かくチューニングするなどすれば、より良いレコメンドができるかもしれませんが、その実装をするコストに対して改善の幅は小さいだろうと感じていました。
実際に、ルールベースのベースラインにレシピ詳細の表示ログを追加してA/Bテストしたところ、大きな改善はしませんでした。

MLロジックに向けての情報収集

ルールベースに限界を感じていたため、MLロジックに向けての情報収集を始めました。

まずは、ブログ記事を読み漁り、引用されている論文など目を通しました。 MLの導入やレコメンドのアルゴリズムに関して多くの知識があったわけではなかったため、基礎や体系的に学べる書籍を購入して読み進めました。

また、Kaggleなどコンペティションで実施された解法なども参考になりました。 コンペティションの場合、コードが公開されているケースが多くあり、コードを読んで理解する助けになりました。

情報収集した気づきとして、書籍で紹介される協調フィルタリング(行列分解など)のような手法は、コンペティションの解法ではメインで使われていないということです。
あくまで主軸となっていたのは、Two-stage Recommender Systemsと呼ばれる、候補生成とリランキングの2つからなる手法でした。 候補生成の一つとして協調フィルタリングなどが使われているケースはたくさんありましたが、メインとして使われている解法は多くなかった印象でした。

Two-stage Recommender Systemsは、大規模なユーザ x アイテムの組み合わせを全て扱うのではなく、ユーザ一人当たりに対して候補を生成し、その候補を並び替え(リランキング)するという手法です。 手法の肝としては、情報検索における検索クエリの結果が候補であり、検索結果をどの順序で並び替えるかがリランキングに該当するのかなという所感を受けています。
正しくは確認できていないですが、Covington Paul, Adams Jay, and Sargin Emre. 2016. Deep neural networks for Youtube recommendations. In RecSys. 191–198.で提案された手法が有名であり、その後Two-stage Recommender Systemsという言葉が広まったかなと思います。

Two-stage Recommender Systemsを実装するために、まず、どのような候補を生成できるかのアイデアを一覧化しました。 候補の肝となるeventログをデータソースとして、SQLで集計可能な候補を中心に整理しています。 後述する候補生成モジュールでは、このアイデア一覧をもとに、候補を生成するためのクエリを一元管理しています。

候補生成のアイデアを整理する中で、ルールベースのベースラインである検索経由で視聴したレシピを候補の一つとして扱えるのではという考えが浮かびました。 Two-stage Recommender Systemsであれば、検索経由で視聴したレシピレシピ詳細を表示したレシピの2種類をそれぞれ候補として扱い、ルールベースにおける並び替えの限界を、MLロジックでリランキングすることでユーザの嗜好にあった順にレコメンドできると考えました。

Two-stage Recommender Systemsの実装

候補生成

まずは、候補生成をするパイプラインの作成から始めました。 全体構成としては、ルールベースロジックのnotebookが候補生成notebookに置き換わります。 候補生成notebookでは、複数の候補を一括で生成するために、候補生成モジュールを用いています。

候補生成モジュールを作成した経緯として、レコメンド開発をしていく上で、今後多くの候補を作るだろうと予測していたためです。
DELISH KITCHENで全てのレシピからレコメンドする場合、ユーザ一人当たり5万レシピ強になります。 候補生成は、このレシピの数を減らす役割がありますが、特定の候補からだけでレコメンドした場合、特定の人気レシピや上位のポジションに位置するレシピばかりがレコメンド対象になる可能性があります。
本ブログ執筆時点では、検索経由で視聴したレシピレシピ詳細を表示したレシピを候補にしていますが、さらに複数候補からの組み合わせでレコメンドしたいケースも出てくると考えました。

そこで、候補生成モジュールを作成し、候補生成に関する集計ロジックを一元管理することにしました。 使い回しやすい 2-stage recommender systemの デザインパターンを考えて実装した話 を参考に、Candidate、QueryGererator、Evaluatorのクラスを作成し、これを候補生成モジュールと呼称します。

Candidateに対して、それぞれQueryGereratorとEvaluatorが依存しています。 メインとなるCandidateは、以下のメソッドを持っています。

  • generate
    • QueryGereratorからクエリ(=query)をstringを受け取り、spark.sql(query)を実行
    • QueryGereratorでは、候補を生成するためのクエリと、クエリがアウトプットするスキーマを保持
    • クエリはspark.sql()で実行可能なクエリであり、スキーマはpyspark.sql.typesのStructTypeで定義
  • evaluate
    • Evaluatorクラスで定義された評価関数を使って、生成した候補とground truthを比較
    • 評価関数には、precision@k, recall@k, map@k等
  • validate
    • generateで生成したデータや、evaluateで評価するデータに対して、簡単なバリデーションを実施
    • バリデーションにはdataframeの空チェック、カラム数チェック、カラム名チェック、カラムの型チェック等

候補生成モジュールを用いて候補生成notebookを実行し、候補を生成します。 サンプルコードとしては、以下のとおりです。

from src.recommend_system.candidate_generation.candidate import Candidate

ground_truth_table = spark.sql(...)
# example)
# user_id, recipe_id
# aaaaaaa, 111111111
# ...

candidate = Candidate(
    delta_schemas=delta_schemas,
    user_col="user_id",
    item_col="recipe_id",
    candidate_col="candidate_recipes",
    ground_truth_col="recipe_ids"
)
candidate_names = candidate.catalog_schema.keys()
# example)
# candidate_names = ["検索経由の視聴", "レシピ詳細の到達"]

for candidate_name in candidate_names:
    # 候補生成
    candidate.generate(
        candidate_name,
        date
    )

    # 評価
    candidate.evaluate(
        candidate_name,
        ground_truth_table,
        eval_topk=[3, 8, 10],
        mlflow_eval_cache_name=candidate_name
    )

    # 保存
    results = candidate.generated_candidates[candidate_name]
    results.write \
        .format("delta") \
        .mode("overwrite") \
        .option("mergeSchema", "true") \
        .save(f'path/{candidate_name}')

candidate_namesに生成する候補名となるkeyが格納されます。 これは、QueryGeneratorクラスで定義した、候補を生成するためのクエリとクエリがアウトプットするスキーマをもとに、Candidateクラスのcatalog_schemaに格納されます。

QueryGeneratorクラスの、候補を生成するためのクエリとクエリがアウトプットするスキーマは、以下のような定義をしています。
新規で候補を追加したい場合、以下のような実装を追加するだけでOKです。

# わかりやすくするために一部日本語にしています

class QueryGenerator:

    def __init__(self, delta_schemas: DeltaSchema):
        self.catalog = {
            "検索経由の視聴": {
                "query": {
                    "func": self.fetch_検索経由の視聴,
                    "params": { "days": 30 }
                },
                "schema": StructType([
                    StructField("user_id", StringType()),
                    StructField("recipe_id", LongType()),
                    StructField("seconds", DoubleType()),
                ])
            },
            ...
        }
        ...

    # Candidateクラスでgenerateメソッドが呼ばれたときに、このメソッドが呼ばれる
    def get_query(self, candidate_name: str, date: str) -> str:
        ...
        return query

    def fetch_検索経由の視聴(self, from_date: str, to_date: str) -> str:
        return f"""
            SELECT
                user_id,
                recipe_id,
                sum(seconds) AS seconds
            FROM
                ...
            WHERE
                event_date BETWEEN '{from_date}' AND '{to_date}'
                AND ...
            GROUP BY
                1,
                2
        """

リランキング

生成した候補から得られたuser_id x recipe_idの組み合わせを用いて、リランキングをします。
リランキングでは、教師あり機械学習を用いてリランキングモデルを作成し、予測結果を降順でソートして上位k個をレコメンド対象とします。 今回は、LightGBMを使ってリランキングモデルを作成しました。

候補群の作成

まず、各候補をfull outer joinして、user_id x recipe_idの組み合わせとなる候補群を作成します。 今回の場合は、検索経由で視聴したレシピレシピ詳細を表示したレシピの2種類の候補になります。こうして作成された候補群がリランキングの対象となります。

特徴量の作成

次に、リランキングモデルの学習をするための特徴量を作成します。 特徴量は、ユーザの行動データやレシピの栄養素データなどを使って作成します。
行動データは動画の表示及び視聴やレシピ詳細の表示、最後のアクセスからの経過日数、アプリ内の様々なタップログを用いています。
栄養素データは、DELISH KITCHENのレシピのメタデータにあるカロリー、たんぱく質、脂質、糖質などの栄養素を使っています。 栄養素データはDELISH KITCHENのWebサイトで公開されており、ユーザがレシピを選定する上で重要な指標になっていると考えています。

目的変数の設定

次に、正解ラベルを用意します。 これを目的変数とします。 今回の学習では、候補生成時点よりも未来の時間軸にユーザが視聴したレシピを正解ラベルとします。 そのため、正解ラベルは視聴した=1、視聴していない=0を持ちます。
正解ラベルは、最近見たレシピからおすすめの枠で視聴されたレシピのみに限定せず、アプリ上の全ての枠で視聴された動画を対象としました。 その理由は以下の3つです。

  1. 最近見たレシピからおすすめの枠は、ユーザにレシピを再度見てもらうための枠だと位置づけており、ユーザが興味を持ちそうなレシピを広く反映させたいため
  2. 最近見たレシピからおすすめ枠の視聴レシピだけでは、すべての枠で視聴されたレシピの数に比べて、正解ラベルの数が少なくなるため
  3. 最近見たレシピからおすすめ枠経由のレシピだけを用いると、正解ラベルが既存ロジックのバイアスの影響を受けるため

学習

次に、候補群に対して、特徴量と正解ラベルをleft joinして学習データを作成します。 学習データをもとに、再視聴の有無を予測するための二値分類問題として、LightGBMで学習します。 正解ラベルは視聴していない=0の方が圧倒的に多いため、0となる方をダウンサンプリングしています。 学習後、mlflowを使ってモデルを保存します。

予測

次に、学習データと同じ特徴量を使って、最新の候補群に対して予測します。 予測結果を降順でソートし、上位k個がリランキングモデルにおけるレコメンド対象になります。

評価

最後に、同じ候補群を用いて、ルールベースのベースラインとリランキングモデルの性能を評価します。 ルールベースのベースラインは再生秒数で降順にソートし、上位k個がルールベースのベースラインにおけるレコメンド対象になります。
リランキングモデルとルールベースのベースラインの各評価指標もmlflowで記録し、モデルの性能を比較できるようにしています。

A/Bテスト

controlをルールベースのベースライン、testをTwo-stage Recommender Systemsによるレコメンド、としてA/Bテストしました。 A/Bテストの結果、ある指標において、リランキングモデルによるレコメンドがルールベースのベースラインよりも改善されたことがわかりました。

まとめ

本ブログでは、DELISH KITCHENのレシピのレコメンドにTwo-stage Recommender Systemsを導入するまでの道のりについてまとめてきました。

現時点では検証段階であり、あくまでTwo-stage Recommender Systemsの一通りの実装をしただけに過ぎません。 リランキングモデルの目的変数の設定は深く検討できておらず、特徴量も既存のものを使いまわしているため、モデルの性能は十分とは言えません。 バイアスの考慮なども含めるとチューニングして改善する余地は多くあります。

そんな未だ手探りの状態とも言えますが、ユーザの嗜好に寄り添ったアプリを目指した改善が少しずつできていると思います。
CTOの今井が過去のブログでも記載している「事業を推進する開発組織になる」を目指して、データ&AIチームとして、引き続きプロダクトに寄り添った開発を進めていきたいと考えています。
私個人としても、MLをプロダクトに導入するという非常に挑戦的な取り組みをできており、裁量を持って開発をできていることに成長を実感しています。

データ&AIチームでは一緒に働く仲間を募集しています! 動画メディアでAI/MLプロダクトの推進にご興味のある方はぜひ、以下のURLからご応募ください。

corp.every.tv

DevEnableグループを 新設しました!

はじめに

エブリーでCTOをしている今井です。先日の池のブログ でも少し触れておりますが、2月にDevEnableグループ を設立したので、その紹介と設立した背景ついてお話しできればと思います。

tech.every.tv

DevEnableグループとは

DevEnableグループはCTO室に属しているグループで、開発本部を横断し、組織の活性化・成長環境の提供・発信・広報の強化・採用など、さまざまな課題解決を推進するグループです。

DevEnableという名前は Developer Enablement から取られており、「社内外から憧れる開発組織へ」というのをミッションに、エンジニア自身やエンジニア組織がより活性化し、成果を出し続けられる人・組織にすることを目標としています。

Developer Enablementは各社定義もかなり幅があるように感じておりますが、自分は、エンジニア自身の成長はもちろん、組織とのコラボレーション、これから迎えるメンバーの採用やその方の早期活躍に向けたオンボーディング、また自社だけでないエンジニアコミュニティの活性化など、かなり広義にとらえております。

DevEnableグループでは音頭を取ったり、活動がやりやすい場の提供をすることで推進し、活動自体は開発本部に所属する全員で行なっていきたいと考えております。

なぜ作ったのか

自分がCTOになった時から口酸っぱく言ってきたのが、「事業を推進する開発組織になる」ということでした。それが浸透してきたのもあり、各メンバーが技術だけじゃなく事業を考え開発に向き合ってくれるようになった一方で、相対的に技術に関する取り組みが減り、振り返ると技術的な挑戦ができてないと感じることも多くなりました。

また採用観点でも、まだまだエブリーを知っていただけてないことが多かったり、エブリーは知っているが具体的に今何をしてる会社かわからないなどの声をいただくことも多く、課題を感じていました。

それらの課題に向き合うために生まれたのがDevEnableグループの前身となる、組織活性化委員会でした。

前身: 組織活性化委員会

上記の課題に対して、特にDeveloper Experienceに興味がる有志で結成された組織活性化委員会です。この委員会では、TechTalkや社内勉強会の開催、挑戦WEEEKの実施、アドベントカレンダーの開催など、組織の活性化に向けた様々な活動を行ってきました。詳しくはいくつかブログにもなっているので、ぜひ一読ください。

tech.every.tv

これらの活動を通じて、組織内のコミュニケーションが活性化し、開発者同士の繋がりが強くなるなどの成果が見られました。一方で、有志で集まった非公式な組織であるが故の活動のやりにくさがあったり、より広い課題に取り組みたい、また今後も継続的な取り組みが必要だと感じていたので、正式な組織とすることにしました。

足元の取り組み

具体的には大きく3つの軸で活動する予定です。

細かい内容はまだまだ詰めている途中なものもあり、追加や変更あるとは思いますが、 一部詳細な内容も含めてご紹介できればと思います。

1. 社内活性化

こちらは組織活性化委員会時代からの引き継いだものが主になります。

「挑戦WEEK」、「TechTalk」、「勉強会」などがあります。 それぞれ、ブログにもなっておりますので、こちらも合わせて読んでいただけると嬉しいです!

tech.every.tv

2. 外部発信・コミュニティ貢献

昨年度よりテックブログの執筆推進を進め、半年で50~60本ほどの記事を上げることができる体制になってきました。今年はそれに加えて、技術だけじゃなく人や取り組みにフォーカスした記事の執筆なども増やしていきたいと考えています。

また、今年から国内カンファレンスへの協賛も積極的に行なっていくことで、国内の技術コミュニティへの貢献もしていきたいと考えております。さっそく6月のGoConferrenceへの協賛が決まりました!(これに関しては後日またきちんとご報告できればと思います。) このほか、勉強会の開催など、技術系のコミュニティへ積極的に貢献していきたいと考えておりますので、何か弊社で貢献できそうなことがあれば、ぜひ気軽に連絡いただけると嬉しいです。

3. 採用およびオンボーディング

課題にも書きましたが、エンジニアは全職能において絶賛採用中ではあるものの、あまり認知されてないという課題があります。上記の発信に加えて、採用面でも発信を強化するとともに、より会社の魅力が伝わるような会社説明資料の刷新から採用プロセスの見直し、リファラル採用のサポートなども進めています。

また、入社後早期に活躍できる仕組み作りにも取り組みたいと考えており、まずは4月に入社する新卒向けのオンボーディングプログラムを作成しています。

最後に

私たち DevEnable グループは、まだ発足したばかりですが、今後も「社内外から憧れる開発組織へ」というミッションの実現に向けて、様々な施策に取り組んでいきます。 何度も言いますが、弊社は全方位で積極採用中です!

DevEnableグループをおもしろうそうと思った方や、そんなグループが活躍してる組織で働きたいと思った方はぜひお話しましょう! corp.every.tv