2011-10-16 6 views
6

ここに私が本質的に持っているものがあります。複数のスレッドを持つ安全なメッセージキュー

私は定期的にメッセージをチェックして処理しています。

スレッドB及びCは、BとC又はB又はC Aがメッセージを処理し、従ってキューにアクセスしている間にメッセージを送信しようとすると問題が生じるA.

にメッセージを送信する必要があります。

この問題は通常どのように解決されますか?

おかげ

+0

win32またはposixで作業していますか?私の小さな疑似コードの例を整理するのに役立ちます。 – Nate

答えて

4

は、これは、通常mutexes、または他のマルチスレッド保護メカニズムを使用して解決されます。

Windowsで作業している場合、MFCはこの問題のためにCMutex classを提供します。

posixシステムで作業している場合、posix apiはpthread_mutex_lock, pthread_mutex_unlock, and pthread_mutex_trylock functionsを提供します。

いくつかの基本的な擬似コードは、あなたのケースでの使用を証明するために便利になります:すべての3つのスレッドのために

pthread_mutex_t mutex; *or* CMutex mutex; 
Q queue; // <-- both mutex and queue are global state, whether they are 
      //  global variables, or passed in as parameters, they must 
      //  be the shared by all threads. 

int threadA(/* params */){ 
    while(threadAStillRunning){ 
     // perform some non-critical actions ... 
     pthread_mutex_lock(mutex) *or* mutex.Lock() 
     // perform critical actions ... 
     msg = queue.receiveMessage() 
     pthread_mutex_unlock(mutex) *or* mutex.Unlock() 
     // perform more non-critical actions 
    } 
} 

int threadBorC(/* params */){ 
    while(theadBorCStillRunning){ 
     // perform some non-critical actions ... 
     pthread_mutex_lock(mutex) *or* mutex.Lock() 
     // perform critical actions ... 
     queue.sendMessage(a_msg) 
     pthread_mutex_unlock(mutex) *or* mutex.Unlock() 
    } 
} 

、キューに基づいて行動する能力は、ミューテックスを獲得する能力にかかって - 彼らは単にブロックします。ミューテックスが取得されるまで待ちます。これにより、そのリソースの使用に起因する競合が防止されます。

+0

ネイトの答えは正しいですが、ミューテックスは使いやすいものですが、比較的高いオーバーヘッドを持つため、非常にハイスループットのシナリオには適していない可能性があります。 –

+0

ああ、一般的にスレッドもそうです^)真剣に考えても、スレッド保護に関しては、プラットフォームごとに異なる弱点があります。ミューテックスがカーネル(私はあなた、窓を見ている)に登録されているものがあります。ミューテックスは原子チェック(まったくコストがかかりません)の周りの "ラッパー"です。 – Nate

+0

@Nate:Windowsでは、おそらく非常に安いクリティカルセクションを探しているでしょう。 –

0

ウィンドウが開いていない場合や、C++でクロスプラットフォームのものを実装する場合は、ACEライブラリのキューを使用してみてください。 ACEライブラリサンプルからサンプルとして

ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue; 

、あなたはその後、メッセージをキューに入れるために を使用することができます。

ACE_NEW_RETURN (mb, 
       ACE_Message_Block (rb.size(), 
       ACE_Message_Block::MB_DATA, 
       0, 
       buffer), 
       0); 
    mb->msg_priority (ACE_Utils::truncate_cast<unsigned long> (rb.size())); 
    mb->wr_ptr (rb.size()); 

    ACE_DEBUG ((LM_DEBUG, 
      "enqueueing message of size %d\n", 
      mb->msg_priority())); 

// Enqueue in priority order. 
if (msg_queue->enqueue_prio (mb) == -1) 
ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next")); 

キューから取得するために:

ACE_Message_Block *mb = 0; 

msg_queue->dequeue_head (mb) == -1; 
int length = ACE_Utils::truncate_cast<int> (mb->length()); 

if (length > 0) 
    ACE_OS::puts (mb->rd_ptr()); 

    // Free up the buffer memory and the Message_Block. 
    ACE_Allocator::instance()->free (mb->rd_ptr()); 
    mb->release(); 

利点はあなたですあまりにも多くのコードを書くことなく、非常に簡単に同期プリミティブを変更することができます。

関連する問題