2013-05-31 11 views
9

私は、非同期の「ブロードキャストスタイル」メッセージングを管理するために、Pythonクラス(好ましくはサードパーティライブラリではなく標準言語の一部)を探しています。スレッドセーフな非同期メッセージキューが必要です

メッセージをキューに入れるスレッド(putMessageOnQueueメソッドはブロックする必要がありません)と、メッセージを待っている他の複数のスレッドがあり、おそらくいくつかのブロッキング 'waitForMessage'関数が呼び出されます。メッセージがキューに置かれると、待機中の各スレッドがメッセージの独自のコピーを取得します。

私は組み込みのQueueクラスを見てきましたが、メッセージを消費するとキューからメッセージを削除するように見えるため、これは適切ではないと思われます。

これは一般的な使用例である必要があるようですが、誰も解決策を提案できますか?

+0

I多くの問題を起こすことなく、どのスレッドがどのメッセージを受け取ったのかを追跡する独自のクラスを構築できると考えています。 – Bakuriu

答えて

7

これは、スレッドごとに別々のメッセージキューを使用し、以前にそのようなメッセージの受信に関心があるすべてのキューにメッセージをプッシュすることです。

このような何かが動作するはずが、それは未テストコードです...

from time import sleep 
from threading import Thread 
from Queue import Queue 

class DispatcherThread(Thread): 

    def __init__(self, *args, **kwargs): 
     super(DispatcherThread, self).__init__(*args, **kwargs) 
     self.interested_threads = [] 

    def run(self): 
     while 1: 
      if some_condition: 
       self.dispatch_message(some_message) 
      else: 
       sleep(0.1) 

    def register_interest(self, thread): 
     self.interested_threads.append(thread) 

    def dispatch_message(self, message): 
     for thread in self.interested_threads: 
      thread.put_message(message) 



class WorkerThread(Thread): 

    def __init__(self, *args, **kwargs): 
     super(WorkerThread, self).__init__(*args, **kwargs) 
     self.queue = Queue() 


    def run(self): 

     # Tell the dispatcher thread we want messages 
     dispatcher_thread.register_interest(self) 

     while 1: 
      # Wait for next message 
      message = self.queue.get() 

      # Process message 
      # ... 

    def put_message(self, message): 
     self.queue.put(message) 


dispatcher_thread = DispatcherThread() 
dispatcher_thread.start() 

worker_threads = [] 
for i in range(10): 
    worker_thread = WorkerThread() 
    worker_thread.start() 
    worker_threads.append(worker_thread) 

dispatcher_thread.join() 
+0

パーフェクト、それは素晴らしい作品!完成したバージョンがないのは残念ですが、誰かが明確に説明したら、その原則は複雑ではないと思います。 – codebox

+0

@codebox [マルチプロセッシング](http://docs.python.org/2/library/multiprocessing.html)モジュールでサポートされていますが、それはスレッドではなくサブプロセスです。なぜなら、プロセス間の通信は、スレッドが自然に同じヒープを共有しているため、通常はスレッド間の通信よりも複雑であるからです。 – Aya

2

私は、これは(Python Libにキューの例から取られた)よりまっすぐ進むの例だと思います

from threading import Thread 
from Queue import Queue 


num_worker_threads = 2 

def worker(): 
    while True: 
     item = q.get() 
     do_work(item) 
     q.task_done() 

q = Queue() 
for i in range(num_worker_threads): 
    t = Thread(target=worker) 
    t.daemon = True 
    t.start() 

for item in source(): 
    q.put(item) 

q.join()  # block until all tasks are done 
+0

これはどのように質問の要件を満たしていますか?彼は明示的にすべてのスレッドがアイテムのコピーを必要とするため、キューは機能しません。 – Wlerin

関連する問題