0

私はThreadPoolExecutorを使用しており、ワーカースレッドが失敗した場合に備えて計算全体を中止する必要があります。どのワーカースレッドでスローされた例外をすぐに再発生させるには?

ThreadPoolExecutorが自動的に例外を再発生しませんので、これは関係なく、エラーの成功を印刷します。1.例。 .result()例外を再発生させますので

from concurrent.futures import ThreadPoolExecutor 

def task(): 
    raise ValueError 

with ThreadPoolExecutor() as executor: 
    executor.submit(task) 
print('Success') 

例2これは正しく、メインスレッドがクラッシュします。しかし、最初のタスクが完了するのを待つので、メインスレッドは遅延を伴う例外を経験します。

import time 
from concurrent.futures import ThreadPoolExecutor 

def task(should_raise): 
    time.sleep(1) 
    if should_raise: 
     raise ValueError 

with ThreadPoolExecutor() as executor: 
    executor.submit(task, False).result() 
    executor.submit(task, True).result() 
print('Success') 

にはどうすれば失敗を処理し、残りの労働者を中止し、それが発生した直後にメインスレッド(多かれ少なかれ)における労働者の例外を気づくことができますか?

答えて

1

まず、我々は彼らの結果を要求する前にタスクを提出する必要があります。それ以外の場合は、スレッドがさえ並行して実行されていません。

futures = [] 
with ThreadPoolExecutor() as executor: 
    futures.append(executor.submit(good_task)) 
    futures.append(executor.submit(bad_task)) 
for future in futures: 
    future.result() 

今、私たちはメインスレッドとワーカースレッドの両方に利用可能な変数に例外情報を保存することができます。

exc_info = None 

メインスレッドは本当に、そのサブプロセスをkillすることはできませんので、我々は労働者が設定して停止する例外情報をチェックする必要があり:

def good_task(): 
    global exc_info 
    while not exc_info: 
     time.sleep(0.1) 

def bad_task(): 
    global exc_info 
    time.sleep(0.2) 
    try: 
     raise ValueError() 
    except Exception: 
     exc_info = sys.exc_info() 

すべてのスレッドが終了した後、舞をnスレッドは、例外情報を保持する変数をチェックできます。データが入力された場合は、例外を再発行します。

if exc_info: 
    raise exc_info[0].with_traceback(exc_info[1], exc_info[2]) 
print('Success') 
0

私が思うに、私はそのようにそれを実装します:

  1. 取り消しを通知するために、
  2. つを例外を報告するための一つ:

    Iメインプロセス、私は2つのキューを作成します。

::

import multiprocessing as mp 

error_queue = mp.Queue() 
cancel_queue = mp.Queue() 

私は、各ThreadPoolExecutorを作成し、パラメータとして、これらのキューを渡します。

class MyExecutor(concurrent.futures.ThreadPoolExecutor): 
    def __init__(self, error_queue, cancel_queue): 
     self.error_queue : error_queue 
     self.cancel_queue = cancel_queue 

ThreadPoolExecutorにはメインループがあります。このループでは、最初にcancel_queueをスキャンして、「キャンセル」メッセージが利用可能かどうかを確認します。

メインループでは、例外マネージャも実装しています。メインプロセスで

self.status = "running" 
with True: # <- or something else 
    if not self.cancel_queue.empty(): 
     self.status = "cancelled" 
     break 
    try: 
     # normal processing 
     ... 
    except Exception as exc: 
     # you can log the exception here for debug 
     self.error_queue.put(exc) 
     self.status = "error" 
     break 
    time.sleep(.1) 

を実行し、すべてのMyExecutorインスタンスerreurが発生した場合、私は例外を発生させます。

がerror_queueをスキャン:すべての

while True: 
    if not error_queue.empty(): 
     cancel_queue.put("cancel") 
    time.sleep(.1) 
関連する問題