はじめに
こんにちは!Data Platformチームでデータエンジニアとして働いている @shota.imazeki です。
分析基盤の構築・運用などの側面から社内のデータ活用の促進を行っています。
BASEではAurora MySQLにあるデータをEmbulkを用いてBigQueryに連携しています。BigQueryへ連携されたデータは分析基盤としてLookerなどを通して社内利用されています。
このデータ連携処理にはいくつかの課題があり、それを解決するためにEmbulkからAurora S3 Export機能を用いた連携処理に切り替えることにしましたので、それについて紹介していきたいと思います。
※この切り替えについては現状、試験的に一部のDBのみの切り替えとなっていますが、運用上の大きな課題が出てこなければ徐々に切り替えていく予定です。
切替前のデータ連携処理
先述した通り、BASEでは主にEmbulkを利用してAurora MySQLにあるデータをBigQueryに連携しています。以下のようなアーキテクチャです。
本番DBからレプリケーションされた分析用DBに対して、EmbulkからSQLをDaily実行する形でデータ連携を行なっています。ワークフロー管理にはAirflowを利用しています。
Embulkによる主な連携パターン
EmbulkからBigQueryにデータを連携する際は、テーブルの仕様やデータ量に応じて主に3パターンに分けています。
DELETE INSERT
BigQuery側で対象日付分を削除し、その後に対象日付分をINSERTするパターンです。事前に削除を行うのは処理の再実行などによって同じデータが入り込まないようにするためです。主にログデータなどの更新されないデータがこれに当てはまります。
REPLACE
テーブルの全量洗い替えです。更新や物理削除が行われるテーブルは基本的にこの方式になります。例えばユーザー情報などです。
MERGE
対象日付以後に更新のあったデータを対象に差分更新を行うパターンです。基本的にはREPLACEやDELETE INSERTで対応したいですが、テーブルのデータ量が大きい場合にこちらを選択します。例えば注文や発送関連のテーブルが該当します。
課題
Embulkを用いたデータ連携処理の課題について触れていきます。
レプリケーション遅延によるデータの不整合
連携元のデータとして本番DBからレプリケーションされてきた分析用DBのデータを使用しています。この分析用DBは別の処理としても使われていることがあり、その負荷などによってか時折レプリケーション遅延が発生します。この遅延が連携処理のDaily実行時に発生していると、本来更新されるべきはずのデータが更新されなくなります。これによって一部データの不整合が発生することがありました。
物理削除を検知できない
Embulkでは連携するデータをSQLで取り出すため、レコードが物理削除されていたとしても、それを検知することができません。なので基本的には物理削除が発生するテーブルについてはREPLACEで対応することになります。ただし連携元のテーブルのデータ量が大きすぎる場合、Embulkサーバーのディスク容量を圧迫してしまいます。そのため、物理削除が発生する且つテーブルのデータ量が大きいものに関しては連携が厳しい状態です(MERGEによって連携すること自体は可能だが信頼できるデータではなくなってしまう)。
select column1, column2, column3 from table where modified >= TARGET_DATE -- 更新日時が対象日以降
新しいデータ連携処理
上記のような課題を踏まえて、新たに構築したデータ連携処理が以下です。Aurora S3 Export機能を利用し、parquet形式でS3にスナップショットを出力したのちに、BigQuery Data Transfer Serviceを用いてS3から直接BigQueryへデータをインポートしています。
ここからはこの処理を採用する前に調査したことについて触れていきます。
調査・検証内容
Aurora S3 Export機能を採用するにあたって、調査や検証したことについて触れていきます。
出力元DBへの負荷
スナップショットとして出力するDB側への負荷がどれだけあるのかをまずは確認しました。AWSで公開されているドキュメントにて、以下の記述があるように、DBクラスターをクローンすることで出力元DBへの負荷は起きないようです。
Amazon Aurora は DB クラスターをクローンして、クローンからデータを抽出し、そのデータを Amazon S3 バケットに保存します。
ここについてはAWS SAの方に伺い、認識の齟齬がないかを確認させて頂きました。
連携処理にかかる時間
データ連携処理が深夜に開始して、翌朝の9時以降にまでかかってしまう場合は新しいデータ連携処理に切り替えることはできません。間に合うかどうかを確認するために、実際の分析用DBと同じインスタンスタイプ、同じデータ量のテーブルを2つ用意し、検証を行いました。選んだテーブルはEmbulkだと連携に1つ目が約15分、2つ目が約30分程度かかる重たいテーブルになります。各処理にかかった時間は以下の通りで、合計で30分程度です。
最初にインスタンスを起動する時間が20分と長いですが、これはDB Cluster単位で必ず最初に行われるものになっています。その後続の処理はいずれも数分で終わっており、全体として30分程度でEmbulkを用いた時と大差ない時間です。最初の起動時間を考慮しなくていいのであればEmbulkよりも短い時間でデータ連携が可能になりそうです(Exportするテーブル数やEmbulk側での並列実行数次第)。この結果から新しいデータ連携処理に切り替えても時間がかかりすぎることはないだろうと判断しました。
コスト
コストについても念の為確認しておきましたが、大きくかかりすぎるということはないだろうという判断に至りました。
- Aurora S3 Export: 100GBで$1.2~1.3程度
- S3からのデータ転送(アウト): 100GBで$11.4程度
- parquet形式でデータ圧縮されるため、思ったほどのコストにはならないと考えてます
- BigQuery Data Transfer Service: $0
実装時のポイント
ここからは新しいデータ連携処理を実装していくにあたって考慮が必要だった点について触れていきます。
Aurora S3 Export
今回はboto3を用いてS3 Exportの処理をキックすることにしました。
タスク識別子(ExportTaskIdentifier)に日付を入れる
S3 Export処理をキックするには以下のparameterを指定してstart_export_taskを実行します。
response = client.start_export_task( ExportTaskIdentifier='string', SourceArn='string', S3BucketName='string', IamRoleArn='string', KmsKeyId='string', S3Prefix='string', ExportOnly=[ 'string', ] )
ExportTaskIdentifierには任意のtask idを指定する必要があるのですが、以前実行した時と同じ値を指定することはできません。またこのExportTaskIdentifier(及びS3Prefix)はS3 Bucket内のURIにもなります。そのため、ExportTaskIdentifierの値の末尾にyyyymmdd形式で日付を入れることにしました。イメージは以下です。
# 理想はこちら。 s3://S3BucketName/table_name/.../*.parquet # 実際はこちら。S3 URIが日付で異なる。 s3://S3BucketName/ExportTaskIdentifier_yyyymmdd/table_name/.../*.parquet
これ自体は大したポイントではないのですが、後続のBigQuery Data Transfer Serviceにおいて考慮すべき点になります。
S3 Export処理の完了を定期的に確認する
start_export_task関数を実行すると実行指令が飛んだ直後にresponseが返ってくるため、Export処理が完了したかどうかが分かりません。完了を確認するためにはdescribe_export_tasksを数分おきに実行します。
response = client.describe_export_tasks( ExportTaskIdentifier='string', SourceArn='string' )
今回、describe_export_tasksを定期的に実行するのにAirflowのSensorクラスを使うことにしました。以下に実装例を示しておきます。このように定義することで5分間隔(poke_interval)で10回(timeout)、python_callableに渡した関数が実行されます。その関数でdescribe_s3_export_taskを実行させ、trueが返って来れば後続の処理に進むという形です。
def poke_describe_s3_export_task(**kwargs): # describe_s3_export_taskを実行 # 完了したかどうかでtrue, falseで返す return true poke_describe_s3_export_task = PythonSensor( task_id="poke_describe_s3_export_task", mode='poke', poke_interval=300, timeout=300 * 10, soft_fail=False, python_callable=poke_describe_s3_export_task, provide_context=True, dag=dag )
BigQuery Data Transfer Service
S3にparquet形式でデータが出力されれば、次にS3からBigQueryにインポートする処理を行います。今回はGoogle Cloud CLI コマンドでの実装としました。主に [bq mk](https://cloud.google.com/bigquery/docs/reference/bq-cli-reference?hl=ja#bq_mk)
コマンドを使っています。--transfer_run
****を指定することでBigQuery Data Transfer Serviceを実行することができます。
bq mk --transfer_run [--run_time=RUN_TIME | --start_time=START_TIME --end_time=END_TIME] CONFIG
runtime parameterによるS3 URIの日付指定
BigQuery Data Transfer Serviceの転送設定を定義する時にS3 URIを指定するのですが、上述した通り、S3 URIは日付別に異なる点を考慮しなければなりません。その時に使うのがruntime parameterです。これを使うことで日付部分を動的に変更することができます。{run_time-9h|"%Y%m%d"}のように実行時刻から時間をずらしたり、フォーマットを変更することができます。例えばruntimeが 2024-04-01 05:00:00
の場合に {run_time-9h|"%Y%m%d"}
を指定すると 20240331
という値になります。
# 定義する際のS3 URI s3://S3BucketName/ExportTaskIdentifier_{run_time-9h|"%Y%m%d"}/table_name/*.parquet # 実際に参照されるS3 URI(2024-04-01 05:00:00に実行された場合) s3://S3BucketName/ExportTaskIdentifier_20240331/table_name/*.parquet
今後の課題
試験的な運用からの脱却
負荷やコスト、連携にかかる時間が試算でしかできないので、一部のDB(プロダクトへの影響が極力低いDBを選定)に限定しての運用になっています。今後は他のDBへの展開を行っていきたいと思っています。
Airflowのバージョンアップ
現状、boto3やGoogle Cloud CLI コマンドでの実装となっていますが実はAirflowには以下のようなオペレーターがあり、boto3などを使わずとも実装が可能です。
今回使わなかった理由はBASEのAirflowサーバーのバージョンが低くて利用ができなかったからです。マネージド(Amazon Managed Workflows for Apache Airflow)への移行を考えていたり、バージョンを上げるための他ワークフローへの影響範囲が確認できていないため、一旦は今回の処理の実装を優先しました。今後はより実装をシンプルにするためにAirflowのバージョン上げを行い、上記オペレーターに切り替えていきたいと思っています。
最後に
試験運用ではありますが、データ連携処理の切り替えを行うことができました。これによってより多くのデータを分析基盤に集めていくことが可能になったと思っています。またEmbulkへの負荷が下がったおかげで全体としての連携時間も早まっています。
最後となりますが、弊社ではデータエンジニアを募集しています。上記で述べた課題以外にもBASEの分析基盤には多くの課題があって、とてもやりがいのある仕事かなと思っております。ご興味のある方は気軽にご応募ください!