2016-12-12 4 views
0

タスクを実行する単一のワーカースレッドを作成したいという状況があります(基本的に、単一のスレッドでスレッドプールを構築する)。複数のスレッドがタスクをポストすることができ、スレッドにはスレッドを実行するループがあります。ロックの適切な使用が行われるようdequeを使用したC++タスクスレッドの競合条件

が長いと、あまりにも難しいことではありません、私は思った、ので、私の実装は以下の通りである。それを建て

typedef std::function<void()> MyTask; 

class MyTaskPool { 
public: 
    MyTaskPool() { 
     this->closed = false; 
     this->thread = std::thread(std::bind(&MyTaskPool::run, this)); 
    } 

    ~MyTaskPool() { 
     this->closed = true; 
     this->conditionVariable.notify_one(); 
     this->thread.join(); 
    } 

    void post(MyTask task) { 
     { 
      std::lock_guard<std::mutex>(this->mutex); 
      this->tasks.push(task); 
     } 
     this->conditionVariable.notify_one(); 
    } 
private: 
    bool closed; 
    std::mutex mutex; 
    std::condition_variable conditionVariable; 
    std::thread thread; 
    std::deque<MyTask> tasks; 

    void run() { 
     while (true) { 
      boost::optional<MyTask> task; 
      { 
       std::lock_guard<std::mutex>(this->mutex); 
       if (this->closed) 
        return; 

       if (this->tasks.size() > 0) { 
        task = this->tasks.front(); 
        this->tasks.pop_front(); 
       } 
      } 

      if (task.is_initialized()) { 
       task(); 
      } else { 
       std::unique_lock<std::mutex> lock(this->mutex); 
       this->conditionVariable.wait(lock); 
      } 
     } 
    } 
} 

、それをテストし、それが動作します。使い方も簡単です。新しいMyTaskPoolを作成し、単純なラムダ式でタスクにポストすることができます。すばらしいです!この事を長期間使用した後を除いて、突然これは壊れます。this-> tasks.front()は失敗します。イテレータを参照解除できないというエラーが表示されます。私のタスクdequeは空ですか?両端キューに追加および削除するコードは両方ともロックによって保護されているため、このようなことは起こりません。

誰もがエラーを見ることができますか?私はかなり確信していますが、ある種の競合状態ですか?

実際のコードは、各タスクで何らかの処理が行われるため、やや複雑ですが、この例とは関係ありません。

+0

多分スプリアスウェイクアップを除いて壊れることがあり、このコードでは何もありません(、とにかくキューのサイズを確認してください)。もっと複雑なコードを投稿する –

+0

'tasks.is_initialized()'を 'task.is_initialized()'にしてはいけませんか? – NathanOliver

+1

また、デストラクタでは 'this-> closed = true;'を保護しません。デストラクタの最初の行にあなたのミューテックスをロックする必要があります –

答えて

1

エラーは実際には軽微であり、目立ちにくく、タスクの取得を保護するために使用されるロックはローカル変数には格納されません。これにより、それが破壊されて解放されます。ダビデとサムのコメントがそれに統合と同様に何かを実装することを求める人々のために

が、ここでは、私のコードが動作するようになりまし方法は次のとおりです。

typedef std::function<void(MySharedResource)> MyTask; 

class MyTaskPool { 
public: 
    MyTaskPool() { 
     this->closed = false; 
     this->thread = std::thread(std::bind(&MyTaskPool::run, this)); 
    } 

    ~MyTaskPool() { 
     { 
      std::lock_guard<std::mutex> lock(this->mutex); 
      this->closed = true; 
     } 
     this->conditionVariable.notify_one(); 
     this->thread.join(); 
    } 

    void post(MyTask task) { 
     { 
      std::lock_guard<std::mutex> lock(this->mutex); 
      this->tasks.push(task); 
     } 
     this->conditionVariable.notify_one(); 
    } 
private: 
    bool closed; 
    std::mutex mutex; 
    std::condition_variable conditionVariable; 
    std::thread thread; 
    std::deque<MyTask> tasks; 

    void run() { 
     while (true) { 
      boost::optional<MyTask> task; 
      { 
       std::unique_lock<std::mutex> lock(this->mutex); 
       if (this->closed) 
        return; 

       if (this->tasks.size() > 0) { 
        task = this->tasks.front(); 
        this->tasks.pop_front(); 
       } else { 
        this->conditionVariable.wait(lock); 
       } 
      } 

      if (task.is_initialized()) { 
       task(); 
      } 
     } 
    } 
} 
+1

あなたはロックが作成され破棄された2つのインスタンスを見逃しました。私は – UKMonkey

+0

Damnと一致するように答えを更新しました。本当に私はそれを指摘してくれてありがとう。 –

関連する問題