こんにちは、ラクマでサーバサイドエンジニアをやっている飯塚です。
最近は、バーピーチャレンジというダイエットトレーニングをやり始めました。1日たった4分の運動で済むと聞いてやり始めたのですが、、、自分にはかなり過酷です。ご興味ある方、是非やってみてください。
さて、今回は数千万レコード以上をもつテーブルに対して、インデックスを持たないカラムに対して検索を行ったときのお話をします。
作業はtemp_aテーブルからインデックスを持たないflag_tmpカラムの値が1のレコードをすべて抽出するというものでした。
もしレコード数が少ないテーブルであれば
select * from temp_a where flag_tmp = 1
のように単純な全件検索のSQLで検索結果が瞬時に出るかと思います。
しかし、レコード数が数千万以上あるテーブルに対して同じことをしようとするとサーバにディスクI/Oによる高負荷を与えてしまいます。
今回、開発チーム全員で共有して利用しているDBに対して検索を行っていたため、自分のSQLでみんなに迷惑を掛けることわけにはいきませんでした。
うーん、困った。
SQLをidの範囲で分割してみました
そこで、まずはプライマリキーを使ってテーブルを分割する方法を考えました。
ラクマではRuby on Railsでモデル定義をしたテーブル構成になるため、デフォルトでidという名前のカラムがプライマリキーとして設定されています。
このidカラムで抽出範囲を絞ることで下図のようにSQLを水平分割してみることにしました。
上の図では、SQLの抽出範囲を絞ることでサーバ負荷は軽減されました。
ただ、1つのSQLを水平分割(5,000万レコードの例ですと5,000個のSQLを実施)して、最終的に結合までの一連の処理を人力で行うのでは無理があります。
そこで、今回は一連の処理をプログラミングすることにしました。
言語は、pandasというデータ処理が簡単に実施できる強力なライブラリがあるPythonで実施してみることにしました。
処理に時間がかかるので並列処理をつかってみました
Pythonで一連の処理はできるようになったのですが、思ったよりも全体の処理が終わるまで時間が掛かってしまいました。
下図のように逐次処理のため全てのSQLの実施時間の総和分だけ結果が出るまで待たなければなりませんでした。
これは辛い。。。
そこでDBサーバの負荷が許される程度のセッション数で並列処理を実施してみることにしました。
今回はmultiprocessingモジュールをつかって並列処理を実現しました。
サンプルコード
今回のPythonコードの一部をここで公開します。
MySQLの接続
環境依存ではありますがsshトンネルを使ってMySQLサーバに接続しています。
sshトンネルは https://pypi.org/project/sshtunnel/ こちらのsshtunnelパッケージを使用しました。
設定はconfigparserパッケージを使ってconfig.iniファイルで設定するようにしています。
import pymysql import paramiko from sshtunnel import SSHTunnelForwarder import pandas as pd from multiprocessing import Pool import os import configparser import sys ## config.iniファイルを同じディレクトリに置いてください ## ##[SERVER] ##PRIVATE_KEY_FILE_PATH=sshのプライベートキーのpath ##SQL_HOSTNAME= ##SQL_USERNAME= ##SQL_PASSWORD= ##SQL_DATABASE= ##SQL_PORT= ##SSH_HOST= ##SSH_USER= ##SSH_PORT= ##POOL_SIZE= def mysqlconnect(): private_key_file_path = config_server['PRIVATE_KEY_FILE_PATH'] mypkey = paramiko.RSAKey.from_private_key_file(private_key_file_path) tunnel = SSHTunnelForwarder( (config_server['SSH_HOST'], int(config_server['SSH_PORT'])), ssh_username = config_server['SSH_USER'], ssh_pkey = mypkey, remote_bind_address = (config_server['SQL_HOSTNAME'], int(config_server['SQL_PORT']))) tunnel.start() conn[os.getpid()] = pymysql.connect(host='127.0.0.1', user=config_server['SQL_USERNAME'], passwd=config_server['SQL_PASSWORD'], db=config_server['SQL_DATABASE'], port=tunnel.local_bind_port)
並列処理部分
今回はプロセスベースの並列処理を取り入れてみました。
プロセスプール生成時にmysqlconnect関数を呼び出しています。
プールサイズはconfig.iniファイルにPOOL_SIZEとして設定するようにしました。
execQueryという処理を非同期に複数プロセスで実施します。
処理結果がresultsに格納されます。
poolSize = int(config_server['POOL_SIZE']) pool = Pool(poolSize, initializer=mysqlconnect) multiple_results = [pool.apply_async(execQuery, (i, )) for i in range(len(idsArr))] results = [res.get() for res in multiple_results]
分割SQL処理部分
分割したSQLはpandasのread_sql_queryを呼び出すと実行結果がpandas DataFrameに格納されます。
def execQuery(n): params = { 'startId': idsArr[n][0], 'endId': idsArr[n][1] } query = query() return pd.read_sql_query( sql = query, con = conn[os.getpid()], params = params, ) def query(): return """ select * from temp_a where flag_tmp = 1 and id between %(startId)s and %(endId)s """
idsArrにはbetween句の範囲を配列で前もって指定しています。
下の例では、[1, 10000], [10001, 20000], ... , [49990001, 50000000]を格納しています。 (この辺、もう少し改良の余地はありそうですが、ひとまず動くものを作ってみました。)
idsArr = [] ID_MAX = 50000000 GET_NUM = 10000 endId = 0 while (endId < ID_MAX): startId = endId + 1 endId = startId + GET_NUM - 1 if(endId > ID_MAX): endId = ID_MAX idsArr.append([startId, endId])
CSV出力
最終的にpandasのDataFrameに縦方向の連結をして、csvファイルresult.csvファイルに出力しています。
resultsDf = pd.concat(results) resultsDf.to_csv('result.csv', index=False, encoding='utf_8_sig')
もうちょっと楽がしたいですね…
Python pandas、multiprocessingを使うことで、並列処理+SQL処理結果の結合を簡単に行うことができました。 DBへ比較的低負荷+早く結果を取得できるようになりました。
しかし、分析のためのデータ抽出なら独自の方法に頼らず、もう少し楽にいきたいですよね。
AWS Athena使ってみました
そこでラクマのデータベースサービスはAWSのRDS(Aurora)サービスを利用しているため、RDSデータをAmazon S3に保持して、S3上のデータをAmazon AthenaでSQL検索してみることにしました。
Athenaを使うと以下の悩みを解消してくれます。
- INDEXを考慮せずにSQLを実施したい。
- 勝手に並列処理してほしい。
- DBの負荷を考慮したくない。 実施内容は下記の通りです。
RDSのDBスナップショットを取得
- Amazon RDSサービスを選択
- [RDS > データベース]画面でDBスナップショットを取るインスタンスを選択
- アクションで[スナップショットの取得]を選択
- [スナップショットの取得]画面でスナップショット名を設定して、[スナップショットの取得]ボタンをクリック
DBスナップショットをS3へエクスポート
- [RDS>スナップショット]画面で取得したスナップショットを選択
- アクションで[Amazon S3へエクスポート]を選択
- [Amazon S3へのエクスポート]画面で、エクスポート先を設定します。対象テーブルを限定したい場合は、[エクスポートされたデータ]項目の[エクスポートデータの量]を一部に設定して、schema.テーブル名で指定します。
- [Amazon S3へのエクスポート]ボタンをクリックします。
AthenaでS3へエクスポートされたデータのテーブル作成
Athena QueryエディタでテーブルをS3にエクスポートされたデータからテーブルを定義します。
実際にSQL検索してみました
フルテーブルスキャン最高です。
初めからこれやっておけばよかったと後悔しました。
まとめ
今回データをDBの負荷をかけずに取得する方法をいろいろと試してみました。
最終的にAWS Athenaを使うことで楽して検索することができました。
こちら参考程度ではありますが、それぞれかかった時間は、以下の通りです。
- AuroraのDB(約463GB)のスナップショット作成 5分程度
- スナップショットをS3にエクスポート 50分程度
リアルタイムにデータ解析したい場合に、この方法では、時間にズレが生じてしまいます。
データポンプなどシステムを構築してDB差分ができるだけ出ないような工夫が必要になるかとおもいます。
いずれにしましても分析は抽出後の作業も重要になりますので、それまでの作業が簡素化できたのはとてもよかったと思います。