私はPythonを使用して解析してからPython-MySQLコネクタを使用してMySQLデータベースに挿入する約60GBのJSONファイルを持っています。各JSONファイルは約500MBです秒あたりのInnoDB書き込み数が少ない - AWS EC2からPythonを使用したMySQL RDSへ
60GBのJSONデータを保持するために、セカンダリボリュームを持つAWS r3.xlarge EC2インスタンスを使用しています。
次に、AWS RDS r3.xlarge MySQLインスタンスを使用しています。これらのインスタンスはすべて同じ地域と可用性ゾーンにあります。 EC2インスタンスは次のPythonスクリプトを使用してJSONを読み込み、解析してからMySQL RDSに挿入します。私のpython:
:私はそれを見ることができるのMySQL Workbenchを使用して、MySQLデータベースについて
:以下見ることができるのLinuxインスタンスIにhtopの使用
import json
import mysql.connector
from mysql.connector import errorcode
from pprint import pprint
import glob
import os
os.chdir("./json_data")
for file in glob.glob("*.json"):
with open(file, 'rU') as data_file:
results = json.load(data_file)
print('working on file:', file)
cnx = mysql.connector.connect(user='', password='',
host='')
cursor = cnx.cursor(buffered=True)
DB_NAME = 'DB'
def create_database(cursor):
try:
cursor.execute(
"CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(DB_NAME))
except mysql.connector.Error as err:
print("Failed creating database: {}".format(err))
exit(1)
try:
cnx.database = DB_NAME
except mysql.connector.Error as err:
if err.errno == errorcode.ER_BAD_DB_ERROR:
create_database(cursor)
cnx.database = DB_NAME
else:
print(err)
exit(1)
add_overall_data = ("INSERT INTO master"
"(_sent_time_stamp, dt, ds, dtf, O_l, O_ln, O_Ls, O_a, D_l, D_ln, d_a)"
"VALUES (%(_sent_time_stamp)s, %(dt)s, %(ds)s, %(dtf)s, %(O_l)s, %(O_ln)s, %(O_Ls)s, %(O_a)s, %(D_l)s, %(D_ln)s, %(d_a)s)")
add_polyline = ("INSERT INTO polyline"
"(Overview_polyline, request_no)"
"VALUES (%(Overview_polyline)s, %(request_no)s)")
add_summary = ("INSERT INTO summary"
"(summary, request_no)"
"VALUES (%(summary)s, %(request_no)s)")
add_warnings = ("INSERT INTO warnings"
"(warnings, request_no)"
"VALUES (%(warnings)s, %(request_no)s)")
add_waypoint_order = ("INSERT INTO waypoint_order"
"(waypoint_order, request_no)"
"VALUES (%(waypoint_order)s, %(request_no)s)")
add_leg_data = ("INSERT INTO leg_data"
"(request_no, leg_dt, leg_ds, leg_O_l, leg_O_ln, leg_D_l, leg_D_ln, leg_html_inst, leg_polyline, leg_travel_mode)"
"VALUES (%(request_no)s, %(leg_dt)s, %(leg_ds)s, %(leg_O_l)s, %(leg_O_ln)s, %(leg_D_l)s, %(leg_D_ln)s, %(leg_html_inst)s, %(leg_polyline)s, %(leg_travel_mode)s)")
error_messages = []
for result in results:
if result["status"] == "OK":
for leg in result['routes'][0]['legs']:
try:
params = {
"_sent_time_stamp": leg['_sent_time_stamp'],
"dt": leg['dt']['value'],
"ds": leg['ds']['value'],
"dtf": leg['dtf']['value'],
"O_l": leg['start_location']['lat'],
"O_ln": leg['start_location']['lng'],
"O_Ls": leg['O_Ls'],
"O_a": leg['start_address'],
"D_l": leg['end_location']['lat'],
"D_ln": leg['end_location']['lng'],
"d_a": leg['end_address']
}
cursor.execute(add_overall_data, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
except KeyError, e:
error_messages.append(e)
params = {
"_sent_time_stamp": leg['_sent_time_stamp'],
"dt": leg['dt']['value'],
"ds": leg['ds']['value'],
"dtf": "000",
"O_l": leg['start_location']['lat'],
"O_ln": leg['start_location']['lng'],
"O_Ls": leg['O_Ls'],
"O_a": 'unknown',
"D_l": leg['end_location']['lat'],
"D_ln": leg['end_location']['lng'],
"d_a": 'unknown'
}
cursor.execute(add_overall_data, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for overview_polyline in result['routes']:
params = {
"request_no": request_no,
"Overview_polyline": overview_polyline['overview_polyline']['points']
}
cursor.execute(add_polyline, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for summary in result['routes']:
params = {
"request_no": request_no,
"summary": summary['summary']
}
cursor.execute(add_summary, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for warnings in result['routes']:
params = {
"request_no": request_no,
"warnings": str(warnings['warnings'])
}
cursor.execute(add_warnings, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for waypoint_order in result['routes']:
params = {
"request_no": request_no,
"waypoint_order": str(waypoint_order['waypoint_order'])
}
cursor.execute(add_waypoint_order, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for steps in result['routes'][0]['legs'][0]['steps']:
params = {
"request_no": request_no,
"leg_dt": steps['dt']['value'],
"leg_ds": steps['ds']['value'],
"leg_O_l": steps['start_location']['lat'],
"leg_O_ln": steps['start_location']['lng'],
"leg_D_l": steps['end_location']['lat'],
"leg_D_ln": steps['end_location']['lng'],
"leg_html_inst": steps['html_instructions'],
"leg_polyline": steps['polyline']['points'],
"leg_travel_mode": steps['travel_mode']
}
cursor.execute(add_leg_data, params)
cnx.commit()
print('error messages:', error_messages)
cursor.close()
cnx.close()
print('finished' + file)
このpythonスクリプトは数日間悩まされていますが、MySQLのデータの約20%しかMySQLに挿入していません。
私の質問 - どのようにボトルネックを特定できますか?それはPythonスクリプトですか?少量のメモリを使用しているようですが、これを増やすことはできますか?私は(How to improve the speed of InnoDB writes per second of MySQL DB)あたりとしてInnoDBのバッファプールのサイズをチェックし、それが大規模であることが判明している:私は、私は信じていません同じ地域にRDSとEC2インスタンスを使用しておりますので
SELECT @@innodb_buffer_pool_size;
+---------------------------+
| @@innodb_buffer_pool_size |
+---------------------------+
| 11674845184 |
+---------------------------+
ネットワークのボトルネックがあります。私はどこで最大の貯蓄を探すべきかについての指針は大歓迎です!
EDIT
私は、私が問題につまずいているかもしれないと思います。解析中の効率化のために、JSONの各レベルを別々に記述しています。しかし、次に、より高いレベルのJSONのネストされた部分を照合するためにクエリを実行する必要があります。このクエリは、小さなデータベースを使用するとオーバーヘッドが低くなります。私はインサートの速度がこのdbで劇的に減少していることに気づいた。これは、JSONデータを適切に接続するために、大規模で絶えず増え続けるデータベースを検索する必要があるためです。私は、Pythonスクリプト内の任意のテーブル定義を見ることができない
私はそれを待っているよりも、この他を解決する方法がわからないです....
EC2とRDSは同じ地域にあると述べました。それらも同じ可用性ゾーンに入っていますか?もしそうでなければ、それはそれ以上の改善を見るのは非常に簡単な方法かもしれません。 –
はい - それと考えています。両方が同じ可用性ゾーンにあります – LearningSlowly
RDSインスタンスでプロビジョニングされたIOPを試しましたか? – mickzer