2011-09-05 27 views
11

私は256x256x256ヌンピー配列を持っています、各要素は行列です。私はこれらの行列のそれぞれについていくつかの計算をする必要があります。そして、私はmultiprocessingモジュールを使って処理を高速化したいと考えています。元の配列の要素[i,j,k]におけるマトリックスの結果は、新しい配列の[i,j,k]要素に入れなければならないように、これらの計算のitertoolsとマルチプロセッシングを組み合わせる?

結果は、元のよう256x256x256配列に格納されなければなりません。

これを行うには、擬似的に書かれたリストを[array[i,j,k], (i, j, k)]として「マルチプロセッシング」する関数に渡したいと思います。 matricesは、元の配列とmyfuncから抽出された全ての行列のリストで計算を行う機能である、コードは多少のようになりますと仮定すると:map_asyncは実際にこれを作成しているよう

import multiprocessing 
import numpy as np 
from itertools import izip 

def myfunc(finput): 
    # Do some calculations... 
    ... 

    # ... and return the result and the index: 
    return (result, finput[1]) 

# Make indices: 
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3) 

# Make function input from the matrices and the indices: 
finput = izip(matrices, inds) 

pool = multiprocessing.Pool() 
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999)) 

しかし、それはそう巨大なfinput - リスト1:私のCPUはあまり進んでいませんが、メモリとスワップは数秒で完全に消費されます。これは明らかに私が望むものではありません。

明示的に最初に作成する必要なしに、この巨大なリストをマルチプロセッシング機能に渡す方法はありますか? または、この問題を解決する別の方法をご存知ですか?

ありがとうございました! :-)

+1

'map_async()'で 'get()'を使っているので、おそらく*非同期*操作は不要で、代わりに 'Pool.map()'を使うべきです。 –

+0

おそらく私は問題を正しく理解していないかもしれませんが、あなたはimapまたはimap_unorderedを考慮しましたか? –

答えて

10

すべてmultiprocessing.Pool.map*メソッドは、関数が呼び出されるとすぐにイテレータを完全に使用します。(demo code)一度イテレータ1つのチャンクのマップ機能チャンクを養うために、grouper_nofillを使用します。

def grouper_nofill(n, iterable): 
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']] 
    ''' 
    it=iter(iterable) 
    def take(): 
     while 1: yield list(itertools.islice(it,n)) 
    return iter(take().next,[]) 

chunksize=256 
async_results=[] 
for finput in grouper_nofill(chunksize,itertools.izip(matrices, inds)): 
    async_results.extend(pool.map_async(myfunc, finput).get()) 
async_results=np.array(async_results) 

PS。 pool.map_asyncchunksizeパラメータは何か違うことをします:iterableをチャンクに分割し、各チャンクをmap(func,chunk)を呼び出すワーカープロセスに渡します。 func(item)があまりにも速く終了すると、ワーカーが処理するデータが増えることがありますが、map_asyncコールが発行された直後にイテレータが完全に消費されるため、状況には役立ちません。

+0

ありがとう!あなたのソリューションは実際に動作するようです!参考までに、私はpool.map_async(myfunc、finput).get(999999)を使う必要がありましたが、うまくいきます!しかし、それはまだ(正確なchunkksizeに依存して)たくさんのメモリを使います。そして、pythonは実行中にガベージコレクションではないようです。それが何であるか、どんなアイデアですか? – digitaldingo

+0

@digitaldingo:まあ、何も気にしない。コードを[SSCCE](http://sscce.org/)に書き換えてここに投稿することができれば理想的です。 – unutbu

0

Pool.map_async()は、作業を複数の作業者に発送するための反復可能時間の長さを知る必要があります。 izipには__len__が含まれていないため、イテラブルを最初にリストに変換しているため、大量のメモリが使用されています。

__len__で独自のizipスタイルイテレータを作成することで、これを回避することができます。

+0

なぜそれを知る必要がありますか?アイドル状態の労働者と待っている人たちに、なぜ単純に食べさせられないのですか? –

+0

@andrew - 'map_async()'( 'multiprocessing/pool.py')の最初の行は実際には' hasattr(iterable、 '__len__')ではありません:iterable = list(iterable) 'です。作業者の完了順序が不明なため、十分な出力リストを作成するには長さを知る必要があります。 –

+0

hmmm。それを動的に構築することはできますか?私はこれが問題として提起されるかもしれないと思っています。それは有効な要求のようです。 –

2

この問題も同様に発生しました。これに代えて:

res = p.map(func, combinations(arr, select_n)) 

res = p.imap(func, combinations(arr, select_n)) 

IMAPは、それを消費しないでください!