2012-06-27 29 views
6

Stackoverflowは私にとって大きな助けとなりました。私はコミュニティに何かを戻したいと思います。私はTinyThread ++ website C++ポータブルスレッドライブラリを使用して簡単なスレッドプールを実装しています。これはStackoverflowから学んだものを使用しています。シンプルなC++スレッドプール実装のクエリ

// ThreadPool.h 

class ThreadPool 
{ 
public: 

ThreadPool(); 
~ThreadPool(); 

// Creates a pool of threads and gets them ready to be used 
void CreateThreads(int numOfThreads); 

// Assigns a job to a thread in the pool, but doesn't start the job 
// Each SubmitJob call will use up one thread of the pool. 
// This operation can only be undone by calling StartJobs and 
// then waiting for the jobs to complete. On completion, 
// new jobs may be submitted. 
void SubmitJob(void (*workFunc)(void *), void *workData); 

// Begins execution of all the jobs in the pool. 
void StartJobs(); 

// Waits until all jobs have completed. 
// The wait will block the caller. 
// On completion, new jobs may be submitted. 
void WaitForJobsToComplete(); 

private: 

enum typeOfWorkEnum { e_work, e_quit }; 

class ThreadData 
{ 
    public: 

    bool ready; // thread has been created and is ready for work 
    bool haveWorkToDo; 
    typeOfWorkEnum typeOfWork; 

    // Pointer to the work function each thread has to call. 
    void (*workFunc)(void *); 

    // Pointer to work data 
    void *workData; 

    ThreadData() : ready(false), haveWorkToDo(false) { }; 
}; 

struct ThreadArgStruct 
{ 
    ThreadPool *threadPoolInstance; 
    int   threadId; 
}; 

// Data for each thread 
ThreadData *m_ThreadData; 

ThreadPool(ThreadPool const&); // copy ctor hidden 
ThreadPool& operator=(ThreadPool const&); // assign op. hidden 

// Static function that provides the function pointer that a thread can call 
// By including the ThreadPool instance in the void * parameter, 
// we can use it to access other data and methods in the ThreadPool instance. 
static void ThreadFuncWrapper(void *arg) 
{ 
    ThreadArgStruct *threadArg = static_cast<ThreadArgStruct *>(arg); 
    threadArg->threadPoolInstance->ThreadFunc(threadArg->threadId); 
} 

// The function each thread calls  
void ThreadFunc(int threadId); 

// Called by the thread pool destructor 
void DestroyThreadPool(); 

// Total number of threads available 
// (fixed on creation of thread pool) 
int m_numOfThreads; 
int m_NumOfThreadsDoingWork; 
int m_NumOfThreadsGivenJobs; 

// List of threads 
std::vector<tthread::thread *> m_ThreadList; 

// Condition variable to signal each thread has been created and executing 
tthread::mutex    m_ThreadReady_mutex; 
tthread::condition_variable m_ThreadReady_condvar; 

// Condition variable to signal each thread to start work 
tthread::mutex    m_WorkToDo_mutex; 
tthread::condition_variable m_WorkToDo_condvar; 

// Condition variable to signal the main thread that 
// all threads in the pool have completed their work 
tthread::mutex    m_WorkCompleted_mutex; 
tthread::condition_variable m_WorkCompleted_condvar; 
}; 

のcppファイル:

// 
// ThreadPool.cpp 
// 

#include "ThreadPool.h"  

// This is the thread function for each thread. 
// All threads remain in this function until 
// they are asked to quit, which only happens 
// when terminating the thread pool. 
void ThreadPool::ThreadFunc(int threadId) 
{ 
ThreadData *myThreadData = &m_ThreadData[threadId]; 
std::cout << "Hello world: Thread " << threadId << std::endl; 

// Signal that this thread is ready 
m_ThreadReady_mutex.lock(); 
     myThreadData->ready = true; 
     m_ThreadReady_condvar.notify_one(); // notify the main thread 
m_ThreadReady_mutex.unlock();  

while(true) 
{ 
    //tthread::lock_guard<tthread::mutex> guard(m); 
    m_WorkToDo_mutex.lock(); 

    while(!myThreadData->haveWorkToDo) // check for work to do 
     m_WorkToDo_condvar.wait(m_WorkToDo_mutex); // if no work, wait here 
    myThreadData->haveWorkToDo = false; // need to do this before unlocking the mutex 

    m_WorkToDo_mutex.unlock(); 

    // Do the work 
    switch(myThreadData->typeOfWork) 
    { 
     case e_work: 
      std::cout << "Thread " << threadId << ": Woken with work to do\n"; 

      // Do work 
      myThreadData->workFunc(myThreadData->workData); 

      std::cout << "#Thread " << threadId << ": Work is completed\n"; 
      break; 

     case e_quit: 
      std::cout << "Thread " << threadId << ": Asked to quit\n"; 
      return; // ends the thread 
    } 

    // Now to signal the main thread that my work is completed 
    m_WorkCompleted_mutex.lock(); 
     m_NumOfThreadsDoingWork--; 

     // Unsure if this 'if' would make the program more efficient 
     // if(m_NumOfThreadsDoingWork == 0) 
      m_WorkCompleted_condvar.notify_one(); // notify the main thread 
    m_WorkCompleted_mutex.unlock();  
    } 

} 


ThreadPool::ThreadPool() 
{ 
    m_numOfThreads = 0; m_NumOfThreadsDoingWork = 0; m_NumOfThreadsGivenJobs = 0; 
} 


ThreadPool::~ThreadPool() 
{ 
    if(m_numOfThreads) 
    { 
    DestroyThreadPool(); 
    delete [] m_ThreadData; 
    } 
} 


void ThreadPool::CreateThreads(int numOfThreads) 
{ 
// Check if a thread pool has already been created 
if(m_numOfThreads > 0) 
    return; 

m_NumOfThreadsGivenJobs = 0; 
m_NumOfThreadsDoingWork = 0; 
m_numOfThreads = numOfThreads; 
m_ThreadData = new ThreadData[m_numOfThreads]; 
ThreadArgStruct threadArg; 

for(int i=0; i<m_numOfThreads; ++i) 
{ 
    threadArg.threadId = i; 
    threadArg.threadPoolInstance = this; 

    // Creates the thread and saves it in a list so we can destroy it later 
    m_ThreadList.push_back(new tthread::thread(ThreadFuncWrapper, (void *)&threadArg )); 

    // It takes a little time for a thread to get established. 
    // Best wait until it gets established before creating the next thread. 
    m_ThreadReady_mutex.lock(); 
    while(!m_ThreadData[i].ready) // Check if thread is ready 
     m_ThreadReady_condvar.wait(m_ThreadReady_mutex); // If not, wait here 
    m_ThreadReady_mutex.unlock();  
} 
} 


// Assigns a job to a thread, but doesn't start the job 
void ThreadPool::SubmitJob(void (*workFunc)(void *), void *workData) 
{ 
// Check if the thread pool has been created 
if(!m_numOfThreads) 
    return; 

if(m_NumOfThreadsGivenJobs >= m_numOfThreads) 
    return; 

m_ThreadData[m_NumOfThreadsGivenJobs].workFunc = workFunc; 
m_ThreadData[m_NumOfThreadsGivenJobs].workData = workData; 

std::cout << "Submitted job " << m_NumOfThreadsGivenJobs << std::endl; 

m_NumOfThreadsGivenJobs++; 
} 

void ThreadPool::StartJobs() 
{ 
// Check that the thread pool has been created 
// and some jobs have been assigned 
if(!m_numOfThreads || !m_NumOfThreadsGivenJobs) 
    return; 

// Set 'haveworkToDo' flag for all threads 
m_WorkToDo_mutex.lock(); 
    for(int i=0; i<m_NumOfThreadsGivenJobs; ++i) 
    { 
     m_ThreadData[i].typeOfWork = e_work; // forgot to do this ! 
     m_ThreadData[i].haveWorkToDo = true; 
    } 
    m_NumOfThreadsDoingWork = m_NumOfThreadsGivenJobs; 

    // Reset this counter so we can resubmit jobs later 
    m_NumOfThreadsGivenJobs = 0; 

    // Notify all threads they have work to do 
    m_WorkToDo_condvar.notify_all(); 
    m_WorkToDo_mutex.unlock(); 
} 


void ThreadPool::WaitForJobsToComplete() 
{ 
    // Check that a thread pool has been created 
    if(!m_numOfThreads) 
    return; 

m_WorkCompleted_mutex.lock(); 
while(m_NumOfThreadsDoingWork > 0) // Check if all threads have completed their work 
    m_WorkCompleted_condvar.wait(m_WorkCompleted_mutex); // If not, wait here 
m_WorkCompleted_mutex.unlock();  
} 


void ThreadPool::DestroyThreadPool() 
{ 
std::cout << "Ask threads to quit\n"; 
m_WorkToDo_mutex.lock(); 
    for(int i=0; i<m_numOfThreads; ++i) 
    { 
    m_ThreadData[i].haveWorkToDo = true; 
    m_ThreadData[i].typeOfWork = e_quit; 
    } 
    m_WorkToDo_condvar.notify_all(); 
m_WorkToDo_mutex.unlock(); 

// As each thread terminates, catch them here 
for(int i=0; i<m_numOfThreads; ++i) 
{ 
    tthread::thread *t = m_ThreadList[i]; 

    // Wait for thread to complete 
    t->join(); 
} 
m_numOfThreads = 0; 
} 
私は最高の(Linux環境下で非常によく動作します)コードを提示した後に尋ねた質問を持っているミューテックスを持つように快適ではない、プログラミングのスレッドに新しいです、など

使用例: (これは平方の逆数を合計してπ二乗/ 6を計算します) 実際、この使用例では同じ計算を10回並列に実行します。より実用的な使用法は、各スレッドが合計されたタームの異なるセットを計算することです。プールジョブが完了すると、すべてのスレッド結果を追加して最終結果を取得します。

struct CalculationDataStruct 
{ 
int inputVal; 
double outputVal; 
}; 

void LongCalculation(void *theSums) 
{ 
CalculationDataStruct *sums = (CalculationDataStruct *)theSums; 

int terms = sums->inputVal; 
double sum; 
for(int i=1; i<terms; i++) 
    sum += 1.0/(double(i)*double(i)); 
sums->outputVal = sum; 
} 


int main(int argc, char** argv) 
{ 
int numThreads = 10; 

// Create pool 
ThreadPool threadPool; 
threadPool.CreateThreads(numThreads); 

// Create thread workspace 
CalculationDataStruct sums[numThreads]; 

// Set up jobs 
for(int i=0; i<numThreads; i++) 
{ 
    sums[i].inputVal = 3000*(i+1); 
    threadPool.SubmitJob(LongCalculation, &sums[i]); 
} 

// Run the jobs 
threadPool.StartJobs(); 
threadPool.WaitForJobsToComplete(); 

// Print results 
for(int i=0; i<numThreads; i++) 
    std::cout << "Sum of " << sums[i].inputVal << " terms is " << sums[i].outputVal << std::endl; 

return 0; 
} 

質問:ThreadPoolの:: ThreadFunc法で 、以下の場合には、より良い性能が得られるであろう声明

if(NumOfThreadsDoingWork == 0) 

が含まれていた場合はどうなりますか? また、コードを改善するための批判と方法に感謝したいと思います。同時に、コードが他の人にも役に立つことを願っています。

+0

単一のジョブが完了した後でメインスレッドが行うことがない限り、すべてのジョブが完了するまでは通知がありません。すべてのスレッドがすぐにスリープ状態に戻った場合は、メインスレッドをスリープ状態にします。つまり、スレッドプールにスレッド数が多い場合を除いて、オーバーヘッドが十分であるとは限りません。 –

+0

Thanks JF、それも私の考えでした。私は「if」文の有無にかかわらずコードを試してみましたが、パフォーマンスの違いは検出できませんでしたが、私は10スレッドしか使用していませんでした。 – ticketman

+0

Windowsデバッグビルドが正常に動作しなくなるバグを修正しました:m_ThreadData [i] .typeOfWork = e_work;関数StartJobs()のfor-loopに渡します。 – ticketman

答えて

1

まず、C++ 11の "std::thread"と "std :: mutex"を調べるとよいでしょう。インテルの "Threading Building Blocks"を調べて、ワーク配布用のパターンをいくつか調べることもできます。移植可能なクロスプラットフォームのC++カプセル化APIの場合、私は一般的にOpenThreads libraryを使用しました。最後に、ZeroMQのようなメッセージパッシングライブラリを使用して、ミューテックスなしでスケーラブルで分散した作業負荷を構築できます。

現在のコードを見ると、スレッドに作業を割り当てるために使用される変数をロックしているようには見えません。あなたがSubmitJobとStartWorkを分けたからです。

最終的に、あなたのThreadPoolはスレッドセーフではありません。

また、仕事の種類などの複雑なAPIもあります。おそらく、「仕事」の概念を抽象化する必要があります。ここで私がやった例があります。大部分のコードをThreadPoolクラスにカプセル化したいと思うでしょう。また、終了方法(NULLジョブ)はちょっと人工的です、あなたはおそらくpthread_cancelを使いたいでしょうが、これはかなりうまくこのデモンストレーションに役立ちました。

#include <queue> 
#include <pthread.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <unistd.h> 

static int jobNo = 0; 
class Job { 
public: 
    Job() : m_i(++jobNo) { printf("Created job %d.\n", m_i); } 
    int m_i; 
    void Execute() { printf("Job %d executing.\n", m_i); usleep(500 * 1000); } 
}; 

std::queue<Job*> queue; 
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; 

void AddJob(Job* job) { 
    pthread_mutex_lock(&mutex); 
    queue.push(job); 
    pthread_cond_signal(&cond); 
    pthread_mutex_unlock(&mutex); 
} 

void* QueueWorker(void* /*threadInfo*/) { 
    Job* job = NULL; 
    for (;;) { 
     pthread_mutex_lock(&mutex); 
     while (queue.empty()) { 
      // unlock the mutex until the cond is signal()d or broadcast() to. 
      // if this call succeeds, we will have the mutex locked again on the other side. 
      pthread_cond_wait(&cond, &mutex); 
     } 
     // take the first task and then release the lock. 
     job = queue.front(); 
     queue.pop(); 
     pthread_mutex_unlock(&mutex); 

     if (job == NULL) { 
      // in this demonstration, NULL ends the run, so forward to any other threads. 
      AddJob(NULL); 
      break; 
     } 
     job->Execute(); 
     delete job; 
    } 
    return NULL; 
} 

int main(int argc, const char* argv[]) { 
    pthread_t worker1, worker2; 
    pthread_create(&worker1, NULL, &QueueWorker, NULL); 
    pthread_create(&worker2, NULL, &QueueWorker, NULL); 

    srand(time(NULL)); 

    // queue 5 jobs with delays. 
    for (size_t i = 0; i < 5; ++i) { 
     long delay = (rand() % 800) * 1000; 
     printf("Producer sleeping %fs\n", (float)delay/(1000*1000)); 
     usleep(delay); 
     Job* job = new Job(); 
     AddJob(job); 
    } 
    // 5 more without delays. 
    for (size_t i = 0; i < 5; ++i) { 
     AddJob(new Job); 
    } 
    // null to end the run. 
    AddJob(NULL); 

    printf("Done with jobs.\n"); 
    pthread_join(worker1, NULL); 
    pthread_join(worker2, NULL); 

    return 0; 
} 
関連する問題