2017-11-08 8 views
1

異なるパラメータで同じ関数を何度も呼び出すスレッドのベクトルを再利用したいと思います。 (原子パラメータを除いて)記述がないので、ミューテックスの必要はありません。このアイデアを表現するために、ベクトルの最大値を見つける並列化されたコードの基本的な例を作成しました。明らかにベクトルの最大値を見つけるより良い方法がありますが、説明のために、私が書いている実際のコードの詳細を知ることを避けるために、私はこのばかげた例を考えています。C++同じ関数を呼び出すスレッドのベクトルを再利用する

コードベクトルは番号Kを(K上限で初期化されている)を含むかどうかをチェックする機能pFindを呼び出すことにより、ベクトルの最大数を見出します。実行されている場合は実行が停止し、そうでない場合はkが1つ減らされ、プロセスが繰り返されます。

コードでは、ベクトル内にkの検索を並列化するスレッドのベクトルを生成します。問題は、すべての値がkの場合、スレッドのベクトルが再生成され、新しいスレッドが結合されるたびに発生するということです。 スレッドのベクトルを生成し、それらを結合するたびに、避けたいオーバーヘッドが発生します。

スレッドのベクトル(プール)を1回生成して新しい実行に再利用する方法があるのだろうかと思います。他のどのスピードアップのヒントも高く評価されます。

void pFind(
    vector<int>& a, 
    int n, 
    std::atomic<bool>& flag, 
    int k, 
    int numTh, 
    int val 
    ) { 
    int i = k; 

    while (i < n) { 
     if (a[i] == val) { 
      flag = true; 
      break; 
     } else 
      i += numTh; 
    } 
} 

int main() { 
    std::atomic<bool> flag; 
    flag = false; 
    int numTh = 8; 
    int val = 1000; 
    int pos = 0; 

    while (!flag) { 
     vector<thread>threads; 
     for (int i = 0; i < numTh; i++){ 
      thread th(&pFind, std::ref(a), size, std::ref(flag), i, numTh, val); 
      threads.push_back(std::move(th)); 
     } 
     for (thread& th : threads) 
      th.join(); 

     if (flag) 
      break; 

     val--; 

    } 
    cout << val << "\n"; 
    return 0; 
} 
+1

の残りの部分で説明;メインスレッドは引数の共有キューを作成し、そこから作業者は必要に応じて新しい作業を取ります。キューにアクセスするにはmutexが必要ですが、作業項目を挿入して抽出するのが安い限り( '.front()'の 'std :: move'を使用してコピー構築を避け、' .pop() 'mutexの制御の外側にある他のすべての作業でアイテムを削除するか、' std :: move'が依然として高価である、 'unique_ptr'などを格納して取り出す)場合、ロックの競合は最小限に抑える必要があります。 – ShadowRanger

+1

基本的には、スレッドを「再利用」しないでください。スレッドは、終了する代わりに作業を続ける方法を知っておいてください。 – ShadowRanger

+0

簡単な例を提供することができますか(または私に1つを指摘してください)。事前に感謝します – totalUnimodular

答えて

0

あなたのコードでは、競合状態を持っている:boolアトミック型ではありませんので、ない並行に書き込むために、複数のスレッドのために安全です。 std::atomic_boolまたはstd::atomic_flagを使用する必要があります。

質問に答えるには、ループの反復ごとにthreadsベクトルを再作成しています。これはループ本体の外側で宣言を移動することで回避できます。スレッド自体を再利用することは、はるかに複雑なトピックです。

vector<thread> threads; 
threads.reserve(numTh); 

while (!flag) { 
    for (size_t i = 0; i < numTh; ++i) 
     threads.emplace_back(pFind, a, size, flag, i, numTh, val); 
    for (auto &th : threads) 
     th.join(); 
    threads.clear(); 
} 
+0

そうですね、この例を書いたとき、私はフラグをアトミックにするのを忘れていました(実際にはこのアトミックパラメータを参照しています)。 – totalUnimodular

1

構築後にstd::threadに別の実行機能(クロージャ)を割り当てる方法はありません。これは、一般に、すべてのスレッド抽象化に当てはまりますが、実装では低レベルの抽象化を内部的にメモまたはキャッシュしてスレッドをフォークして高速に結合しようとするので、新しいスレッドを構築するだけで実行可能です。システムプログラミングサークルでは、新しいスレッドの作成が非常に軽いかどうか、またはスレッドを頻繁にフォークしないようにクライアントを記述する必要があるかどうかについての議論があります。 (これは非常に長い間進行してきたもので、多くのトレードオフがあることは明らかです。)

本当に欲しいものをしようとする抽象化はたくさんあります。それらは、「スレッドプール」、「タスクエグゼキュータ」(または単に「エグゼキュータ」)、および「先物」などの名前を有する。これらのすべては、しばしばシステム内のハードウェアコアの数に関係する一連のスレッドを作成し、それらのスレッドのそれぞれをループして要求を探すことによって、スレッドにマッピングする傾向があります。

コメントが示すように、これを自分で行う主な方法は、スレッドに実行要求を受け付け、処理して結果を投稿するトップレベルループを持たせることです。これを行うには、mutexや条件変数などの他の同期メソッドを使用する必要があります。多くの要求があり、要求が信じられないほど大きくない場合は、一般的にこのように処理する方が高速です。

標準のC++並行処理サポートは良いことですが、実際の高性能作業にはかなり欠けています。 Intel's TBBのようなものは、はるかに工業的な強みのソリューションです。

0
一緒に別のオンライン検索、次のような作品からいくつかのコードを縫い合わせによって

が、whileループの各繰り返しでスレッドを再生成するアプローチのようにほど速くないです。

おそらく誰かがこのアプローチにコメントできます。

以下のクラスは、スレッドプールここ

class ThreadPool { 
    public: 
    ThreadPool(int threads) : shutdown_(false){ 
     threads_.reserve(threads); 
     for (int i = 0; i < threads; ++i) 
      threads_.emplace_back(std::bind(&ThreadPool::threadEntry, this, i)); 
    } 

    ~ThreadPool(){ 
     { 
      // Unblock any threads and tell them to stop 
      std::unique_lock<std::mutex>l(lock_); 

      shutdown_ = true; 
      condVar_.notify_all(); 
     } 

     // Wait for all threads to stop 
     std::cerr << "Joining threads" << std::endl; 

     for (auto & thread : threads_) thread.join(); 
    } 

    void doJob(std::function<void(void)>func){ 
     // Place a job on the queu and unblock a thread 
     std::unique_lock<std::mutex>l(lock_); 

     jobs_.emplace(std::move(func)); 
     condVar_.notify_one(); 
    } 

    void threadEntry(int i){ 
     std::function<void(void)>job; 

     while (1){ 
      { 
       std::unique_lock<std::mutex>l(lock_); 

       while (!shutdown_ && jobs_.empty()) condVar_.wait(l); 

       if (jobs_.empty()){ 
        // No jobs to do and we are shutting down 
        std::cerr << "Thread " << i << " terminates" << std::endl; 
        return; 
       } 

       std::cerr << "Thread " << i << " does a job" << std::endl; 
       job = std::move(jobs_.front()); 
       jobs_.pop(); 
      } 

      // Do the job without holding any locks 
      job(); 
     } 
    } 
}; 

は通常、あなたがキューから引き出された引数で与えられたタスクを実行するタスクのプロセッサを書きたいコード

void pFind(
vector<int>& a, 
int n, 
std::atomic<bool>& flag, 
int k, 
int numTh, 
int val, 
std::atomic<int>& completed) { 
    int i = k; 

    while (i < n) { 
     if (a[i] == val) { 
      flag = true; 
      break; 
     } else 
      i += numTh; 
    } 
    completed++; 
} 

int main() { 
    std::atomic<bool> flag; 
    flag = false; 
    int numTh = 8; 
    int val = 1000; 
    int pos = 0; 
    std::atomic<int> completed; 
    completed=0; 

    ThreadPool p(numThreads); 

    while (!flag) { 
     for (int i = 0; i < numThreads; i++) { 
      p.doJob(std::bind(pFind, std::ref(a), size, std::ref(flag), i, numTh, val, std::ref(completed))); 
     } 

     while (completed < numTh) {} 

     if (flag) { 
      break; 
     } else { 
      completed = 0; 
      val--; 
     } 

    } 
    cout << val << "\n"; 
    return 0; 
} 
関連する問題