2016-06-30 11 views
3

4つのデータベースからそれぞれ4つのテーブルを読み取るプロセスがあります。私は4つのテーブル合計と1 postgresデータベースにそのデータを統合しています。 (元の4つのデータベースのそれぞれは、統合する必要のある同じ4つのテーブルを持っています)。psycopg2のCOPYコマンド

私がやっているやり方は今、パンダを使っています。一度に4つのデータベースすべてから1つのテーブルを読み込み、データを1つのデータフレームに連結してから、to_sqlを使用してpostgresデータベースに保存します。次に、残りのデータベースにループスルーして、他のテーブルと同じことを行います。

私の問題はスピードです。私のテーブルの1つに日付ごとに約1〜2milの行があるので、ポストグルにデータを書き終えるのに約5,000〜6,000秒かかります。 .csvファイルに書き込んでから、pgadminでCOPY FROMを使う方がはるかに簡単です。

ここは私の現在のコードです。いくつかの関数呼び出しがありますが、基本的にテーブル名を参照していることに注意してください。基本的なロギングもやっていますが、あまり必要ではありません。私は必要なソースデータベースの列を追加しています。私は実際には文字列であるフィールドから.0を取り除いていますが、パンダもそれらを浮動小数点として認識し、空の整数に0を書き込んで、実際にはint型であることを確認します。

def query_database(table, table_name, query_date): 
    df_list = [] 
    log_list = [] 
    for db in ['NJ', 'NJ2', 'LA', 'NA']: 
     start_time = time.clock() 
     query_timestamp = dt.datetime.now(pytz.timezone('UTC')).strftime('%Y-%m-%d %H:%M:%S') 
     engine_name = '{}{}{}{}'.format(connection_type, server_name, '/', db) 
     print('Accessing {} from {}'.format((select_database(db)[0][table]), engine_name)) 
     engine = create_engine(engine_name) 
     df = pd.read_sql_query(query.format(select_database(db)[0][table]), engine, params={query_date}) 
     query_end = time.clock() - start_time 
     df['source_database'] = db 
     df['insert_date_utc'] = query_timestamp 
     df['row_count'] = df.shape[0] 
     df['column_count'] = df.shape[1] 
     df['query_time'] = round(query_end, 0) 
     df['maximum_id'] = df['Id'].max() 
     df['minimum_id'] = df['Id'].min() 
     df['source_table'] = table_dict.get(table) 
     log = df[['insert_date_utc', 'row_date', 'source_database', 'source_table', 'row_count', 'column_count', 'query_time', 'maximum_id', 'minimum_id']].copy() 
     df.drop(['row_count', 'column_count', 'query_time', 'maximum_id', 'minimum_id', 'source_table'], inplace=True, axis=1) 
     df_list.append(df) 
     log_list.append(log) 
    log = pd.concat(log_list) 
    log.drop_duplicates(subset=['row_date', 'source_database', 'source_table'], inplace=True, keep='last') 
    result = pd.concat(df_list) 
    result.drop_duplicates('Id', inplace=True) 
    cols = [i.strip() for i in (create_columns(select_database(db)[0][table]))] 
    result = result[cols] 
    print('Creating string columns for {}'.format(table_name)) 
    for col in modify_str_cols(select_database(db)[0][table]): 
     create_string(result, col) 
    print('Creating integer columns for {}'.format(table_name)) 
    for col in modify_int_cols(select_database(db)[0][table]): 
     create_int(result, col) 
    log.to_sql('raw_query_log', cms_dtypes.pg_engine, index=False, if_exists='append', dtype=cms_dtypes.log_dtypes) 
    print('Inserting {} data into PostgreSQL'.format(table_name)) 
    result.to_sql(create_table(select_database(db)[0][table]), cms_dtypes.pg_engine, index=False, if_exists='append', chunksize=50000, dtype=create_dtypes(select_database(db)[0][table])) 

どのように高速化するためにCOPY TOとCOPY FROMを挿入することができますか?私はちょうど.csvファイルを書き、それらの上にループするか、メモリから私のポストグルにコピーできますか?

答えて

3

psycopg2には、特定の数の特定の関連性があります(詳細はcopyを参照してください)。 csvを使用する場合は、copy_expert(完全にcopyステートメントを指定できる)を使用する必要があります。

通常、私がこれを行ったときに、私はcopy_expert()とディスク上のファイルを反復処理するファイルのようなオブジェクトを使用しました。それは合理的にうまくいくようです。

これはあなたのケースでは、私はcopy_tocopy_fromがポストグレースでpostgresに転送されているのでここでより良いマッチと思う。あなたは物事を行う方法を決定する前に、あなたは注意する必要がありますこれらの

(あなたはcsvファイルを使用したい場合は、あなたが copy_expertを使用する必要があります)CSV PostgreSQLのコピー出力/入力構文を使用していないメモメモ:

copy_toファイル形式のオブジェクト(StringIOなど)にコピーし、copy_from/copy_expertファイルをファイル形式のオブジェクトからコピーします。あなたはパンダのデータフレームを使用したい場合は、この少し考えると、どちらかのファイルのようなオブジェクトを作成したり、メモリ内のCSVファイルを生成し、それをロードするためにStringIOcopy_expertとともにcsvを使用する必要があるとしています。

+0

私が統合しているデータベースは、MS SQL Serverからのものです。違いがあるかどうかは分かりません。もし私がファイルを書くことを避けることができれば、それが最善の方法でしょう。 COPYコマンドでpandas DataFrameを受け入れることはできますか?私は、データに列を追加し、データ型/空白を埋めることを指定しています。または私はちょうど一時ファイルを書く必要がありますか? – trench

+0

はい、 "ファイルライクなオブジェクト"を提供する必要がありますが、それはメモリからデータを提供することができます。 –

+0

したがって、私はこの行まで同じものをすべて同じにしておくことができます: result.to_sql(create_table(select_database(db)[0] [table])、cms_dtypes.pg_engine、index = False、if_exists = 'append'、chunksize = 50000 、DTYPE = create_dtypes(select_database(DB)[0] [表])) データベースに保存される準備ができ、メモリ内の私の最終的なデータです。テーブルに書き込むためのサンプルコードを提供できますか?それはcopy_toですか?ヘッダーが含まれていないことを確認するにはどうすればよいですか? engine.copy_to( 'table_name'、result) – trench

関連する問題