2013-03-09 11 views
29

私はいくつかのPythonコードをいくつかのファイルに対して並列に実行しようとしています。構築物は、基本的には次のとおりです。ジョインでPythonマルチプロセッシングプールがハングアップしますか?

def process_file(filename, foo, bar, baz=biz): 
    # do stuff that may fail and cause exception 

if __name__ == '__main__': 
    # setup code setting parameters foo, bar, and biz 

    psize = multiprocessing.cpu_count()*2 
    pool = multiprocessing.Pool(processes=psize) 

    map(lambda x: pool.apply_async(process_file, (x, foo, bar), dict(baz=biz)), sys.argv[1:]) 
    pool.close() 
    pool.join() 

pool.mapは(ように見える)しませんので、私は以前に似た何かをするpool.mapを使用しました、それは素晴らしい仕事が、私はここにいることを使用するように見えることはできません余分な引数を渡すことができます(ラムダをマーシャリングできないため、ラムダを使用すると機能しません)。

これで、apply_async()を直接使用して動作させようとしています。私の問題は、コードがハングアップして終了しないようです。いくつかのファイルが例外で失敗しますが、どうして失敗/ハングを引き起こす原因になっていないのでしょうか?興味深いことに、ファイルに例外が発生しても失敗しない場合、正常に終了します。

私には何が欠けていますか?

編集:決して子供を享受していないと出て行く、永遠

Exception in thread Thread-3: 
Traceback (most recent call last): 
    File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner 
    self.run() 
    File "/usr/lib/python2.7/threading.py", line 505, in run 
    self.__target(*self.__args, **self.__kwargs) 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 376, in _handle_results 
    task = get() 
TypeError: ('__init__() takes at least 3 arguments (1 given)', <class 'subprocess.CalledProcessError'>,()) 

私はこれらの一つでも表示される場合は、プロセスの親プロセスがハングアップします。function(ひいては労働者)が失敗したとき、私はこの例外を参照してください。

+0

あなたのコードは、 'process_file'にランダム例外をスローしたとしてもうまくいくようです。だからおそらく、問題を引き起こしている 'process_file'であなたが実際にやっていることと関係しているでしょう。 – robertklep

+0

Huh。どのバージョンのpython?私は2.7です。実際のプログラムのprocess_fileは非常に複雑で、PIL、NetworkX、poly2triなどのライブラリを大量に使用しています。私はいくつかのケースで例外を引き起こす可能性があるバグを知っている少なくとも2か所は知っていますが、私は単にそれらのエラーを無視して移動する必要があります。私はなぜそれが私のために出くわすことはありませんが、あなたのために働くように困惑しています。 – clemej

+0

2.7.2、これは私がテストしたものです:https://gist.github.com/robertklep/5125319 – robertklep

答えて

40

申し訳ありませんが、他の誰かが私はここに投稿したい同様の問題がある場合は、少なくとも回避策を見つけました。私はそこでより良い答えを受け入れます。

私は問題の根本がhttp://bugs.python.org/issue9400と考えています。私はクレイジーじゃない

  • 、それは漬物「例外」に不可能ではないにしても非常に困難である私が本当に少なくともpython2で
  • を動作するようになっているやろうとしている:これは私に二つのことを伝えます親プロセスに戻ります。単純なものは動作しますが、他の多くのものは動作しません。

私の場合、私のワーカー関数はsegfaultingだったサブプロセスを起動していました。これはCallableProcessError例外を返しましたが、これはpickleableではありません。何らかの理由で、親のプールオブジェクトが昼食に出て、join()の呼び出しから戻ることはありません。

私の特別なケースでは、私は例外が何であっても気にしません。たいてい、私はそれを記録し続けようとします。これを行うために、私は単純にtry/except節にトップワーカー関数をラップします。ワーカーが例外をスローすると、親プロセスに戻ってログに記録される前にキャッチされ、ワー​​カープロセスは例外を送信しようとしなくなるため、正常に終了します。以下を参照してください:

次に、元のマップ関数呼び出しではなく、初期マップ関数呼び出しprocess_file_wrapped()を使用しています。今、私のコードは意図したとおりに動作します。

+7

あなた自身の質問に答えるために謝罪する必要はありません。このページでは、実際の問題と回避策を示しています。それは良い。 –

+1

ところで、もう1つの解決策は、例外のエラーメッセージだけを受け取り、基になる「Exception」クラスを使用してレイズすることです。 –

+1

私はまだStackExchangeには新しく、エチケットについてはわかりません。上記のコメントの@robertklepのスニペットが普通のException()で動作すると、私はそれもうまくいくと考えていますが、結論は例外を捕まえて、既知のものを返さなければならないということです。 – clemej

4

lambdaの代わりにfunctools.partialインスタンスを実際に使用することができます。オブジェクトをピクルする必要がある場合です。 partialオブジェクトはPython 2.7(およびPython 3)からpickleableです。

pool.map(functools.partial(process_file, x, foo, bar, baz=biz), sys.argv[1:]) 
+0

Hmm。以前はfunctoolsを使っていませんでした。情報をありがとう。私はまだこれはまだ同じ例外 - 予告問題に苦しむだろうと思う。可能であれば – clemej

+0

;私は言うことができません。あなたは 'pool.map'で前回の成功を収めていたと言いました。 – nneonneo

+0

私は完全に異なるコンテキストでpool.mapを使用しました。ここでは、例外が発生する可能性はほとんどありませんでした。私はその質問についてそれについてもっと明確にすべきだった。 – clemej

2

pool.mapがハングしたとき、私には同じようなバグ(同じではない)がありました。 私のユースケースでは、それを解決するためにpool.terminateを使用することができました(あなたがものを変更する前にあなたも同様にしてください)。

私はので、私はすべてがdocsから、終わっ知っterminateを呼び出す前にpool.mapを使用:

マップの並列等価()組み込み関数(それはしかし一つだけ反復可能な引数をサポートしています)。結果が準備完了するまでブロックされます。

これがあなたのユースケースであれば、これをパッチする方法かもしれません。

関連する問題