2017-01-28 3 views
0

マルチスレッドに慣れていて、タスクが残っているタスクの単純なスレッドセーフなキューを実装しようとしています。いずれのスレッドからもタスクのキューイングは行われません。C Pthreads - スレッドセーフなキューインプリメンテーションの問題

テスト目的のために、すべてのタスクは単なる数値を保持します。私はメインでこれをrunned簡単なテストのために

static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER; 

    typedef struct Task{ 
     int number; 
    }Task; 


    typedef struct Cell{ 
     Task t; 
     struct Cell* next; 
    }Cell; 


    typedef struct TQueue{ 
     struct Cell* head; 
     struct Cell* tail; 
    }TQueue; 



    int empty(TQueue *Queue) 
     return queue->head == queue->tail; 


    void startQueue(TQueue *queue){ 

     queue->head = malloc(sizeof(Cell)); 
     queue->tail = queue->head; 
    } 

    void enqueue(TQueue *queue, Task C){ 

     queue->tail->next = malloc(sizeof(Cell)); 
     queue->tail = queue->tail->next; 
     queue->tail->t = C; 
     queue->tail->next = NULL; 
    } 


    Task * dequeue(TQueue* queue){ 

     pthread_mutex_lock(&task_mutex); 
     Task * t; 

     if(empty(queue)) t = NULL; 

     else{ 

      struct Cell* p = queue->head; 
      queue->head = queue->head->next; 
      t = &queue->head->t; 
      free(p); 
     } 

     pthread_mutex_unlock(&task_mutex); 
     return t; 
    } 

    void * work(void* arg){ 

     TQueue* queue = (TQueue *)arg; 
     Task* t = malloc(sizeof(Task)); 

     for(t = dequeue(queue); t != NULL; t = dequeue(queue)) 
      printf("%d ", t->number); 

     free(t); 
     pthread_exit(NULL); 
     return 0; 
    } 

int main(){ 

    TQueue* queue = malloc(sizeof(TQueue)); 
    startQueue(queue); 

    pthread_t threads[3]; 
    Task t[3]; 


    for(int i = 0; i < 3; i++){ 
     t[i].number = i + 1; 
     enqueue(queue, t[i]); 
    } 

    for(int i = 0; i < 3; i++) pthread_create(&threads[i], NULL, work, (void*)queue); 

    for(int i = 0; i < 3; i++) pthread_join(threads[i], NULL); 

    return 0; 
} 

予想される出力は、任意の順序で1 2 3だったが、時にはそれが1823219 2 3のようなそれで奇妙な番号と列を出力します。私は競争条件や関連する問題を検出できませんでしたので、何か助けていただければ幸いです。

+0

'トン=&queue->頭部> tは、 free(p); '' p''は実際には 'queue-> head'です。だから 'free(p)'は解放されたメモリを 't'にして、未定義の振る舞いを引き起こします。 – kaylum

+0

上記のバグkaylumを修正した後、あなたの 'エンキュー'と 'デキュー'はアトミックかもしれません。'dequeue'は正しい順序で要素を取り除きます)、競合状態はあります。タスク1はデキュー1に、タスク2はデキュー2に、タスク2は_print_で始まるので、' 2 1'を得ます。なぜなら、印刷のための 'for 'はロックを持っていないからです。 –

+0

@kaylum queue-> headは、tが&queue-> head-> tに割り当てられる前に、queue-> head-> –

答えて

0

さらにいくつかのバグが見つかりました。

私はあなたのコードに注釈を付けました。あなたの最初の投稿と2番目の投稿からちょっとしたことがありました。私は前と後に[無償スタイルのクリーンアップをご容赦ください]を示す、コードを修正しました:

#include <stdio.h> 
#include <pthread.h> 
#include <malloc.h> 

static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER; 

typedef struct Task { 
    int number; 
} Task; 

typedef struct Cell { 
// NOTE/BUG: this should be a pointer to the task. otherwise, dequeue gets 
// messy 
#if 0 
    Task t; 
#else 
    Task *t; 
#endif 
    struct Cell *next; 
} Cell; 

typedef struct TQueue { 
    struct Cell *head; 
    struct Cell *tail; 
} TQueue; 

void 
startQueue(TQueue *queue) 
{ 

#if 0 
    queue->head = malloc(sizeof(Cell)); 
#else 
    queue->head = NULL; 
#endif 
    queue->tail = NULL; 
} 

int 
empty(TQueue *queue) 
{ 

    // NOTE/BUG: dequeue never touches tail, so this test is incorrect 
#if 0 
    return (queue->head == queue->tail); 
#else 
    return (queue->head == NULL); 
#endif 
} 

void 
enqueue(TQueue *queue, Task *t) 
{ 
    Cell *p; 

    pthread_mutex_lock(&task_mutex); 

    p = malloc(sizeof(Cell)); 
    p->next = NULL; 
    p->t = t; 

    if (queue->tail == NULL) { 
     queue->tail = p; 
     queue->head = p; 
    } 
    else { 
     queue->tail->next = p; 
     queue->tail = p; 
    } 

    pthread_mutex_unlock(&task_mutex); 
} 

Task * 
dequeue(TQueue *queue) 
{ 
    Task *t; 

    pthread_mutex_lock(&task_mutex); 

    if (empty(queue)) 
     t = NULL; 

    else { 
     Cell *p = queue->head; 

     if (p == queue->tail) 
      queue->tail = NULL; 

     queue->head = p->next; 

     // NOTE/BUG: this is setting t to the second element in the list, 
     // not the first 
     // NOTE/BUG: this is also undefined behavior, in original code (with 
     // original struct definition), because what t points to _does_ get 
     // freed before return 
#if 0 
     t = &queue->head->t; 
#else 
     t = p->t; 
#endif 

     free(p); 
    } 

    pthread_mutex_unlock(&task_mutex); 

    return t; 
} 

void * 
work(void *arg) 
{ 

    TQueue *queue = (TQueue *) arg; 

    // NOTE/BUG: this gets orphaned on the first call to dequeue 
#if 0 
    Task *t = malloc(sizeof(Task)); 
#else 
    Task *t; 
#endif 

    for (t = dequeue(queue); t != NULL; t = dequeue(queue)) 
     printf("%d ", t->number); 

    // NOTE/BUG: this frees some cell allocated in main -- not what we want 
#if 0 
    free(t); 
#endif 

    pthread_exit(NULL); 
    return 0; 
} 

// For a simple test i runned this on main: 

int 
main() 
{ 

    TQueue *queue = malloc(sizeof(TQueue)); 

    startQueue(queue); 

    pthread_t threads[3]; 
    Task t[3]; 

    for (int i = 0; i < 3; i++) { 
     t[i].number = i + 1; 
#if 0 
     enqueue(queue, t); 
#else 
     enqueue(queue, &t[i]); 
#endif 
    } 

    for (int i = 0; i < 3; i++) 
     pthread_create(&threads[i], NULL, work, (void *) queue); 

    for (int i = 0; i < 3; i++) 
     pthread_join(threads[i], NULL); 

    return 0; 
} 

UPDATE:

が同時にタスクを実行するスレッドはありますか?私はhtopでCPU使用量をテストしてきました.4つのコアの中から1つのコアの使用率を最大にすることができます。

いくつか注意してください。 htopは、このような短い実行時間を持つプログラムではあまり表示されません。 10,000個のキュー項目でも、このプログラムは20msで実行されます。

プログラム自体が情報を出力するようにしてください(下記参照)。 printfstdinのスレッドロックを行いますので、プログラムの「シリアル」な性質に寄与している可能性があります。また、また、一つのスレッド(つまり、最初の1)がキューを独占し、すべてエントリを排水でき

(すなわちprintfdequeueよりもはるかに遅いです)、プログラムの実行時間に大きな量の貢献しますの前に他は実行する機会があります。

OSは、単一コア上のすべてのスレッドをスケジュールすることができます。その後、後で(たとえば1秒以内に)「移行」することができます。

いくつかのタイミング情報を出力用の印刷物に含めるようにプログラムを拡張しました。これは、あなたが見たいものをさらに表示するのに役立ちます。また、スレッド数とキューに登録されているアイテム数を制御するコマンドラインオプションを追加しました。これは自分のプログラムのいくつかに似ています。プログラム出力をログファイルに振り分けて調べます。複数の実行のオプション

#include <stdio.h> 
#include <stdlib.h> 
#include <pthread.h> 
#include <malloc.h> 
#include <time.h> 

int opt_n;        // suppress thread output 
int opt_T;        // number of threads 
int opt_Q;        // number of queue items 

static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER; 

double tvzero; 

typedef struct Task { 
    int number; 
} Task; 

typedef struct Cell { 
    Task *t; 
    struct Cell *next; 
} Cell; 

typedef struct TQueue { 
    struct Cell *head; 
    struct Cell *tail; 
} TQueue; 

typedef struct Thread { 
    pthread_t tid; 
    int xid; 
    TQueue *queue; 
} Thread; 

double 
tvgetf(void) 
{ 
    struct timespec ts; 
    double sec; 

    clock_gettime(CLOCK_REALTIME,&ts); 
    sec = ts.tv_nsec; 
    sec /= 1e9; 
    sec += ts.tv_sec; 

    sec -= tvzero; 

    return sec; 
} 

void 
startQueue(TQueue *queue) 
{ 

    queue->head = NULL; 
    queue->tail = NULL; 
} 

int 
empty(TQueue *queue) 
{ 

    return (queue->head == NULL); 
} 

void 
enqueue(TQueue *queue, Task *t) 
{ 
    Cell *p; 

    pthread_mutex_lock(&task_mutex); 

    p = malloc(sizeof(Cell)); 
    p->next = NULL; 
    p->t = t; 

    if (queue->tail == NULL) { 
     queue->tail = p; 
     queue->head = p; 
    } 
    else { 
     queue->tail->next = p; 
     queue->tail = p; 
    } 

    pthread_mutex_unlock(&task_mutex); 
} 

Task * 
dequeue(TQueue *queue) 
{ 
    Task *t; 

    pthread_mutex_lock(&task_mutex); 

    if (empty(queue)) 
     t = NULL; 

    else { 
     Cell *p = queue->head; 

     if (p == queue->tail) 
      queue->tail = NULL; 

     queue->head = p->next; 

     t = p->t; 

     free(p); 
    } 

    pthread_mutex_unlock(&task_mutex); 

    return t; 
} 

void * 
work(void *arg) 
{ 
    Thread *tskcur = arg; 
    TQueue *queue = tskcur->queue; 
    Task *t; 
    double tvbef; 
    double tvaft; 

    while (1) { 
     tvbef = tvgetf(); 
     t = dequeue(queue); 
     tvaft = tvgetf(); 

     if (t == NULL) 
      break; 

     if (! opt_n) 
      printf("[%.9f/%.9f %5.5d] %d\n", 
       tvbef,tvaft - tvbef,tskcur->xid,t->number); 
    } 

    return (void *) 0; 
} 

// For a simple test i runned this on main: 

int 
main(int argc,char **argv) 
{ 
    char *cp; 
    TQueue *queue; 
    Task *t; 
    Thread *tsk; 

    --argc; 
    ++argv; 

    for (; argc > 0; --argc, ++argv) { 
     cp = *argv; 
     if (*cp != '-') 
      break; 

     switch (cp[1]) { 
     case 'n': // suppress thread output 
      opt_n = 1; 
      break; 

     case 'Q': // number of queue items 
      opt_Q = atoi(cp + 2); 
      break; 

     case 'T': // number of threads 
      opt_T = atoi(cp + 2); 
      break; 

     default: 
      break; 
     } 
    } 

    tvzero = tvgetf(); 

    queue = malloc(sizeof(TQueue)); 
    startQueue(queue); 

    if (opt_T == 0) 
     opt_T = 16; 
    Thread threads[opt_T]; 

    if (opt_Q == 0) 
     opt_Q = 10000; 
    t = malloc(sizeof(Task) * opt_Q); 

    for (int i = 0; i < opt_Q; i++) { 
     t[i].number = i + 1; 
     enqueue(queue, &t[i]); 
    } 

    for (int i = 0; i < opt_T; i++) { 
     tsk = &threads[i]; 
     tsk->xid = i + 1; 
     tsk->queue = queue; 
     pthread_create(&tsk->tid, NULL, work, tsk); 
    } 

    for (int i = 0; i < opt_T; i++) { 
     tsk = &threads[i]; 
     pthread_join(tsk->tid, NULL); 
    } 

    printf("TOTAL: %.9f\n",tvgetf()); 

    free(t); 

    return 0; 
} 

UPDATE#2で遊ん:

また、一つのスレッド(つまり、最初の1)がキューを独占し、前にすべてのエントリを排出でき他の人は走る機会があります」その場合は何ができるのですか?

いくつか。

pthread_createは少し時間がかかり、スレッド1は他のスレッドがまだ作成されている間に移動できます。これを改善する方法は、すべてのスレッドを作成することです。各スレッドは(スレッド制御ブロック内に)「実行中」フラグを設定します。メインスレッドは、すべてのスレッドがこのフラグを設定するのを待ちます。次に、メインスレッドは、各スレッドがプライマリスレッドループに入る前にスピンするグローバルな揮発性の「you_may_now_all_run」フラグを設定します。私の経験では、彼らはお互いのマイクロ秒以内に動作し始めます。

私はこれを下の更新されたコードに実装していませんでしたので、あなた自身で実験することができます[nanosleep]。

ミューテックスは、ブロックされたスレッドがキューに入れられ、ミューテックスを待っているため、かなり全体的に[少なくともLinuxの下で]公平です。コメントで言及したように、nanosleepも使用できますが、スレッドが遅くなるので、これはいくらか目的を破ります。

スレッドスターベーションに対する解毒剤は「公平」です。私が言及したように、待たずに公正さのための精巧なアルゴリズムがあります。これは、コーガン/ Petrankアルゴリズムです:http://www.cs.technion.ac.il/~erez/Papers/wf-methodology-ppopp12.pdfこれはとても買主の危険負担が...

しかし、妥協はチケットロックすることができ、本当に少し高度/関係です:https://en.wikipedia.org/wiki/Ticket_lock

私はプログラムを再度作り直しました。プールされた割り当て、チケットとmutexのロック、およびログエントリの遅延印刷のオプションがあります。また、スレッド間で結果をクロスチェックして、重複エントリがないことを確認します。

もちろん、このすべての鍵は正確で高精度なログです(測定できない場合は調整できません)。

は例えば、1は、パフォーマンスの向上が期待されるほど大きくはなかった、dequeuefreeを行うことは、単純に(スラブアロケータと同様に)resuableプールにセルを解放するよりも遅くなるだろうと思うだろうが。これは、glibcのmalloc/freeがただ驚異的な速さであるということでしょう[これは彼らだ請求]。

これらのさまざまなバージョンでは、独自のパフォーマンス測定スイートを構築する方法のアイデアが提供されるはずです。

とにかく、ここのコードです:

#include <stdio.h> 
#include <stdlib.h> 
#include <pthread.h> 
#include <stdatomic.h> 
#include <malloc.h> 
#include <errno.h> 
#include <string.h> 
#include <time.h> 

int opt_p;        // print thread output immediately 
int opt_T;        // number of threads 
int opt_Q;        // number of queue items 
int opt_L;        // use ticket lock 
int opt_M;        // use fast cell alloc/free 

typedef unsigned char byte; 
typedef unsigned int u32; 

#define sysfault(_fmt...) \ 
    do { \ 
     fprintf(stderr,_fmt); \ 
     exit(1); \ 
    } while (0) 

// lock control 
typedef struct AnyLock { 
    pthread_mutex_t mutex;    // standard mutex 
    volatile u32 seqreq;    // ticket lock request 
    volatile u32 seqacq;    // ticket lock grant 
} AnyLock; 

// work value 
typedef struct Task { 
    union { 
     struct Task *next; 
     int number; 
    }; 
} Task; 

// queue item 
typedef struct Cell { 
    struct Cell *next; 
    Task *t; 
} Cell; 

// queue control 
typedef struct TQueue { 
    struct Cell *head; 
    struct Cell *tail; 
} TQueue; 

// thread log entry 
typedef struct Log { 
    double tvbef; 
    double tvaft; 
    int number; 
} Log; 

#define BTVOFF(_off) \ 
    ((_off) >> 3) 
#define BTVMSK(_off) \ 
    (1u << ((_off) & 0x07)) 

#define BTVLEN(_len) \ 
    ((_len) + 7) >> 3 

// thread control 
typedef struct Thread { 
    pthread_t tid; 
    int xid; 
    TQueue *queue; 
    Log *log; 
    byte *bitv; 
} Thread; 

static inline byte 
btvset(byte *bitv,long off) 
{ 
    u32 msk; 
    byte oval; 

    bitv += BTVOFF(off); 
    msk = BTVMSK(off); 

    oval = *bitv & msk; 

    *bitv |= msk; 

    return oval; 
} 

AnyLock task_mutex; 
AnyLock print_mutex; 
double tvzero; 
Cell *cellpool;       // free pool of cells 
long bitvlen; 

#define BARRIER \ 
    __asm__ __volatile__("" ::: "memory") 

// virtual function pointers 
Cell *(*cellnew)(void); 
void (*cellfree)(Cell *); 
void (*lock_acquire)(AnyLock *lock); 
void (*lock_release)(AnyLock *lock); 

double 
tvgetf(void) 
{ 
    struct timespec ts; 
    double sec; 

    clock_gettime(CLOCK_REALTIME,&ts); 
    sec = ts.tv_nsec; 
    sec /= 1e9; 
    sec += ts.tv_sec; 

    sec -= tvzero; 

    return sec; 
} 

void * 
xalloc(size_t cnt,size_t siz) 
{ 
    void *ptr; 

    ptr = calloc(cnt,siz); 
    if (ptr == NULL) 
     sysfault("xalloc: calloc failure -- %s\n",strerror(errno)); 

    return ptr; 
} 

void 
lock_wait_ticket(AnyLock *lock,u32 newval) 
{ 
    u32 oldval; 

    // wait for our ticket to come up 
    // NOTE: atomic_load is [probably] overkill here 
    while (1) { 
#if 0 
     oldval = atomic_load(&lock->seqacq); 
#else 
     oldval = lock->seqacq; 
#endif 
     if (oldval == newval) 
      break; 
    } 
} 

void 
lock_acquire_ticket(AnyLock *lock) 
{ 
    u32 oldval; 
    u32 newval; 
    int ok; 

    // acquire our ticket value 
    // NOTE: just use a garbage value for oldval -- the exchange will 
    // update it with the correct/latest value -- this saves a separate 
    // refetch within the loop 
    oldval = 0; 
    while (1) { 
#if 0 
     BARRIER; 
     oldval = lock->seqreq; 
#endif 
     newval = oldval + 1; 
     ok = atomic_compare_exchange_strong(&lock->seqreq,&oldval,newval); 
     if (ok) 
      break; 
    } 

    lock_wait_ticket(lock,newval); 
} 

void 
lock_release_ticket(AnyLock *lock) 
{ 

    // NOTE: atomic_fetch_add is [probably] overkill, but leave it for now 
#if 1 
    atomic_fetch_add(&lock->seqacq,1); 
#else 
    lock->seqacq += 1; 
#endif 
} 

void 
lock_acquire_mutex(AnyLock *lock) 
{ 

    pthread_mutex_lock(&lock->mutex); 
} 

void 
lock_release_mutex(AnyLock *lock) 
{ 

    pthread_mutex_unlock(&lock->mutex); 
} 

void 
lock_init(AnyLock *lock) 
{ 

    switch (opt_L) { 
    case 1: 
     lock->seqreq = 0; 
     lock->seqacq = 1; 
     lock_acquire = lock_acquire_ticket; 
     lock_release = lock_release_ticket; 
     break; 

    default: 
     pthread_mutex_init(&lock->mutex,NULL); 
     lock_acquire = lock_acquire_mutex; 
     lock_release = lock_release_mutex; 
     break; 
    } 
} 

void 
startQueue(TQueue *queue) 
{ 

    queue->head = NULL; 
    queue->tail = NULL; 
} 

int 
empty(TQueue *queue) 
{ 

    return (queue->head == NULL); 
} 

// cellnew_pool -- allocate a queue entry 
Cell * 
cellnew_pool(void) 
{ 
    int cnt; 
    Cell *p; 
    Cell *pool; 

    while (1) { 
     // try for quick allocation 
     p = cellpool; 

     // bug out if we got it 
     if (p != NULL) { 
      cellpool = p->next; 
      break; 
     } 

     // go to the heap to replenish the pool 
     cnt = 1000; 
     p = xalloc(cnt,sizeof(Cell)); 

     // link up the entries 
     pool = NULL; 
     for (; cnt > 0; --cnt, ++p) { 
      p->next = pool; 
      pool = p; 
     } 

     // put this "online" 
     cellpool = pool; 
    } 

    return p; 
} 

// cellfree_pool -- release a queue entry 
void 
cellfree_pool(Cell *p) 
{ 

    p->next = cellpool; 
    cellpool = p; 
} 

// cellnew_std -- allocate a queue entry 
Cell * 
cellnew_std(void) 
{ 
    Cell *p; 

    p = xalloc(1,sizeof(Cell)); 

    return p; 
} 

// cellfree_std -- release a queue entry 
void 
cellfree_std(Cell *p) 
{ 

    free(p); 
} 

void 
enqueue(TQueue *queue, Task *t) 
{ 
    Cell *p; 

    lock_acquire(&task_mutex); 

    p = cellnew(); 
    p->next = NULL; 
    p->t = t; 

    if (queue->tail == NULL) { 
     queue->tail = p; 
     queue->head = p; 
    } 
    else { 
     queue->tail->next = p; 
     queue->tail = p; 
    } 

    lock_release(&task_mutex); 
} 

Task * 
dequeue(TQueue *queue) 
{ 
    Task *t; 

    lock_acquire(&task_mutex); 

    if (empty(queue)) 
     t = NULL; 

    else { 
     Cell *p = queue->head; 

     if (p == queue->tail) 
      queue->tail = NULL; 

     queue->head = p->next; 

     t = p->t; 

     cellfree(p); 
    } 

    lock_release(&task_mutex); 

    return t; 
} 

void * 
work(void *arg) 
{ 
    Thread *tskcur = arg; 
    TQueue *queue = tskcur->queue; 
    Task *t; 
    Log *log; 
    long cnt; 
    int tprev; 
    byte *bitv; 
    double tvbeg; 
    double tvbef; 
    double tvaft; 

    log = tskcur->log; 
    bitv = tskcur->bitv; 
    tvbeg = tvgetf(); 

    tprev = 0; 
    while (1) { 
     tvbef = tvgetf(); 
     t = dequeue(queue); 
     tvaft = tvgetf(); 

     if (t == NULL) 
      break; 

     // abort if we get a double entry 
     if (btvset(bitv,t->number)) 
      sysfault("work: duplicate\n"); 

     if (opt_p) { 
      printf("[%.9f/%.9f %5.5d] %d [%d]\n", 
       tvbef,tvaft - tvbef,tskcur->xid,t->number,t->number - tprev); 
      tprev = t->number; 
      continue; 
     } 

     log->tvbef = tvbef; 
     log->tvaft = tvaft; 
     log->number = t->number; 
     ++log; 
    } 

    if (! opt_p) { 
     tvaft = tvgetf(); 

     cnt = log - tskcur->log; 
     log = tskcur->log; 

     lock_acquire(&print_mutex); 

     printf("\n"); 
     printf("THREAD=%5.5d START=%.9f STOP=%.9f ELAP=%.9f TOTAL=%ld\n", 
      tskcur->xid,tvbeg,tvaft,tvaft - tvbeg,cnt); 

     tprev = 0; 
     for (; cnt > 0; --cnt, ++log) { 
      printf("[%.9f/%.9f %5.5d] %d [%d]\n", 
       log->tvbef,log->tvaft - log->tvbef,tskcur->xid, 
       log->number,log->number - tprev); 
      tprev = log->number; 
     } 

     lock_release(&print_mutex); 
    } 

    return (void *) 0; 
} 

void 
btvchk(Thread *tska,Thread *tskb) 
{ 
    byte *btva; 
    byte *btvb; 
    byte aval; 
    byte bval; 
    int idx; 

    printf("btvchk: %d ??? %d\n",tska->xid,tskb->xid); 

    btva = tska->bitv; 
    btvb = tskb->bitv; 

    // abort if we get overlapping entries between two threads 
    for (idx = 0; idx < bitvlen; ++idx) { 
     aval = btva[idx]; 
     bval = btvb[idx]; 
     if (aval & bval) 
      sysfault("btvchk: duplicate\n"); 
    } 
} 

// For a simple test i runned this on main: 

int 
main(int argc,char **argv) 
{ 
    char *cp; 
    TQueue *queue; 
    Task *t; 
    Thread *tsk; 

    --argc; 
    ++argv; 

    for (; argc > 0; --argc, ++argv) { 
     cp = *argv; 
     if (*cp != '-') 
      break; 

     switch (cp[1]) { 
     case 'p': // print immediately 
      opt_p = 1; 
      break; 

     case 'Q': // number of queue items 
      opt_Q = atoi(cp + 2); 
      break; 

     case 'T': // number of threads 
      opt_T = atoi(cp + 2); 
      break; 

     case 'L': 
      opt_L = 1; 
      break; 

     case 'M': 
      opt_M = 1; 
      break; 

     default: 
      break; 
     } 
    } 

    printf("p=%d -- thread log is %s\n",opt_p,opt_p ? "immediate" : "deferred"); 

    if (opt_T == 0) 
     opt_T = 16; 
    printf("T=%d (number of threads)\n",opt_T); 

    if (opt_Q == 0) 
     opt_Q = 1000000; 
    printf("Q=%d (number of items to enqueue)\n",opt_Q); 

    printf("L=%d -- lock is %s\n",opt_L,opt_L ? "ticket" : "mutex"); 
    printf("M=%d -- queue item allocation is %s\n", 
     opt_M,opt_M ? "pooled" : "malloc/free"); 

    tvzero = tvgetf(); 

    lock_init(&task_mutex); 
    lock_init(&print_mutex); 

    // select queue item allocation strategy 
    switch (opt_M) { 
    case 1: 
     cellnew = cellnew_pool; 
     cellfree = cellfree_pool; 
     break; 

    default: 
     cellnew = cellnew_std; 
     cellfree = cellfree_std; 
     break; 
    } 

    queue = xalloc(1,sizeof(TQueue)); 
    startQueue(queue); 

    Thread threads[opt_T]; 

    // get byte length of bit vectors 
    bitvlen = BTVLEN(opt_Q + 1); 

    // allocate per-thread log buffers 
    for (int i = 0; i < opt_T; i++) { 
     tsk = &threads[i]; 
     if (! opt_p) 
      tsk->log = xalloc(opt_Q,sizeof(Log)); 
     tsk->bitv = xalloc(bitvlen,sizeof(byte)); 
    } 

    // allocate "work to do" 
    t = xalloc(opt_Q,sizeof(Task)); 

    // add to master queue 
    for (int i = 0; i < opt_Q; i++) { 
     t[i].number = i + 1; 
     enqueue(queue, &t[i]); 
    } 

    // fire up the threads 
    for (int i = 0; i < opt_T; i++) { 
     tsk = &threads[i]; 
     tsk->xid = i + 1; 
     tsk->queue = queue; 
     pthread_create(&tsk->tid, NULL, work, tsk); 
    } 

    // wait for threads to complete 
    for (int i = 0; i < opt_T; i++) { 
     tsk = &threads[i]; 
     pthread_join(tsk->tid, NULL); 
    } 

    // wait for threads to complete 
    for (int i = 0; i < opt_T; i++) { 
     for (int j = i + 1; j < opt_T; j++) 
      btvchk(&threads[i],&threads[j]); 
    } 

    printf("TOTAL: %.9f\n",tvgetf()); 

    free(t); 

    return 0; 
} 
+0

ありがとうございました!もう1つの質問:タスクを同時に実行しているスレッドですか?私はhtopでCPU使用量をテストしてきました.4つのコアの中から1つのコアの使用率を最大にすることができます。 –

+0

とてもいいです!再度、感謝します。 "あなたが言ったように:"また、あるスレッド(つまり、最初のスレッド)はキューを独占し、他のスレッドが実行する前にすべてのエントリを排除することができます。その場合、何ができるでしょうか? –

+0

その場合は「技術的に」「スレッドスターベーション」と呼ばれます。簡単な修正は、作業スレッドのループの最下部に小さな 'nanosleep'呼び出しを追加することです。プログラムオプションの仕方を理解しているなら、 '-S'オプションと呼び出しを追加してみてください。一般的なソリューションでは、公平性を保証する_elaborate_アルゴリズムがあります。私が「改造した」もう一つの単純なものがあります。これは「チケット」ロックに基づいています。これは「十分に良い」かもしれません。これらはmutexを 'stdatomic.h'の' atomic_compare_exchange * 'に置き換えます。私はこの上にいくつかのURLを掘り下げ、後で私の答えを更新する –

関連する問題