2017-12-19 2 views
0

これは私がまだ解決している私のearlier problemに関連しています。基本的には、ProcessPoolExecutorの逆設計が必要です。ここでは、多くのクエリ処理があり、1つのワーカーは結果を計算してバッチで返します。PythonのProcessPoolExecutorの逆

1つの共有キューで作業項目を送信するのは簡単ですが、正しいプロセスのすべての結果を正しいスレッドに送り返すための素晴らしいソリューションはありません。

答えて

1

質問するプロセスごとに別々のmultiprocessing.pipeがあるのが最も理にかなっていると思います。ワーカープロセスは、パイプ上の使用可能なアイテムを待機し、デキューして処理し、パイプの元のパイプを追跡します。データを送り返すときは、結果を正しいパイプに送ります。

は簡単な例です:

#!/usr/bin/env python3 

import multiprocessing as mp 

def worker(pipes): 
    quit = [False] * len(pipes) 
    results = [''] * len(pipes) 

    # Wait for all workers to send None before quitting 
    while not all(quit): 
     ready = mp.connection.wait(pipes) 
     for pipe in ready: 

      # Get index of query proc's pipe 
      i = pipes.index(pipe) 

      # Receive and "process" 
      obj = pipe.recv() 
      if obj is None: 
       quit[i] = True 
       continue 
      result = str(obj) 
      results[i] += result 

      # Send back to query proc 
      pipes[i].send(result) 
    print(results) 


def query(pipe): 
    for i in 'do some work': 
     pipe.send(i) 
     assert pipe.recv() == i 
    pipe.send(None) # Send sentinel 

if __name__ == '__main__': 
    nquery_procs = 8 
    work_pipes, query_pipes = zip(*(mp.Pipe() for _ in range(nquery_procs))) 

    query_procs = [mp.Process(target=query, args=(pipe,)) for pipe in query_pipes] 
    for p in query_procs: 
     p.start() 
    worker(work_pipes) 
    for p in query_procs: 
     p.join() 

また、あなたはそれぞれの照会プロセスに(ちょうどそのパイプの指標であるかもしれない)ID番号を与えることができる、および任意の要求が(id_num, data)あるタプルでなければなりません。これはちょうど各ループでpipes.index(pipe)をやっているワーカープロセスを回っているので、どれくらいあなたを買っているのか分かりません。

+0

ありがとうございます!実際には各プロセスにスレッドプールがあるので、スレッドごとにパイプを作成する必要があると思います。毎回のリクエスト(数百/秒)ごとに新しいPipe()を作成し、それをリクエストパイプで送信するだけでかなりのオーバーヘッドがありますか?または、それはマネージャ()でなければならないでしょう。どのような場合には、それはマルチプロセッシングと言っても意味がありません.Queueはプロセスセーフです... – Akababa

+1

@Akababa確かめるためにプロファイルする必要がありますが、各要求に対して新しいパイプを作成するのは高価です。特に、各スレッドが複数の要求を行う場合は、最初にすべてのスレッドを作成します。 – bnaecker

+0

クイックフォローアップ:共有キューのプールを使用して作業を開始し、それをポップして作業項目に入れました。 "pickling"には共有メモリへのポインタしか含まれていないので、これは効率的なはずですか? – Akababa