2012-01-26 12 views
3

私は私のスレッドで、次のコードで、NUM_THREADSスレッドを持っている:ブースト同期

/* 
Calculate some_value; 
*/ 

//Critical section to accummulate all thresholds 
{ 
    boost::mutex::scoped_lock lock(write_mutex); 
    T += some_value; 
    num_threads++; 
    if (num_threads == NUM_THREADS){ 
     T = T/NUM_THREADS; 
     READY = true; 
     cond.notify_all(); 
     num_threads = 0; 
    } 
} 

//Wait for average threshold to be ready 
if (!READY) 
{ 
    boost::unique_lock<boost::mutex> lock(wait_mutex); 
    while (!READY){ 
     cond.wait(lock); 
    } 
} 
//End critical section 

/* 
do_something; 
*/ 

基本的に、私は、すべてのスレッドが継続する前に、READY信号を待ちたいです。 num_threadは0に設定され、READYはスレッドが作成される前にfalseになります。一度、デッドロックが発生します。誰でも助けてくれますか?次のように すべてのブースト変数がグローバルに宣言されています

boost::mutex write_mutex; 
boost::mutex wait_mutex; 
boost::condition cond; 
+0

バックは 'false'に' READY'を設定しますか?それとも、デッドロックが発生していて、うまく動作している実行中であることを意味しますか? –

+0

READYは、すべてのスレッドが終了するとfalseに設定されます。はい、ほとんどの場合、デッドロックを除いて正常に動作します。ありがとう! – Mickey

答えて

1

コードは(私はちょうどbool変数であると仮定)READYフラグの競合状態を持っています。 (スレッド実行インターリーブの、すなわち一つの可能​​な変異体)を起こり得るものである。READYフラグをテストコードがコード設定と同期していない

Thread T1:         Thread T2: 
if (!READY)         
{ 
    unique_lock<mutex> lock(wait_mutex); mutex::scoped_lock lock(write_mutex); 
    while (!READY)       /* ... */ 
    {          READY = true; 
     /* !!! */       cond.notify_all(); 
     cond.wait(lock); 
    } 
} 

(ロックは、これらのクリティカルセクションのために異なっている注意してください)。そして、T1がフラグテストとcondとの間の「穴」にあるとき、T2はフラグをセットし、T1が見逃すかもしれないcondに信号を送るかもしれません。

最も簡単な解決策は、READYの更新と状態通知のための右のミューテックスをロックすることです:

/*...*/ 
T = T/NUM_THREADS; 
{ 
    boost::mutex::scoped_lock lock(wait_mutex); 
    READY = true; 
    cond.notify_all(); 
} 
+0

うわー、とても面白いです。私はそれが今働くと思う。どうもありがとう – Mickey

1

Boost.Threadのbarriersは何が必要かもしれないように見えます。

ここに、いくつかのワーカースレッドによって提供される値の平均を取る実例があります。各ワーカースレッドは、同じ共有バリアを使用して(accumulatorインスタンス経由で)互いに同期します。

#include <cstdlib> 
#include <iostream> 
#include <vector> 
#include <boost/bind.hpp> 
#include <boost/shared_ptr.hpp> 
#include <boost/thread.hpp> 

boost::mutex coutMutex; 
typedef boost::lock_guard<boost::mutex> LockType; 

class Accumulator 
{ 
public: 
    Accumulator(int count) : barrier_(count), sum_(0), count_(count) {} 

    void accumulateAndWait(float value) 
    { 
     { 
      // Increment value 
      LockType lock(mutex_); 
      sum_ += value; 
     } 
     barrier_.wait(); // Wait for other the threads to wait on barrier. 
    } 

    void wait() {barrier_.wait();} // Wait on barrier without changing sum. 

    float sum() {LockType lock(mutex_); return sum_;} // Return current sum 

    float average() {LockType lock(mutex_); return sum_/count_;} 

    // Reset the sum. The barrier is automatically reset when triggered. 
    void reset() {LockType lock(mutex_); sum_ = 0;} 

private: 
    typedef boost::lock_guard<boost::mutex> LockType; 
    boost::barrier barrier_; 
    boost::mutex mutex_; 
    float sum_; 
    int count_; 
}; 

/* Posts a value for the accumulator to add and waits for other threads 
    to do the same. */ 
void workerFunction(Accumulator& accumulator) 
{ 
    // Sleep for a random amount of time before posting value 
    int randomMilliseconds = std::rand() % 3000; 
    boost::posix_time::time_duration randomDelay = 
      boost::posix_time::milliseconds(randomMilliseconds); 
    boost::this_thread::sleep(randomDelay); 

    // Post some random value 
    float value = std::rand() % 100; 

    { 
     LockType lock(coutMutex); 
     std::cout << "Thread " << boost::this_thread::get_id() << " posting " 
        << value << " after " << randomMilliseconds << "ms\n"; 
    } 
    accumulator.accumulateAndWait(value); 

    float avg = accumulator.average(); 

    // Print a message to indicate this thread is past the barrier. 
    { 
     LockType lock(coutMutex); 
     std::cout << "Thread " << boost::this_thread::get_id() << " unblocked. " 
        << "Average = " << avg << "\n" << std::flush; 
    } 
} 

int main() 
{ 
    int workerThreadCount = 5; 
    Accumulator accumulator(workerThreadCount); 

    // Create and launch worker threads 
    boost::thread_group threadGroup; 
    for (int i=0; i<workerThreadCount; ++i) 
    { 
     threadGroup.create_thread(
       boost::bind(&workerFunction, boost::ref(accumulator))); 
    } 

    // Wait for all worker threads to finish 
    threadGroup.join_all(); 
    { 
     LockType lock(coutMutex); 
     std::cout << "All worker threads finished\n" << std::flush; 
    } 

    /* Pause a bit before exiting, to give worker threads a chance to 
     print their messages. */ 
    boost::this_thread::sleep(boost::posix_time::seconds(1)); 
} 

私は、次のような出力が得られます。

Thread 0x100100f80 posting 72 after 1073ms 
Thread 0x100100d30 posting 44 after 1249ms 
Thread 0x1001011d0 posting 78 after 1658ms 
Thread 0x100100ae0 posting 23 after 1807ms 
Thread 0x100101420 posting 9 after 1930ms 
Thread 0x100101420 unblocked. Average = 45.2 
Thread 0x100100f80 unblocked. Average = 45.2 
Thread 0x100100d30 unblocked. Average = 45.2 
Thread 0x1001011d0 unblocked. Average = 45.2 
Thread 0x100100ae0 unblocked. Average = 45.2 
All worker threads finished 
+0

ありがとう!ここでの問題は、平均が計算された後、スレッドが何か他のことをすることになるということです。どうやってやるの? – Mickey

+0

スレッドは、同じタイプの値を平均的に何度も生成し続けるか、まったく違うことを何度も繰り返しますか?とにかく、バリアを同期させた後、それぞれのスレッド機能で他のものをやることを止めるものは何もありません。 –

+0

唯一の問題は、スレッドの残りのコードが計算された平均を実行する必要があることです。 – Mickey

関連する問題