2012-02-09 11 views
5

私は、I/O重いブロッキング呼び出しに依存するいくつかのプロデューサ機能と、I/O重いブロッキング呼び出しにも依存する一部のコンシューマ機能を持っています。それらをスピードアップするために、私はGeventマイクロスレッドライブラリをグルーとして使用しました。私は、4人の消費者を持っているし、2台の生産を持っていると思いGeventでマルチプロデューサ、マルチ消費者パラダイムを実装するにはどうすればよいですか?

import gevent 
from gevent.queue import * 
import time 
import random 

q = JoinableQueue() 
workers = [] 
producers = [] 

def do_work(wid, value): 
    gevent.sleep(random.randint(0,2)) 
    print 'Task', value, 'done', wid 

def worker(wid): 
    while True: 
     item = q.get() 
     try: 
      print "Got item %s" % item 
      do_work(wid, item) 
     finally: 
      print "No more items" 
      q.task_done() 


def producer(): 
    while True: 
     item = random.randint(1, 11) 
     if item == 10: 
      print "Signal Received" 
      return 
     else: 
      print "Added item %s" % item 
      q.put(item) 



for i in range(4): 
    workers.append(gevent.spawn(worker, random.randint(1, 100000))) 

#This doesnt work. 
for j in range(2): 
    producers.append(gevent.spawn(producer)) 

#Uncommenting this makes this script work. 
#producer() 

q.join() 

:ここ

は私のパラダイムは次のようになります。プロデューサは、信号が10であるときに終了する。消費者はこのキューからの供給を継続し、プロデューサおよびコンシューマが終了するとタスク全体が終了する。

ただし、これは機能しません。 forというループが複数のプロデューサを生成し、1つのプロデューサしか使用しないとコメントアウトすると、スクリプトは正常に動作します。

私は間違ったことを理解できないようです。

アイデア?

おかげで

答えて

6

キューが未完成の仕事を持っていないときは、実際に終了する必要はありません。なぜなら、概念的には、アプリケーションが終了するときではないからです。

プロデューサが終了したら終了し、未完了の作業がない場合は、次にを終了します。

# Wait for all producers to finish producing 
gevent.joinall(producers) 
# *Now* we want to make sure there's no unfinished work 
q.join() 
# We don't care about workers. We weren't paying them anything, anyways 
gevent.killall(workers) 
# And, we're done. 
3

私は何がキューに入れ、すぐに終了する前に、それはq.join()をすると思います。キューに入る前にすべてのプロデューサに参加してみてください。

+0

こんにちはZCH、私は完全にあなたの答えを踏襲していません。小さなスニペットを貼っていただけますか?それは少し物事を明確にします。 –

+0

@MridangAgarwalla - 'q.join()'の前に 'プロデューサのプロデューサ:producer.join()'を書き込みます。この方法では、最初にすべてのプロデューサが作業を終了し、キューが空になるまで待機します。 – zch

+0

ああ、多分私はそれを間違って実装しました。私は自分のプロデューサーとコンシューマーが同時に稼働することを望んでいました。つまり、プロデューサーは、すべてのキューアイテムが終了してプロデューサーがキューに物事を追加しなくなるまでコンシューマーがフィードを終えるまで、キューに追加を続けます。 –

0

あなたがしたいことは、プロデューサとワーカーが通信している間にメインプログラムをブロックすることです。キューをブロックすると、キューが空になるまで待ってからすぐに戻ることができます。代わりにq.join()

gevent.joinall(producers) 
0

のプログラムの最後にこれを入れて、私はあなたのような同じ問題を満たしています。あなたのコードの主な問題は、あなたのプロデューサがgeventスレッドで生成され、ワー​​カーがタスクをすぐに取得できないということでした。

メインスレッドでは、producer()を実行し、geventスレッドでは生成されないようにすることをお勧めします。メイクセンス上記

import gevent 
from gevent.queue import * 
import time 
import random 

q = JoinableQueue() 
workers = [] 
producers = [] 

def do_work(wid, value): 
    gevent.sleep(random.randint(0,2)) 
    print 'Task', value, 'done', wid 

def worker(wid): 
    while True: 
     item = q.get() 
     try: 
      print "Got item %s" % item 
      do_work(wid, item) 
     finally: 
      print "No more items" 
      q.task_done() 


def producer(): 
    while True: 
     item = random.randint(1, 11) 
     if item == 10: 
      print "Signal Received" 
      return 
     else: 
      print "Added item %s" % item 
      q.put(item) 


producer() 

for i in range(4): 
    workers.append(gevent.spawn(worker, random.randint(1, 100000))) 

コード.. :)

関連する問題