2016-07-03 1 views
1

for_eachの並列インスタンスを実行するためのサンプルコードを書きました 以下のコードでスレッドに参加できません。私はコンカレントプログラミングには少し早いので、もしすべてのことを正しく行っているかどうかは分かりません。スレッドがfor_eachに参加できませんC++

template <typename Iterator, typename F> 
class for_each_block 
{ 
public : 
     void operator()(Iterator start, Iterator end, F f) { 
      cout << this_thread::get_id << endl; 
      this_thread::sleep_for(chrono::seconds(5)); 
      for_each(start, end, [&](auto& x) { f(x); }); 
    } 
}; 

typedef unsigned const long int ucli; 

template <typename Iterator, typename F> 
void for_each_par(Iterator first, Iterator last, F f) 
{ 
    ucli size = distance(first, last); 
    if (!size) 
     return; 
    ucli min_per_thread = 4; 
    ucli max_threads = (size + min_per_thread - 1)/min_per_thread; 
    ucli hardware_threads = thread::hardware_concurrency(); 

    ucli no_of_threads = min(max_threads, hardware_threads != 0 ? hardware_threads : 4); 

    ucli block_size = size/no_of_threads; 

    vector<thread> vf(no_of_threads); 
    Iterator block_start = first; 
    for (int i = 0; i < (no_of_threads - 1); i++) 
    { 
     Iterator end = first; 
     advance(end, block_size); 
     vf.push_back(std::move(thread(for_each_block<Iterator, F>(),first,end,f))); 
     first = end; 
    } 
    vf.push_back(std::move(thread(for_each_block<Iterator, F>(), first, last, f))); 
    cout << endl; 
    cout << vf.size() << endl; 
    for(auto& x: vf) 
    { 
     if (x.joinable()) 
      x.join(); 
     else 
      cout << "threads not joinable " << endl; 
    } 

    this_thread::sleep_for(chrono::seconds(100)); 
} 

int main() 
{ 
    vector<int> v1 = { 1,8,12,5,4,9,20,30,40,50,10,21,34,33 }; 
    for_each_par(v1.begin(), v1.end(), print_type<int>); 
return 0; 
} 

上記のコードでは、スレッドは結合できません。私はまだ非同期的な未来を試していますが、私は同じことをまだ得ています。ここに何かがないのですか?

すべてのヘルプは大歓迎され、 はこれがno_of_threadsデフォルトの初期化スレッドを持つベクトルを作成します

答えて

4
vector<thread> vf(no_of_threads); 

..事前にありがとうございます。それらはデフォルトで初期化されているので、それらのどれもが結合可能ではありません。

vector<thread> vf; 
vf.reserve(no_of_threads); 

P.S .: std::moveは一時的に冗長です:);これを変更することを検討:これまで

vf.push_back(std::move(thread(for_each_block<Iterator, F>(), first, last, f))); 

を:

vf.emplace_back(for_each_block<Iterator, F>(), first, last, f); 
+0

ありがとう、それは今動作します。しかし、私は1つの問題があります。すべてのスレッドが同じIDを出力します。あなたはまたもっと提案することができますか(私は何か間違ったやり方をしたかもしれないと確信しています)。また、私はいつもpush_backとemplace_backのために混乱を感じました:) –

+1

@KartikV 'this_thread :: get_id'は関数であり、呼び出す必要があります。今は関数ポインタの値を表示しています。 –

+0

@Ocelot本当に、私はとても愚かな気がする。二人とも目を開けました。私は愚かな間違いを避けるために、はっきりと見えるべきです。君たちありがとう。 –

1

これが面白いかであってもなくてもよいです。私はコードをリファクタリングして、より慣れ親しんだアプローチを使用するようにしました。あなたのアプローチが間違っていると言っているわけではありませんが、スレッド管理を学んでいるので、他に何ができるかに興味があると思いました。

必要に応じて難燃化してください。コメントをインライン:

#include <vector> 
#include <chrono> 
#include <thread> 
#include <mutex> 
#include <iomanip> 
#include <future> 

using namespace std; 

// 
// provide a means of serialising writing to a stream. 
// 
struct locker 
{ 
    locker() : _lock(mutex()) {} 

    static std::mutex& mutex() { static std::mutex m; return m; } 
    std::unique_lock<std::mutex> _lock; 
}; 
std::ostream& operator<<(std::ostream& os, const locker& l) { 
    return os; 
} 

// 
// fill in the missing work function 
// 
template<class T> 
void print_type(const T& t) { 
    std::cout << locker() << hex << std::this_thread::get_id() << " : " << dec << t << std::endl; 
} 

// put this in your personable library. 
// the standards committee really should have given us ranges by now... 
template<class I1, class I2> 
struct range_impl 
{ 
    range_impl(I1 i1, I2 i2) : _begin(i1), _end(i2) {}; 

    auto begin() const { return _begin; } 
    auto end() const { return _end; } 

    I1 _begin; 
    I2 _end; 
}; 

// distinct types because sometimes dissimilar iterators are comparable 
template<class I1, class I2> 
auto range(I1 i1, I2 i2) { 
    return range_impl<I1, I2>(i1, i2); 
} 

// 
// lets make a helper function so we can auto-deduce template args 
// 
template<class Iterator, typename F> 
auto make_for_each_block(Iterator start, Iterator end, F&& f) 
{ 
    // a lambda gives all the advantages of a function object with none 
    // of the boilerplate. 
    return [start, end, f = std::move(f)] { 
     cout << locker() << this_thread::get_id() << endl; 
     this_thread::sleep_for(chrono::seconds(1)); 

     // let's keep loops simple. for_each is a bit old-skool. 
     for (auto& x : range(start, end)) { 
      f(x); 
     } 
    }; 
} 


template <typename Iterator, typename F> 
void for_each_par(Iterator first, Iterator last, F f) 
{ 
    if(auto size = distance(first, last)) 
    { 
     std::size_t min_per_thread = 4; 
     std::size_t max_threads = (size + min_per_thread - 1)/min_per_thread; 
     std::size_t hardware_threads = thread::hardware_concurrency(); 

     auto no_of_threads = min(max_threads, hardware_threads != 0 ? hardware_threads : 4); 

     auto block_size = size/no_of_threads; 

     // futures give us two benefits: 
     // 1. they automatically transmit exceptions 
     // 2. no need for if(joinable) join. get is sufficient 
     // 
     vector<future<void>> vf; 
     vf.reserve(no_of_threads - 1); 
     for (auto count = no_of_threads ; --count ;) 
     { 
      // 
      // I was thinking of refactoring this into std::generate_n but actually 
      // it was less readable. 
      // 
      auto end = std::next(first, block_size); 
      vf.push_back(async(launch::async, make_for_each_block(first, end, f))); 
      first = end; 
     } 
     cout << locker() << endl << "threads: " << vf.size() << " (+ main thread)" << endl; 

     // 
     // why spawn a thread for the remaining block? we may as well use this thread 
     // 
     /* auto partial_sum = */ make_for_each_block(first, last, f)(); 

     // join the threads 
     // note that if the blocks returned a partial aggregate, we could combine them 
     // here by using the values in the futures. 
     for (auto& f : vf) f.get(); 
    } 
} 

int main() 
{ 
    vector<int> v1 = { 1,8,12,5,4,9,20,30,40,50,10,21,34,33 }; 
    for_each_par(v1.begin(), v1.end(), print_type<int>); 
    return 0; 
} 

サンプル出力:

0x700000081000 
0x700000104000 

threads: 3 (+ main thread) 
0x700000187000 
0x100086000 
0x700000081000 : 1 
0x700000104000 : 5 
0x700000187000 : 20 
0x100086000 : 50 
0x700000081000 : 8 
0x700000104000 : 4 
0x700000187000 : 30 
0x100086000 : 10 
0x700000081000 : 12 
0x700000104000 : 9 
0x700000187000 : 40 
0x100086000 : 21 
0x100086000 : 34 
0x100086000 : 33 
Program ended with exit code: 0 

はここに移動:: STDを説明してください:[start, end, f = std::move(f)] {...};

これは、C +で利用できるようになりました歓迎言語機能です+14。キャプチャブロック内のf = std::move(f)は、decltype(f) new_f = std::move(f)と等価です。ただし、新しい変数はで、new_fではありません。それは私たちがstd::moveオブジェクトをラムダにコピーするのではなくラムダに入れることを可能にします。

ほとんどの関数オブジェクトでは問題ありませんが、いくつかは大きくなる可能性があります。これにより、コンパイラは利用可能な場合はコピーではなく移動を使用できます。

+0

それは非常にきちんとした、ロッカー、範囲は本当にクールです。関数の引数として使用した移動セマンティクスについて説明してください。 –

+0

@KartikVが更新されました。助けてくれることを望む。 –

+0

ありがとうございました。あなたのヘルパー機能は本当に便利です。そのようなものがあれば、どんなブログにでも向けることができますか?(または、時間があれば書くことができます) –

関連する問題