2016-11-10 5 views
0

複数のプロセスを非同期で実行して応答を送信できる関数を作成しようとしていました。 multiprocessing.Process()が応答を返さないので、私のように関数を作成するものと考える:この関数内で複数の非同期関数を実行し、各関数の戻り値を取得します。

from multiprocessing import Process 

def async_call(func_list): 
    """ 
    Runs the list of function asynchronously. 

    :param func_list: Expects list of lists to be of format 
     [[func1, args1, kwargs1], [func2, args2, kwargs2], ...] 
    :return: List of output of the functions 
     [output1, output2, ...] 
    """ 
    response_list = [] 
    def worker(function, f_args, f_kwargs, response_list): 
     """ 
     Runs the function and appends the output to list 
     """ 
     response = function(*f_args, **f_kwargs) 
     response_list.append(response) 

    processes = [Process(target=worker, args=(func, args, kwargs, response_list)) \ 
        for func, args, kwargs in func_list] 

    for process in processes: 
     process.start() 
    for process in processes: 
     process.join() 
    return response_list 

、私はlistとして追加のパラメータを受け入れる非同期workerを呼び出します。リストは参照として渡されるので、リスト内に実際の関数の応答を追加できると思った。そしてasync_callは私にすべての機能の応答を返します。

しかし、これは期待どおりの動作ではありません。値はworker()内のlistに付加されますが、ワーカーresponse_listのリストの外は空のままです。

私が間違っていることは何ですか?そして、私がやっていることを達成するための選択肢はありますか?

答えて

1

プロセス間で直接オブジェクトを共有することはできません。値を伝達するために特別に設計されたクラスの1つ、QueueとPipeを使用する必要があります。 the documentationを参照してください。

+0

それを手に入れました。私は 'queue.put()'を実行しているアイテムに対して 'queue.get()'を実行するだけです。それ以上のアクセス項目については、フリーズします。だから私は 'func_list'の' len'のためにそれを繰り返しています。これを行うより良い方法はありますか? –

+0

値を関数にマップする方法はありますか?ハックとして、私は関数のインデックスとして1つのキーでdictオブジェクトを返し、キーにソートを返しながら考えることができます。これを達成するための他のもっとpythonicな方法はありますか? –

0

Daniel's Answerで説明したように、プロセス間でオブジェクトを直接共有することはできません。ここで私はmultiprocessing.Queue()を使用して機能を更新しています:サンプル実行

def async_call(func_list): 
    """ 
    Runs the list of function asynchronously. 

    :param func_list: Expects list of lists to be of format 
     [[func1, args1, kwargs1], [func2, args2, kwargs2], ...] 
    :return: List of output of the functions 
     [output1, output2, ...] 
    """ 
    def worker(function, f_args, f_kwargs, queue, index): 
     """ 
     Runs the function and appends the output to list, and the Exception in the case of error 
     """ 
     response = { 
      'index': index, # For tracking the index of each function in actual list. 
          # Since, this function is called asynchronously, order in 
          # queue may differ 
      'data': None, 
      'error': None 
     } 

     # Handle error in the function call 
     try: 
      response['data'] = function(*f_args, **f_kwargs) 
     except Exception as e: 
      response['error'] = e # send back the exception along with the queue 

     queue.put(response) 
    queue = Queue() 
    processes = [Process(target=worker, args=(func, args, kwargs, queue, i)) \ 
        for i, (func, args, kwargs) in enumerate(func_list)] 

    for process in processes: 
     process.start() 

    response_list = [] 
    for process in processes: 
     # Wait for process to finish 
     process.join() 

     # Get back the response from the queue 
     response = queue.get() 
     if response['error']: 
      raise response['error'] # Raise exception if the function call failed 
     response_list.append(response) 

    return [content['data'] for content in sorted(response_list, key=lambda x: x['index'])] 

def my_sum(x, y): 
    return x + y 

def your_mul(x, y): 
    return x*y 

my_func_list = [[my_sum, [1], {'y': 2}], [your_mul, [], {'x':1, 'y':2}]] 

async_call(my_func_list) 
# Value returned: [3, 2] 
関連する問題