2016-07-13 17 views
1

異なる引数を持つ関数を同時に実行するには、pool.apply_asyncを使用してPythonでマルチプロセッシングを使用しています。Pythonのマルチプロセッシングのエラーを修正する方法を変更します

コードの関連抽出物がある:

import multiprocessing as mp 

all_details_to_process_full = [[x,y.z], [x2,y2.z2]] 

def loop_over_desired_sub(arg_list): 
    ... 

if __name__ == '__main__': 
    pool = mp.Pool(processes=10) 
    desired_content = [pool.apply_async(loop_over_desired_sub, args=(arg_list,)) for arg_list in all_details_to_process_full]  
    results = [p.get() for p in desired_content] 

私の知る限り、Pythonはエラーのみが初期化された最古のサブプロセスによって提起されたコードを停止するため、デフォルトの動作です。

たとえば、別のサブプロセスで処理されるリストに10個のアイテムがあり、最初のアイテム(初期化された最初のサブプロセス)の処理にエラーがある場合、Pythonはエラーをただちに発生し、コード。ただし、2番目のサブプロセスにエラーがある場合、そのサブプロセスは停止しますが、残りのコードは最初の項目が終了するまで実行され、エラーが発生してコードが停止します。 [3番目のアイテムを処理する際にエラーが発生した場合は、1と2の両方のアイテムが終了してからエラーが発生する必要があります。

この動作を変更する方法はあり、両方のために:

  1. サブプロセスのいずれかで、すなわちすべてのエラー上げ、コードを停止する すぐ

  2. コードが停止しませんエラーが発生した場合は、 サブプロセスが完了するまで

答えて

2

それぞれのプロセスは独立していますapply_asyncを使用してください。したがって、Pythonのデフォルトの動作は、それらを独立して処理することです。つまり、失敗したものが他のものに影響しないことを意味します。

ここでの問題は、関数loop_over_desired_contentの結果を順番に処理することです。 getメソッドは、(2番目のプロセスが返された/失敗した場合でも)最初の操作の結果が取得されるまでブロックします。次に、2番目の値を処理し、必要に応じてエラーを発生させます。

import multiprocessing as mp 
import time 


def fail_in(args): 
    x, l = args 
    if x == l: 
     raise RuntimeError(x) 
    time.sleep(.5) 
    print("Finish process {}".format(x)) 
    return x 


if __name__ == '__main__': 
    pool = mp.Pool(processes=3) 
    tasks = [(i, 0) for i in range(9)] 

    try: 
     desired_content = [pool.apply_async(fail_in, args=(a,)) for a in tasks] 
     t1 = time.time() 
     results = [p.get() for p in desired_content] 
    except RuntimeError: 
     print("apply_async 0 failed in {:4.2}s".format(time.time()-t1)) 
    pool.terminate() 
    pool = mp.Pool(processes=3) 
    tasks = [(i, 1) for i in range(9)] 

    try: 
     desired_content = [pool.apply_async(fail_in, args=(a,)) for a in tasks] 
     t1 = time.time() 
     results = [p.get() for p in desired_content] 
    except RuntimeError: 
     print("apply_async 1 failed in {:4.2}s".format(time.time()-t1)) 
    pool.terminate() 
    pool = mp.Pool(processes=3) 
    tasks = [(i, 4) for i in range(9)] 

    try: 
     desired_content = [pool.apply_async(fail_in, args=(a,)) for a in tasks] 
     t1 = time.time() 
     results = [p.get() for p in desired_content] 
    except RuntimeError: 
     print("apply_async 4 failed in {:4.2}s".format(time.time()-t1)) 
    pool.terminate() 

このエラーによって、残りのプロセスが強制終了されないことに注意してください。 terminateを使わずにプールに新しい仕事を提出しようとすると、それを見ることができます。あなたの前の仕事からの残りのすべてのプロセスが完了した後に開始されます。

エラーの通知を高速にするには、エラーが返されるとすぐにエラーが発生するimap_unorderedメソッドを使用できます。あなたは注文を返すためにjob_idを使う必要があるので注意しなければなりません。
この場合、callback_errorを使用して通知を取得してクリーンアップを実行することもできます。第二behviorについては

、エラーを上げる前工程であることを、すべての結果を求めて、あなただけ使用することができます。

desired_content = [pool.apply_async(loop_over_desired_sub, args=(arg_list,)) 
        for arg_list in all_details_to_process_full] 
results = [] 
for p in desired_content: 
    try: 
     r = p.get() 
    except Exception as r: 
     pass 
    results += [r] 

results = [p.get() for p in desired_content] 
関連する問題