2011-11-26 6 views
10

どのようにマルチプロセッシングが正しく行われたかを知りたい。私がQueue(左の緑色の丸)に書かれている関数f1によって生成されたリスト[1,2,3,4,5]を持っていると仮定します。今度は、そのキューから2つのプロセスを引き出します(プロセス内でf2を実行します)。彼らはデータを処理します。例えば、値を倍にして2番目のキューに書き込みます。今、関数f3はこのデータを読み取り、それを印刷します。機能インサイドパイプラインでのマルチプロセッシングが正しく行われた

layout of the data flow

は永遠にキューから読み取るしようとし、ループのようなものがあります。どのようにこのプロセスを止めるのですか?

アイデアは、1

f1リストだけを送信するのではなく、またNoneオブジェクトまたはcustonオブジェクト、class PipelineTerminator: passまたはちょうどダウンすべての方法を伝播されているいくつかのように。 f3は、Noneが来るのを待っています。そこにいるとき、ループから壊れます。問題:f2の2つのうちの1つがNoneを読み取って伝播し、もう1つがまだ番号を処理している可能性があります。最後の値は失われます。 2

f3

アイデアf1です。したがって、関数f1はデータとパイプを生成し、プロセスをf2で生成し、すべてのデータをフィードします。産卵と給餌の後、それは受信したオブジェクトを数えて処理するだけで、2番目のパイプで待機します。どのくらいのデータが供給されているか知っているので、f2を実行しているプロセスを終了できます。しかし、目標が処理パイプラインを設定することであれば、異なるステップは分離可能でなければならない。したがって、f1,f2およびf3は、パイプラインの異なる要素であり、高価な手順は並行して行われます。

アイデア3

pipeline idea 3

パイプラインの各部分が関数であるそれが好きで、それらを管理する責任があるとして、この関数は、プロセスを生成。それは、どのくらいのデータが入ってきたか、どのくらいのデータが返されたかを知っています(yieldかもしれません)。したがって、Noneオブジェクトを伝播することは安全です。

setup child processes 

execute thread one and two and wait until both finished 

thread 1: 
    while True: 
     pull from input queue 
     if None: break and set finished_flag 
     else: push to queue1 and increment counter1 

thread 2: 
    while True: 
     pull from queue2 
     increment counter2 
     yield result 
     if counter1 == counter2 and finished_flag: break 

when both threads finished: kill process pool and return. 

(代わりにスレッドを使用しての、多分1は賢く解決策を考えることができます。)

そう...

私は供給し、待っている、アイデア2次ソリューションを実装しています結果は到着するが、実際には独立した機能を持つパイプラインではなかった。それは私が管理しなければならなかった仕事のために働いたが、維持することは難しかった。

パイプラインをどのように実装するのか(複数のプロセスで1つのプロセスで簡単にジェネレータの機能を使用するなど)を聞いて、通常はそれらを管理しています。

答えて

1

アイデア1を使用すると何が問題になるのですか?各ワーカープロセス(f2)は、完了時にそのアイデンティティを持つカスタムオブジェクトを配置しますか?その後、f3は、ワーカープロセスがなくなるまで、そのワーカーを終了させます。 Pythonの3.2の新しいまた

は、あなたが「正しい方法」(TM)にしようとしている何をすべき標準ライブラリのconcurrent.futuresパッケージです - http://docs.python.org/dev/library/concurrent.futures.html

たぶん、それが可能ですPython 2.xシリーズと並行してバックポートを探します。アイデア1について

+0

しかし、 'f2' *の労働者は、それが最後のことをどのように知っていますか? 'f1'はそこにいる作業者の数を知り、その数のカスタムオブジェクトを送る必要があります。このようにして、すべての作業者がこの通知を受け取ることが保証されます。それは明らかに可能ですが、私は "関数を単にプラグインする"ことはできません、私は各ステップに何人の労働者がいるか知る必要があります。だからこそ私はアイデア3が好きです。そして、私には初めてのことです。私はそれを掘り下げていきます。 –

+0

それは私が "受け入れ"をチェックした理由です:) –

+0

"作業停止"カスタムオブジェクトは "F1"によって送信されるので、 "f2"ワーカープロセスの総数を含めることができます。これらが "作業停止"オブジェクトを "f3"に渡すだけであれば、作業者の総数を知ることができます。より多くの情報がこの方法で送信される可能性があります。重要なことは、少なくとも「f3」(ただしおそらくは「f1」)でも「コントロールレイヤー」を持つことです。実際に処理されるキュー上のオブジェクト – jsbueno

1

、方法に関する:

import multiprocessing as mp 

sentinel=None 

def f2(inq,outq): 
    while True: 
     val=inq.get() 
     if val is sentinel: 
      break 
     outq.put(val*2) 

def f3(outq): 
    while True: 
     val=outq.get() 
     if val is sentinel: 
      break 
     print(val) 

def f1(): 
    num_workers=2 
    inq=mp.Queue() 
    outq=mp.Queue() 
    for i in range(5): 
     inq.put(i) 
    for i in range(num_workers):   
     inq.put(sentinel) 
    workers=[mp.Process(target=f2,args=(inq,outq)) for i in range(2)] 
    printer=mp.Process(target=f3,args=(outq,)) 
    for w in workers: 
     w.start() 
    printer.start() 
    for w in workers: 
     w.join() 
    outq.put(sentinel) 
    printer.join() 

if __name__=='__main__': 
    f1() 

アイデア1の説明の唯一の違いは、それがセンチネルを受信while-loopのうちf2ブレークが(したがって自体を終了する)ことです。 (w.join()を使用して)作業者が完了し、f3を送信するまで(while-loopから脱出することを通知する)、ブロック番号f1がブロックされます。

+0

ありがとう、それは私が実装したアプローチに似ていますが、あなたのバージョンは非常に読みやすいです。私が気に入らないのは、この場合のようにパイプラインのすべてのコンポーネントがパイプラインについて何かを知る必要があるという事実です。「プリンタ」は前のステップの作業者数などを知る必要があります。だからこそ私はこれをカプセル化し、パイプラインのすべての*ステップを正確に1つの入力と1つの出力にし、各ステップで分岐とマージを行うことを考えました。 –

+0

それは良い点です。 'f3'を' num_workers'とは無関係にすることができますが、 'f1'は' workers'が完了した後で、その監視子を送ります。私は、私が意味することを示すために投稿を編集しました。 – unutbu

7

MPipeモジュールでは、単にこれを行う:

from mpipe import OrderedStage, Pipeline 

def f1(value): 
    return value * 2 

def f2(value): 
    print(value) 

s1 = OrderedStage(f1, size=2) 
s2 = OrderedStage(f2) 
p = Pipeline(s1.link(s2)) 

for task in 1, 2, 3, 4, 5, None: 
    p.put(task) 

上記ラン4工程:第一段階のため

  • (関数F1
  • を1つは第2段階(functi f2
  • およびパイプラインを供給するメインプログラムの詳細。

MPipe cookbookは、プロセスが最後のタスクとしてNoneを使用して、内部的にシャットダウンされている方法のいくつかの説明を提供しています。

MPipeインストールし、コードを実行するには:

virtualenv venv 
venv/bin/pip install mpipe 
venv/bin/python prog.py 

出力:

2 
4 
6 
8 
10 
+0

少なくとも入門的な例のように見えます!ちなみにいいロゴです –

0

まさにそれがセマフォを使用している最も簡単な方法を。

F1

F1を処理したいデータを使用して 'キュー' を投入しています。このプッシュの終わりを終えて、n個の 'Stop'キーワードをキューに入れます。あなたの例ではn = 2ですが、通常は関係する労働者の数です。 コードは次のようになります。

for n in no_of_processes: 
    tasks.put('Stop') 

F2

F2はget -commandが提供するキューから引っ張っています。要素はキューから取得され、キュー内で削除されます。停止信号に注意を払いながら今、あなたはループにポップを置くことができます。

for elem in iter(tasks.get, 'STOP'): 
    do something 

F3

この1つは少しトリッキーです。 F3へのシグナルとして機能するF2でセマフォーを生成することができます。しかし、この信号がいつ届くのかわからないので、データが失われる可能性があります。しかし、F3はF2と同じ方法でデータを取り込み、それをtry... exceptの記述に入れることができます。 queue.getは、キューに要素がない場合はqueue.Emptyを送出します。 tasks

while control: 
    try: 
     results.get() 
    except queue.Empty: 
     control = False 

resultsているキュー:だから、F3でのプルは次のようになります。したがって、Pythonにはまだ含まれていないものは必要ありません。

0

私はconcurent.futuresと3つのプールを使用しており、それらはfuture.add_done_callbackで接続されています。それから、各プールでshutdownと呼ぶことで、プロセス全体が終了するのを待ちます。

from concurrent.futures import ProcessPoolExecutor 
import time 
import random 


def worker1(arg): 
    time.sleep(random.random()) 
    return arg 


def pipe12(future): 
    pool2.submit(worker2, future.result()).add_done_callback(pipe23) 


def worker2(arg): 
    time.sleep(random.random()) 
    return arg 


def pipe23(future): 
    pool3.submit(worker3, future.result()).add_done_callback(spout) 


def worker3(arg): 
    time.sleep(random.random()) 
    return arg 


def spout(future): 
    print(future.result()) 


if __name__ == "__main__": 
    __spec__ = None # Fix multiprocessing in Spyder's IPython 
    pool1 = ProcessPoolExecutor(2) 
    pool2 = ProcessPoolExecutor(2) 
    pool3 = ProcessPoolExecutor(2) 
    for i in range(10): 
     pool1.submit(worker1, i).add_done_callback(pipe12) 
    pool1.shutdown() 
    pool2.shutdown() 
    pool3.shutdown() 
関連する問題