2015-12-01 8 views
15

私はPythonを使用して解析してからPython-MySQLコネクタを使用してMySQLデータベースに挿入する約60GBのJSONファイルを持っています。各JSONファイルは約500MBです秒あたりのInnoDB書き込み数が少ない - AWS EC2からPythonを使用したMySQL RDSへ

60GBのJSONデータを保持するために、セカンダリボリュームを持つAWS r3.xlarge EC2インスタンスを使用しています。

次に、AWS RDS r3.xlarge MySQLインスタンスを使用しています。これらのインスタンスはすべて同じ地域と可用性ゾーンにあります。 EC2インスタンスは次のPythonスクリプトを使用してJSONを読み込み、解析してからMySQL RDSに挿入します。私のpython:

MySQL WorkBench Output

:私はそれを見ることができるのMySQL Workbenchを使用して、MySQLデータベースについて htop of python process

:以下見ることができるのLinuxインスタンスIにhtopの使用

import json 
import mysql.connector 
from mysql.connector import errorcode 
from pprint import pprint 
import glob 
import os 

os.chdir("./json_data") 

for file in glob.glob("*.json"): 
    with open(file, 'rU') as data_file: 
     results = json.load(data_file) 
     print('working on file:', file) 

    cnx = mysql.connector.connect(user='', password='', 
     host='') 

    cursor = cnx.cursor(buffered=True) 

    DB_NAME = 'DB' 

    def create_database(cursor): 
     try: 
      cursor.execute(
       "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(DB_NAME)) 
     except mysql.connector.Error as err: 
      print("Failed creating database: {}".format(err)) 
      exit(1) 

    try: 
     cnx.database = DB_NAME  
    except mysql.connector.Error as err: 
     if err.errno == errorcode.ER_BAD_DB_ERROR: 
      create_database(cursor) 
      cnx.database = DB_NAME 
     else: 
      print(err) 
      exit(1) 

    add_overall_data = ("INSERT INTO master" 
     "(_sent_time_stamp, dt, ds, dtf, O_l, O_ln, O_Ls, O_a, D_l, D_ln, d_a)" 
     "VALUES (%(_sent_time_stamp)s, %(dt)s, %(ds)s, %(dtf)s, %(O_l)s, %(O_ln)s, %(O_Ls)s, %(O_a)s, %(D_l)s, %(D_ln)s, %(d_a)s)") 

    add_polyline = ("INSERT INTO polyline" 
     "(Overview_polyline, request_no)" 
     "VALUES (%(Overview_polyline)s, %(request_no)s)") 

    add_summary = ("INSERT INTO summary" 
     "(summary, request_no)" 
     "VALUES (%(summary)s, %(request_no)s)") 

    add_warnings = ("INSERT INTO warnings" 
     "(warnings, request_no)" 
     "VALUES (%(warnings)s, %(request_no)s)") 

    add_waypoint_order = ("INSERT INTO waypoint_order" 
     "(waypoint_order, request_no)" 
     "VALUES (%(waypoint_order)s, %(request_no)s)") 

    add_leg_data = ("INSERT INTO leg_data" 
     "(request_no, leg_dt, leg_ds, leg_O_l, leg_O_ln, leg_D_l, leg_D_ln, leg_html_inst, leg_polyline, leg_travel_mode)" 
     "VALUES (%(request_no)s, %(leg_dt)s, %(leg_ds)s, %(leg_O_l)s, %(leg_O_ln)s, %(leg_D_l)s, %(leg_D_ln)s, %(leg_html_inst)s, %(leg_polyline)s, %(leg_travel_mode)s)") 
    error_messages = [] 
    for result in results: 
     if result["status"] == "OK": 
      for leg in result['routes'][0]['legs']: 
       try: 
        params = { 
        "_sent_time_stamp": leg['_sent_time_stamp'], 
        "dt": leg['dt']['value'], 
        "ds": leg['ds']['value'], 
        "dtf": leg['dtf']['value'], 
        "O_l": leg['start_location']['lat'], 
        "O_ln": leg['start_location']['lng'], 
        "O_Ls": leg['O_Ls'], 
        "O_a": leg['start_address'], 
        "D_l": leg['end_location']['lat'], 
        "D_ln": leg['end_location']['lng'], 
        "d_a": leg['end_address'] 
        } 
        cursor.execute(add_overall_data, params) 
        query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s') 
        O_l = leg['start_location']['lat'] 
        O_ln = leg['start_location']['lng'] 
        D_l = leg['end_location']['lat'] 
        D_ln = leg['end_location']['lng'] 
        _sent_time_stamp = leg['_sent_time_stamp'] 
        cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp)) 
        request_no = cursor.fetchone()[0] 
       except KeyError, e: 
        error_messages.append(e) 
        params = { 
        "_sent_time_stamp": leg['_sent_time_stamp'], 
        "dt": leg['dt']['value'], 
        "ds": leg['ds']['value'], 
        "dtf": "000", 
        "O_l": leg['start_location']['lat'], 
        "O_ln": leg['start_location']['lng'], 
        "O_Ls": leg['O_Ls'], 
        "O_a": 'unknown', 
        "D_l": leg['end_location']['lat'], 
        "D_ln": leg['end_location']['lng'], 
        "d_a": 'unknown' 
        } 
        cursor.execute(add_overall_data, params) 
        query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s') 
        O_l = leg['start_location']['lat'] 
        O_ln = leg['start_location']['lng'] 
        D_l = leg['end_location']['lat'] 
        D_ln = leg['end_location']['lng'] 
        _sent_time_stamp = leg['_sent_time_stamp'] 
        cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp)) 
        request_no = cursor.fetchone()[0] 
      for overview_polyline in result['routes']: 
       params = { 
       "request_no": request_no, 
       "Overview_polyline": overview_polyline['overview_polyline']['points'] 
       } 
       cursor.execute(add_polyline, params) 
       query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s') 
       O_l = leg['start_location']['lat'] 
       O_ln = leg['start_location']['lng'] 
       D_l = leg['end_location']['lat'] 
       D_ln = leg['end_location']['lng'] 
       _sent_time_stamp = leg['_sent_time_stamp'] 
       cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp)) 
       request_no = cursor.fetchone()[0] 
      for summary in result['routes']: 
       params = { 
       "request_no": request_no, 
       "summary": summary['summary'] 
       } 
       cursor.execute(add_summary, params) 
       query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s') 
       O_l = leg['start_location']['lat'] 
       O_ln = leg['start_location']['lng'] 
       D_l = leg['end_location']['lat'] 
       D_ln = leg['end_location']['lng'] 
       _sent_time_stamp = leg['_sent_time_stamp'] 
       cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp)) 
       request_no = cursor.fetchone()[0] 
      for warnings in result['routes']: 
       params = { 
       "request_no": request_no, 
       "warnings": str(warnings['warnings']) 
       } 
       cursor.execute(add_warnings, params) 
       query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s') 
       O_l = leg['start_location']['lat'] 
       O_ln = leg['start_location']['lng'] 
       D_l = leg['end_location']['lat'] 
       D_ln = leg['end_location']['lng'] 
       _sent_time_stamp = leg['_sent_time_stamp'] 
       cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp)) 
       request_no = cursor.fetchone()[0] 
      for waypoint_order in result['routes']: 
       params = { 
       "request_no": request_no, 
       "waypoint_order": str(waypoint_order['waypoint_order']) 
       } 
       cursor.execute(add_waypoint_order, params) 
       query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s') 
       O_l = leg['start_location']['lat'] 
       O_ln = leg['start_location']['lng'] 
       D_l = leg['end_location']['lat'] 
       D_ln = leg['end_location']['lng'] 
       _sent_time_stamp = leg['_sent_time_stamp'] 
       cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp)) 
       request_no = cursor.fetchone()[0] 
      for steps in result['routes'][0]['legs'][0]['steps']: 
       params = { 
       "request_no": request_no, 
       "leg_dt": steps['dt']['value'], 
       "leg_ds": steps['ds']['value'], 
       "leg_O_l": steps['start_location']['lat'], 
       "leg_O_ln": steps['start_location']['lng'], 
       "leg_D_l": steps['end_location']['lat'], 
       "leg_D_ln": steps['end_location']['lng'], 
       "leg_html_inst": steps['html_instructions'], 
       "leg_polyline": steps['polyline']['points'], 
       "leg_travel_mode": steps['travel_mode'] 
       } 
       cursor.execute(add_leg_data, params) 
     cnx.commit() 
    print('error messages:', error_messages) 
    cursor.close() 
    cnx.close() 
    print('finished' + file) 

このpythonスクリプトは数日間悩まされていますが、MySQLのデータの約20%しかMySQLに挿入していません。

私の質問 - どのようにボトルネックを特定できますか?それはPythonスクリプトですか?少量のメモリを使用しているようですが、これを増やすことはできますか?私は(How to improve the speed of InnoDB writes per second of MySQL DB)あたりとしてInnoDBのバッファプールのサイズをチェックし、それが大規模であることが判明している:私は、私は信じていません同じ地域にRDSとEC2インスタンスを使用しておりますので

SELECT @@innodb_buffer_pool_size; 
+---------------------------+ 
| @@innodb_buffer_pool_size | 
+---------------------------+ 
|    11674845184 | 
+---------------------------+ 

ネットワークのボトルネックがあります。私はどこで最大の貯蓄を探すべきかについての指針は大歓迎です!

EDIT

私は、私が問題につまずいているかもしれないと思います。解析中の効率化のために、JSONの各レベルを別々に記述しています。しかし、次に、より高いレベルのJSONのネストされた部分を照合するためにクエリを実行する必要があります。このクエリは、小さなデータベースを使用するとオーバーヘッドが低くなります。私はインサートの速度がこのdbで劇的に減少していることに気づいた。これは、JSONデータを適切に接続するために、大規模で絶えず増え続けるデータベースを検索する必要があるためです。私は、Pythonスクリプト内の任意のテーブル定義を見ることができない

私はそれを待っているよりも、この他を解決する方法がわからないです....

+1

EC2とRDSは同じ地域にあると述べました。それらも同じ可用性ゾーンに入っていますか?もしそうでなければ、それはそれ以上の改善を見るのは非常に簡単な方法かもしれません。 –

+0

はい - それと考えています。両方が同じ可用性ゾーンにあります – LearningSlowly

+0

RDSインスタンスでプロビジョニングされたIOPを試しましたか? – mickzer

答えて

1

....しかし、我々は大規模なデータ操作を試してみませんか - MySQLにロードする際には、常にデータベースインデックスを無効にします。制約/外部キーの適用もありますが、これもロードするときは無効にする必要があります。

自動コミットは、Connector/Pythonで接続するときにデフォルトで無効になっています。

しかし、私はすべてのコミットを見ることができない -

あなた現在のコード内のオプションを要約すると

を無効/

(ロード用の)削除 - インデックス
- 制約 - 外部キー - トリガー

ローディングプログラム

- 無効に自動コミット - n個のレコードが(Nが使用可能な、あなたのバッファサイズに依存します)今までにコミット

1

私englistが悪い

私はこの作業を行う場合、私意志

  1. 使用mysqインプツールをtxtをするためのpythonコンバートJSONを使用して、インポートTXTは、MySQLへ

あなたはPythonの+のMySQLのallinoneを行う必要がある場合、i「はmaster'multiple発生からrequest_noを選択する理由

insert table values(1),value(2)...value(xxx) 

を使用することをお勧め、私のenglistは非常にpoor.soあるJSON

から読まれるべきです。..

0

この情報があれば、スクリプトの両方がほとんどアイドル状態であるように見えます。 MySQLレベルで何かを調整するのは時期尚早です。

あなたのプログラムが何をしているかをより詳細に把握する必要があります。

各クエリの所要時間、取得するエラーの数などを記録します。

これらのSELECTsは、問題が発生した場合でも、パフォーマンスを上げるためにインデックスを追加する必要があります。

関連する問題