2017-05-24 4 views
2

QtConcurrent::map関数を使用してQVectorを操作します。すべての私のサンプルプログラムではありませんので、マルチスレッドバージョンとほとんどスピード上の利点がある1QtConcurrent :: mapには何も表示されません

QVector<double> arr(10000000, 0); 
QElapsedTimer timer; 
qDebug() << QThreadPool::globalInstance()->maxThreadCount() << "Threads"; 

int end; 
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
timer.start(); 
for(int i = 0; i < 100; ++i) { 
    std::transform(arr.begin(), arr.end(), arr.begin(), [](double x){ return ++x; }); 
} 
end = timer.elapsed(); 
qDebug() << end; 
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
timer.start(); 
for(int i = 0; i < 100; ++i) { 
    std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; }); 
} 
end = timer.elapsed(); 
qDebug() << end; 
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
timer.start(); 
for(int i = 0; i < 100; ++i) { 
    QFuture<void> qf = QtConcurrent::map(arr.begin(), arr.end(), [](double &x){ ++x; }); 
    qf.waitForFinished(); 
} 
end = timer.elapsed(); 
qDebug() << end; 

しかしプログラムの出力

4 Threads 
905 // std::transform 
886 // std::for_each 
876 // QtConcurrent::map 

によってQVector内のすべての値をインクリメントすることです。実際には4つのスレッドが実行されていることを確認しました。私は-O2最適化を使用しました。このような状況に、より一般的なQThreadPoolアプローチが適していますか?

EDIT:

IはQtConcurrent::run()を用いdifferernt方法を試みました。

void add1(QVector<double>::iterator first, QVector<double>::iterator last) { 
    for(; first != last; ++first) { 
     *first += 1; 
    } 
} 

/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; }); 
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
QFuture<void> qf[numThreads]; 
for(int j = 0; j < numThreads; ++j) { 
    qf[j] = QtConcurrent::run(add1, arr.begin()+j*n/numThreads, arr.begin()+(j+1)*n/numThreads-1); 
} 
for(int j = 0; j < numThreads; ++j) { 
    qf[j].waitForFinished(); 
} 

私は手動で別のスレッドでタスクを配布します。しかし、まだパフォーマンスの向上はほとんど得られません。

181 ms // std::for_each 
163 ms // QtConcurrent::run 

ここでも何が間違っていますか?

+1

なぜスピードアップが期待できますか?あなたは各ループの反復で未来を待っています。 – juanchopanza

+0

私はこの分野の専門家ではありませんが、map()が4つのスレッドを開始し、このコード行をSTL関数より速く終了させる必要があります。あるいは、私はこの機能の概念を誤解しましたか? – NullAchtFuffZehn

答えて

3

QtConcurrentは、単にC++のスレッドプリミティブとロールスイン独自のスレッドプールを使用するのと比べてオーバーヘッドが高いと思われます。

template<class T> 
struct threaded_queue { 
    using lock = std::unique_lock<std::mutex>; 
    void push_back(T t) { 
    { 
     lock l(m); 
     data.push_back(std::move(t)); 
    } 
    cv.notify_one(); 
    } 
    boost::optional<T> pop_front() { 
    lock l(m); 
    cv.wait(l, [this]{ return abort || !data.empty(); }); 
    if (abort) return {}; 
    auto r = std::move(data.back()); 
    data.pop_back(); 
    return std::move(r); 
    } 
    void terminate() { 
    { 
     lock l(m); 
     abort = true; 
     data.clear(); 
    } 
    cv.notify_all(); 
    } 
    ~threaded_queue() 
    { 
    terminate(); 
    } 
private: 
    std::mutex m; 
    std::deque<T> data; 
    std::condition_variable cv; 
    bool abort = false; 
}; 
struct thread_pool { 
    thread_pool(std::size_t n = 1) { start_thread(n); } 
    thread_pool(thread_pool&&) = delete; 
    thread_pool& operator=(thread_pool&&) = delete; 
    ~thread_pool() = default; // or `{ terminate(); }` if you want to abandon some tasks 
    template<class F, class R=std::result_of_t<F&()>> 
    std::future<R> queue_task(F task) { 
    std::packaged_task<R()> p(std::move(task)); 
    auto r = p.get_future(); 
    tasks.push_back(std::move(p)); 
    return r; 
    } 
    template<class F, class R=std::result_of_t<F&()>> 
    std::future<R> run_task(F task) { 
    if (threads_active() >= total_threads()) { 
     start_thread(); 
    } 
    return queue_task(std::move(task)); 
    } 
    void terminate() { 
    tasks.terminate(); 
    } 
    std::size_t threads_active() const { 
    return active; 
    } 
    std::size_t total_threads() const { 
    return threads.size(); 
    } 
    void clear_threads() { 
    terminate(); 
    threads.clear(); 
    } 
    void start_thread(std::size_t n = 1) { 
    while(n-->0) { 
     threads.push_back(
     std::async(std::launch::async, 
      [this]{ 
      while(auto task = tasks.pop_front()) { 
       ++active; 
       try{ 
       (*task)(); 
       } catch(...) { 
       --active; 
       throw; 
       } 
       --active; 
      } 
      } 
     ) 
    ); 
    } 
    } 
private: 
    std::vector<std::future<void>> threads; 
    threaded_queue<std::packaged_task<void()>> tasks; 
    std::atomic<std::size_t> active = {}; 
}; 

struct my_timer_t { 
    std::chrono::high_resolution_clock::time_point first; 
    std::chrono::high_resolution_clock::duration duration; 

    void start() { 
     first = std::chrono::high_resolution_clock::now(); 
    } 
    std::chrono::high_resolution_clock::duration finish() { 
     return duration = std::chrono::high_resolution_clock::now()-first; 
    } 
    unsigned long long ms() const { 
     return std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(); 
    } 
}; 
int main() { 
    std::vector<double> arr(1000000, 0); 
    my_timer_t timer; 

    /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
    timer.start(); 
    for(int i = 0; i < 100; ++i) { 
     std::transform(arr.begin(), arr.end(), arr.begin(), [](double x){ return ++x; }); 
    } 
    timer.finish(); 
    auto time_transform = timer.ms(); 
    std::cout << time_transform << "<- std::transform (" << arr[rand()%arr.size()] << ")\n"; 
    /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
    timer.start(); 
    for(int i = 0; i < 100; ++i) { 
     std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; }); 
    } 
    timer.finish(); 
    auto time_for_each = timer.ms(); 
    std::cout << time_for_each << "<- std::for_each (" << arr[rand()%arr.size()] << ")\n"; 
    /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
    enum { num_threads = 8 }; 
    thread_pool pool(num_threads); 
    timer.start(); 
    for(int i = 0; i < 100; ++i) { 
     std::array< std::future<void>, num_threads > tasks; 
     for (int t = 0; t < num_threads; ++t) { 
      tasks[t] = pool.run_task([&,t]{ 
       std::for_each(arr.begin()+(arr.size()/num_threads)*t, arr.begin()+(arr.size()/num_threads)*(t+1), [](double& x){++x;}); 
      }); 
     } 
     // std::cout << "loop! -- " << pool.threads_active() << "/" << pool.total_threads() << std::endl; 
     for (int t = 0; t < num_threads; ++t) 
      tasks[t].wait(); 
    } 
    timer.finish(); 
    auto time_pool = timer.ms(); 
    std::cout << time_pool << "<- thread_pool (" << arr[rand()%arr.size()] << ")\n"; 
} 

Live example

これが生成します。

153<- std::transform (100) 
131<- std::for_each (200) 
82<- thread_pool (300) 

大幅スピードアップタスク8つの方法を分割するために、単純なC++ 11スレッドプールを使用した場合。 (タスクを4つに分割すると約105でした)。

私はプログラムの実行に時間がかかり、オンラインシステムがタイムアウトしたため、私は自分の10倍のテストセットを使用しました。

あなたのスレッドプールシステムと通信するオーバーヘッドが発生しますが、私の素朴なスレッドプールは、このような実際のライブラリーより優れているはずはありません。

ここで深刻な問題は、メモリがIOにバインドされている可能性があることです。あなたがすべてのバイトを待たなければならない場合、もっと速くバイトにアクセスするスレッドは助けになりません。

+1

'QtConcurrent'のオーバーヘッドをどのようにテストしましたか? '++'演算を 'num_threads'バッチにグループ化したことに注意してください。あなたは 'QtConcurrent'でもこれを行うことができます。 – m7913d

+0

@ m7913dこれは 'QtConcurrent'がやろうとしていることです。ハードウェアスレッドの数に基づいてタスクの一部を処理するいくつかのサブスレッドを起動します。私はちょうど手動でそれをしました。私は 'for_each'に比べてかなりのスピードアップを得ていました。 – Yakk

+0

'QtConcurrent'は、各操作をスレッドに分配します(同時スレッドの最大数を考慮します)。それをグループ化しません。同じ時間がかからない場合は、操作を正しくグループ化するのは簡単ではありません。 – m7913d

関連する問題