こんにちは、BASE株式会社Data Strategyチームの杉です。
ショッピングアプリ「BASE」では、検索にAmazon Cloudsearchを使用していました。今回、検索基盤をAmazon Elasticsearch Service(以下、ES)に移行し、Data Strategyチームで管理をする方針にしました。
この記事では商品が更新された際などにどのように検知し、データをESにいれるようにしたかなど、基盤の部分をメインにご紹介をします。
1. 背景
検索は新しいショップに出会うきっかけを作ってくれたり、探していた商品をいち早く見つけることができることができます。
そのため、検索機能はどのECサイトなどでも見かける存在であり、活用している人も多いのではないでしょうか。
例えばショッピングアプリ「BASE」の検索はこのような画面になっています。
このようにさまざまな便利さをもっている検索機能ですが、ショッピングアプリ「BASE」では継続的な検索性能の改修、改善ができていないという問題がありました。
これらの課題に対し、今回検索基盤の移行を行うことで検索性能改善への第一歩を進めました。
2. 新基盤の移行について
今回の移行はAmazon CloudsearchからESへデータを移行するだけに見えます。
しかし、内部的には管理をData Strategyチームに移行するため、商品が更新された際に検知、データの取得や同期など全体的に新しく作り直す必要がありました。
また、実際に移行計画を進めていくと、様々な問題の壁に当たりました。
例えば
- Data Strategyチームの管理に変えるために参照するデータベースを変える必要がある
- データを全てロードし直す必要がある
- 商品更新時にできる限りリアルタイムに更新をしたいがどう実装するべきか
- 商品情報に関するテーブルがたくさんある
などが挙げられます。
これらの課題が存在したため、手探りでlogstashを試したりembulkでデータ同期を試みたりもしながら、現在の基盤を作りました。
2-1. システム構成
今回は商品検索のみの移行をしましたが、検索を行う上で商品情報に関するテーブルは多くあります。
また、これらのテーブルがそれぞれ更新が起きた際にESにもデータを同期させる必要があります。
これらを考慮し、最終的に以下のシステム構成で実装を行いました。
- 商品情報の更新
- 定期的に動いているbatchがS3から前回起動情報を取得
- データ更新の有無の確認
- 更新分のデータを取得
- ESのデータを更新
- S3へ更新後の情報を記録
という流れで動いています。これを短い時間で繰り返すことでリアルタイムに近い間隔でデータを更新することが実現できました。この「短い時間」は定期実行時間を何分毎と設定しているわけではなく、可能な最短時間で動かしています。
S3に入っている前回起動情報はデータ同期をしているテーブルのそれぞれの実行情報を記録しています。このような記録をし、都度取得をすることで急にDBの同期が上手くいかなくなった場合にも、自動で復旧するような仕組みになっています。
さらに、batchはデータの取得からESのデータ挿入まで全てPythonで作りました。このPythonでの実装時にいくつかの細かい設定をしました。
こちらはinsert時のコード例です。
es = Elasticsearch( ..., timeout=timeout # (1) ) retry_count = 0 while retry_count < retry_max: # (2) try: ... es.bulk(body) time.sleep(1) # (3) break except BaseException: ... time.sleep(1) retry_count += 1
(1) timeoutを明示的に書く
timeoutを書かない場合、デフォルトの秒数となります。しかし、実際に動かしてみると稀にtimeoutになることがありました。
bulk時にはサイズで区切っていたのですが、商品情報は商品説明文などが長いことがあり、想定よりサイズが大きくなってしまうことがありました。この際にtimeoutが発生してしまっていたのですが、伸ばすことでtimeoutでのエラーはなくなりました。
(2) retry処理をいれる
こちらも同様にbulk時の処理で稀に失敗することがありました。
しかし同じデータをもう一度試すと成功することも多かったため、retry処理をいれることで対応をしました。
(3) sleepをいれる
今回、商品情報が更新されたら更新分を全て更新する仕様です。こちらも頻繁に起こることではありませんが、ある時間に大量の商品情報更新がかかることもあります。
その際に短時間に何度もbulkを行うと、内部キューが溜まりすぎてしまうことがありました。上限値を超えた場合、破棄されてしまうため、データが欠損してしまう恐れがあります。
ESの設定で上限値を変更するという手段もありますが、上限値を変えても送る量が上限値を超えないという保証はなかったため、スピードを緩和させることで対応をしました。
このように、とても細かい部分の設定ではありますが、これらの処理をいれることでデータの欠損もなくスムーズにESにデータをいれるためのシステムを実装することができました。
2-2. 初期ロード
通常の商品が追加や更新された際のデータ更新はシステム構成で書いたような内容で行われています。
しかし、今回ESには商品情報がゼロの状態から始めるため、今までのデータを入れ直す必要がありました。
対応策としては、別途batchを作り初期ロード専用の作業を行いました。
初期ロードのbatchは以下のような流れでデータをいれるようにしました。
- メインの商品テーブル以外は一気に全部取得
- user_idもしくはitem_idごとにデータをまとめる
- 商品テーブルをID区切りでデータを取得し、2のデータとjoin
これをIDを変えながら繰り返すことで今までのデータを全部入れました。
通常時のESのinsertはupdateもしくはdeleteを使用していますが、初期ロードではindexを使い少しでも速くなるようにしました。
2-3. 例外テーブル
システム構成でも書いたように、今回商品に関するテーブルは多くあります。
テーブルに何かしらの変更に起きた際に更新をかけるという方法でうまくいかないテーブルも存在しました。
具体的には
- batchで計算した結果を格納するため同時刻に何十万、何百万のレコードがinsertされるテーブル
- 更新頻度がとても高く、約1分間に紐づくレコードが何十万、何百万存在するテーブル
があります。
これらのテーブルに関しては、上の処理とは別の処理も加えてテーブル内容の同期を行なっています。
2-3-1. batch計算結果を格納しているテーブル
こちらに関しては、常にinsertが走っているわけではなく、dailyやweeklyといった頻度であったことから、insert時に全てのデータを更新するのではなく、少しずつ更新をする方針にしました。
- batchでの計算結果の追加
- 定期的に動いているbatchがS3から前回データを取得
- ID区切りでデータを確認
- データを取得
- 前回データとの差分を確認
- 差分発生データのみESを更新
- S3へ更新後の情報を記録
大まかな流れは通常動いているシステムと同様ですが、S3に前回のデータを保存している点と更新分全てを取得するのではなく、ID区切りで他の更新の妨げにならない量に抑えている点が異なるポイントとなります。
このような工夫をすることで、他のテーブルの更新にも影響がでず、スムーズに更新をすることが可能になりました。
2-3-2. 更新頻度が高いテーブル
更新頻度が高いテーブルの問題点としては、同時刻に更新しなくてはいけないレコードが多く、best effortでの更新を行なっているとどんどん詰まっていき、全体の更新が遅くなるということがありました。
更新レコードが多く、更新も常に起こっているためbatch計算結果テーブルのようにIDで区切って少しずついれるようなこともできませんでした。
検討の対象となった更新頻度が高いテーブルでは、多くの情報が入っており、少しの更新でもレコード全体に更新がかかってしまっていました。
そのため、必要な情報のみをS3へ保存し、更新が起きた際にS3の情報と比較をし、必要な情報に更新が起きた際のみ、ESのデータも更新をする方法にしました。
S3に前回情報を保存し、毎回取得することはデータ更新の速度に影響してしまう可能性もありましたが、幸いにも本当に更新すべきデータがかなり減ったため、速度アップにつながりました。
おわり
今回は移行後の基盤についてをメインにご紹介しました。
このように実装を行い、現在のショッピングアプリ「BASE」では移行後のシステムで稼働をしています。
実際には基盤完成後にAPIのresponse timeが遅い問題の対応や、CTRの改善などを行なっています。こちらの詳細については来週ご紹介させていただきます。
移行を行うことで継続的な検索性能の改修、改善をしやすい環境を作ることができました。これからさらなる改善を行なっていきたいと思います。