1

multiprocessingモジュールに関するもう1つの質問Python 3. 5.私の問題は、処理されたフォークされたすべてが自分の仕事を完了したことを知っていることです(私はその結果をQueueで見ることができます)、AsyncResult.result()私がPoolObj.join()を実行すると、それは永遠にかかります。私はPoolObj.terminate()を行い、自分の人生を続けることができると知っていますが、なぜこれが起こるのか知りたいですか?プールへのマップコールが完了しても、プロセスに時間がかかるのはなぜですか?

私は、次のコードを使用しています:

def worker(d): 
    queue.put(d) 

def gen_data(): 
    for i in range(int(1e6)): 
     yield i 

if __name__ == "__main__": 
    queue = Queue(maxsize=-1) 
    pool = Pool(processes=12) 
    pool_obj_worker = pool.map_async(worker, gen_data(), chunksize=1) 
    pool.close() 

    print ('Lets run the workers...\n') 
    while True: 
     if pool_obj_worker.ready(): 
      if pool_obj_worker.successful(): 
       print ('\nAll processed successfully!') # I can see this quickly, so my jobs are done 
      else: 
       print ('\nAll processed. Errors encountered!') 
      sys.stdout.flush() 
      print (q.qsize()) # The size is right that means all workers have done their job 
      pool.join() # will get stuck here for long long time 
      queue.put('*')   
      break 
    print ('%d still to be processed' % 
      pool_obj_worker._number_left) 
    sys.stdout.flush() 
    time.sleep(0.5) 

私はそれが間違ってやっているの?私を教えてください。または、join()のプロセスがゾンビになっていますか?

+0

あなたがキューに入れしようとしているもののサイズを小さくする場合はどうなりますか? – roganjosh

+0

1e5と1e4の値が小さい場合でも問題があります。しかし、これらより小さい値の場合、問題はそれほど明白ではありません。 – Parashar

+0

それを '10'に減らしてみてください。少なくとも完了しているのですか、それともまだぶら下がっていますか?これは、[この](https://bugs.python.org/issue8426)に関連している可能性が、私は[こちら](http://stackoverflow.com/questions/38961584/multiprocesses-become-zombie-processes同様の問題がありました - 増加 - 反復 - 何が - ) - しかし、 'プロセス'を使用しています。あなたのケースでは、キューが正しいサイズであると言っているので、何か他のことが起こっているかもしれませんが、正しい方向に向けるかもしれません。また、 '.join() 'がなくてもどうなりますか? – roganjosh

答えて

2

ここでの問題は、で四捨五入されたもの以外に、Queueを追加して使用していることです。 のプロセスが自分の仕事を終えたとき、彼らはすべてのmultiprocessing.Queueで使用FeederThreadに参加し、これらの呼び出しは、(すべてのスレッドが同時にjoinを呼び出し、いくつかの奇妙な競合状態が存在することができるでしょうので、調査することは容易ではない)がハングアップします。キューフィーダスレッドに参加しながら、あなたのプロセスがハングアップすることを明らかにmultiprocessing.util.log_to_stderr(10)許可を追加

。あなたの問題を解決するために

、あなたは(与えるmultiprocessing.SimpleQueueの代わりmultiprocessing.Queueを使用(なし無フィーダースレッドが存在しないように参加中ハング)か、実装するように見える何と行動の同じ種類を提供する方法pool.unordered_imapを使用して試すことができますいずれかworkerによって返された結果を含む順序付けられていないジェネレータを返します)。

+0

ありがとうThomas。あなたが正しいです! Join Queuesには大きなオーバーヘッドがあります。 pool.imap_unorderedを使用すると、ジョブの進行状況をどのように監視できますか? – Parashar

+0

その他の関連する質問はこちら!なぜImapジェネレータの反復は時間の経過とともに遅くなるのですか?結果をリストに蓄積することの影響ですか?もしそうなら、どうすればこの問題を解決できますか?進行状況を監視する – Parashar

+0

、あなたは私がここで提供されているコードを持つ任意の景気減速が表示されていない 'IMapUnorderedIterator._index' –

関連する問題