それぞれのプロセスは独立していますapply_async
を使用してください。したがって、Pythonのデフォルトの動作は、それらを独立して処理することです。つまり、失敗したものが他のものに影響しないことを意味します。
ここでの問題は、関数loop_over_desired_content
の結果を順番に処理することです。 get
メソッドは、(2番目のプロセスが返された/失敗した場合でも)最初の操作の結果が取得されるまでブロックします。次に、2番目の値を処理し、必要に応じてエラーを発生させます。
import multiprocessing as mp
import time
def fail_in(args):
x, l = args
if x == l:
raise RuntimeError(x)
time.sleep(.5)
print("Finish process {}".format(x))
return x
if __name__ == '__main__':
pool = mp.Pool(processes=3)
tasks = [(i, 0) for i in range(9)]
try:
desired_content = [pool.apply_async(fail_in, args=(a,)) for a in tasks]
t1 = time.time()
results = [p.get() for p in desired_content]
except RuntimeError:
print("apply_async 0 failed in {:4.2}s".format(time.time()-t1))
pool.terminate()
pool = mp.Pool(processes=3)
tasks = [(i, 1) for i in range(9)]
try:
desired_content = [pool.apply_async(fail_in, args=(a,)) for a in tasks]
t1 = time.time()
results = [p.get() for p in desired_content]
except RuntimeError:
print("apply_async 1 failed in {:4.2}s".format(time.time()-t1))
pool.terminate()
pool = mp.Pool(processes=3)
tasks = [(i, 4) for i in range(9)]
try:
desired_content = [pool.apply_async(fail_in, args=(a,)) for a in tasks]
t1 = time.time()
results = [p.get() for p in desired_content]
except RuntimeError:
print("apply_async 4 failed in {:4.2}s".format(time.time()-t1))
pool.terminate()
このエラーによって、残りのプロセスが強制終了されないことに注意してください。 terminate
を使わずにプールに新しい仕事を提出しようとすると、それを見ることができます。あなたの前の仕事からの残りのすべてのプロセスが完了した後に開始されます。
エラーの通知を高速にするには、エラーが返されるとすぐにエラーが発生するimap_unordered
メソッドを使用できます。あなたは注文を返すためにjob_idを使う必要があるので注意しなければなりません。
この場合、callback_error
を使用して通知を取得してクリーンアップを実行することもできます。第二behviorについては
、エラーを上げる前工程であることを、すべての結果を求めて、あなただけ使用することができます。
desired_content = [pool.apply_async(loop_over_desired_sub, args=(arg_list,))
for arg_list in all_details_to_process_full]
results = []
for p in desired_content:
try:
r = p.get()
except Exception as r:
pass
results += [r]
results = [p.get() for p in desired_content]
を