2016-11-22 3 views
5

現在、大きなファイル(8000万行)を読み込もうとしていますが、ここでは各エントリに対して計算集約的な行列乗算を行う必要があります。これを計算した後、結果をデータベースに挿入します。このプロセスでは時間がかかるため、プロセスを高速化するためにファイルを複数のコアに分割したいと考えています。Python:複数のコアを使用したファイルの処理

私はこの有望な試みを見つけたので、ファイルをnに分割しました。これは動作しますが、私はこの使用してマルチプロセッシング並列化、問題に実行、

if __name__ == '__main__': 
    fp = open(filename) 
    number_of_chunks = 4 
    for chunk_number in range(number_of_chunks): 
     print chunk_number, 100 * '=' 
     for line in file_block(fp, number_of_chunks, chunk_number): 
      process(line) 

def file_block(fp, number_of_blocks, block): 
    ''' 
    A generator that splits a file into blocks and iterates 
    over the lines of one of the blocks. 

    ''' 

    assert 0 <= block and block < number_of_blocks 
    assert 0 < number_of_blocks 

    fp.seek(0,2) 
    file_size = fp.tell() 

    ini = file_size * block/number_of_blocks 
    end = file_size * (1 + block)/number_of_blocks 

    if ini <= 0: 
     fp.seek(0) 
    else: 
     fp.seek(ini-1) 
     fp.readline() 

    while fp.tell() < end: 
     yield fp.readline() 

反復的に、あなたはこのような関数を呼び出すことができ、エラーされた状態で

fp = open(filename) 
number_of_chunks = 4 
li = [file_block(fp, number_of_chunks, chunk_number) for chunk_number in range(number_of_chunks)] 

p = Pool(cpu_count() - 1) 
p.map(processChunk,li) 

を、その発電機は漬けられません。

私はこのエラーを理解していますが、すべての行をリストに入れるためにファイル全体を最初に反復するには高価です。

また、私は一度にデータベースに複数の行を挿入する方が効率的であるため(典型的マップアプローチを使用している場合1によって代わりに1の)、反復当たりコアあたりの行のブロックを使用する

おかげあなたの助けに。

+3

大きなファイルの初期パスを実行して、シーク座標とその位置から読み込む行数を記録することができます。この2つの数字を使ってマルチプロセッシングを呼び出して、各プロセスにジェネレータを押しつけることができます – kezzos

+0

まずファイルを4つのファイルに分割できますか? – cwallenpoole

+0

ファイルオープンと 'file_block'コードを、スレッドが開始する前に初期化するのではなく、各スレッドに移動します。それは、読み取り専用であれば、ファイルを1回でなく4回開いても問題ありません。 –

答えて

3

ジェネレータを前面に作成して各スレッドに渡す代わりに、スレッドコードにそのままにしておきます。

関連する問題