2016-12-13 4 views
0

私は最近、有界ロックフリーキューを作成し、いくつかのテストを行っていました。テストでは、いくつかのスレッドは素数を生成します(いくつかの数から始まり、プロデューサスレッドの数の6倍をカウントし、Deterministic Miller Rabinテストを使用してすべての数値をチェックし、素数をキューに挿入します)素数を消費する(キューから要素を削除し、素数であるかどうかをチェックする)。プロデューサスレッドは、各ペアに1つずつ、1つのmod 6に等しい素数を生成し、もう1つは5 mod 6に等しい(2、3、または4 mod 6に等しいすべての数は、2および3)では、メインスレッドは2と3を生成します。生成されていないスレッドの数はグローバルカウンタがあります。プロデューサスレッドまたはメインが素数を生成するたびに、このカウンタが原子的に減少します。消費者スレッドはループしていないときにループする。プロデューサスレッドが完了する前にコンシューマスレッドが停止するのはなぜですか?

実際には素数がキューを通過するかどうかを判断するために、すべてのスレッドが生成して消費する素数の0番目から3番目の瞬間を計算し、プロデューサスレッドのモーメントの合計は、コンシューマスレッドのモーメントの合計に等しい。 n番目の瞬間はn乗の和です。したがって、これは素数の数、それらの和、それらの平方和、それらの立方体の合計、すべての一致を意味します。シーケンスが互いの順列である場合、すべての瞬間が一致するので、最初のnをチェックして、長さnのシーケンスが実際に順列であることを確認する必要がありますが、最初の4つのマッチングは、一致しないシーケンスの確率が非常に小さいことを意味します。

私のロックフリーキューは実際には動作しますが、何らかの理由でコンシューマスレッドがキューにまだ要素がある間にすべて停止します。プロデューサスレッドは、すべての素数をキューに挿入した後、プロデューサスレッドが生成カウンタを減分するだけで、生成するすべてのスレッドが減少した後は、生成カウンタが0にしかならないため、理由はわかりません。したがって、生成カウンタが0のときは常に、すべての要素がキューに挿入されています。しかし、消費者が要素を削除しようとすると、queue.full(キュー内の要素の数)が0の場合にremoveが失敗するだけで成功するはずです。したがって、生成カウンタが0の場合、消費者は正常に消費することができます。 queue.fullは0であり、生成カウンタをチェックしてキューが使い果たされるまで戻るべきではありません。削除が失敗した場合(消費者がプロデューサよりも速く、キューを空にしている場合)にのみ、生成カウンタをチェックします。

しかし、whileループを作成すると、生成カウンタに加えてcheck queue.fullが削除されるため、消費者は早期に返されません。私はそれだけで動作します

__atomic_load_n(&producing, __ATOMIC_SEQ_CST) || __atomic_load_n(&Q.full, __ATOMIC_SEQ_CST) 

__atomic_load_n(&producing, __ATOMIC_SEQ_CST) 

を変更する場合には、あります。私のコードでは、属性、__atomic組み込み関数、__auto_type、文式、128ビット整数、__builtin_ctzll、 '\ e'などのgcc拡張機能、指定された初期化子や複合リテラルなどのC99機能、pthreadsなどのgcc拡張機能を使用しています。私はまた、私がこの問題を抱えている間に問題が発生しないようにするために、弱いバージョンが動作するはずですが、シーケンシャルな一貫性のあるメモリ順序と強力な比較とスワップを使用しています。ここでは、ヘッダqueue.hは次のとおりです。ここで

#ifndef __QUEUE_H__ 
#define __QUEUE_H__ 

#include <stddef.h> 
#include <inttypes.h> 

typedef struct __attribute__((__designated_init__)){//using positional initializers for a struct is terrible 
    void *buf; 
    uint8_t *flags;//insert started, insert complete, remove started 
    size_t cap, full; 
    uint64_t a, b; 
} queue_t; 

typedef struct __attribute__((__designated_init__)){ 
    size_t size; 
} queue_ft;//this struct serves as a class for queue objects: any data specific to the object goes in the queue_t struct and any shared data goes here 

int queue_insert(queue_t*, const queue_ft*, void *elem); 

int queue_remove(queue_t*, const queue_ft*, void *out); 

int queue_init(queue_t*, const queue_ft*, size_t reserve); 

void queue_destroy(queue_t*, const queue_ft*); 

#endif 

は、ライブラリのソースqueue.cです:

#include <stdio.h> 
#include <stdlib.h> 
#include <string.h> 
#include <inttypes.h> 
#include "queue.h" 

int queue_insert(queue_t *self, const queue_ft *ft, void *elem){ 
    uint64_t i; 
    while(1){ 
     uint8_t flag = 0; 
     if(__atomic_load_n(&self->full, __ATOMIC_SEQ_CST) == self->cap){ 
      return 0; 
     } 
     i = __atomic_load_n(&self->b, __ATOMIC_SEQ_CST); 
     if(__atomic_compare_exchange_n(self->flags + i, &flag, 0x80, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)){//set the insert started flag if all flags are clear 
      break; 
     } 
    } 
    __atomic_fetch_add(&self->full, 1, __ATOMIC_SEQ_CST); 
    uint64_t b = i; 
    while(!__atomic_compare_exchange_n(&self->b, &b, (b + 1)%self->cap, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));//increase the b endpoint of the queue with wraparaound 
    memcpy(self->buf + i*ft->size, elem, ft->size);//actually insert the item. accesses to the buffer mirror accesses to the flags so this is safe 
    __atomic_thread_fence(memory_order_seq_cst); 
    __atomic_store_n(self->flags + i, 0xc0, __ATOMIC_SEQ_CST);//set the insert completed flag 
    return 1; 
} 

int queue_remove(queue_t *self, const queue_ft *ft, void *out){ 
    uint64_t i; 
    while(1){ 
     uint8_t flag = 0xc0; 
     if(!__atomic_load_n(&self->full, __ATOMIC_SEQ_CST)){ 
      return 0; 
     } 
     i = __atomic_load_n(&self->a, __ATOMIC_SEQ_CST); 
     if(__atomic_compare_exchange_n(self->flags + i, &flag, 0xe0, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)){//set the remove started flag if insert started and insert completed are set but the other flags are clear 
      break; 
     } 
    } 
    __atomic_fetch_sub(&self->full, 1, __ATOMIC_SEQ_CST); 
    uint64_t a = i; 
    while(!__atomic_compare_exchange_n(&self->a, &a, (a + 1)%self->cap, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));//increase the a endpoint of the queue with wraparound 
    memcpy(out, self->buf + i*ft->size, ft->size);//actually remove the item. 
    __atomic_thread_fence(__ATOMIC_SEQ_CST); 
    __atomic_store_n(self->flags + i, 0x00, __ATOMIC_SEQ_CST);//clear all the flags to mark the remove as completed 
    return 1; 
} 

int queue_init(queue_t *self, const queue_ft *ft, size_t reserve){ 
    void *buf = malloc(reserve*ft->size); 
    if(!buf){ 
     return 0; 
    } 
    uint8_t *flags = calloc(reserve, sizeof(uint8_t)); 
    if(!flags){ 
     free(buf); 
     return 0; 
    } 
    *self = (queue_t){ 
     .buf=buf, 
     .flags=flags, 
     .cap=reserve, 
     .full=0, 
     .a=0,.b=0 
    }; 
    return 1; 
} 

void queue_destroy(queue_t *self, const queue_ft *ft){ 
    free(self->buf); 
    free(self->flags); 
} 

そしてここでは、テストプログラムのソースtest_queue_pcです。C:消費者のスレッドがキューが空になる前に、ちょうどproducingのときにループ、プロデューサーが行われるため、彼らは待っていても返しますが、なぜ

#define _POSIX_C_SOURCE 201612UL 

#include <stdio.h> 
#include <stdlib.h> 
#include <string.h> 
#include <inttypes.h> 
#include <pthread.h> 
#include <math.h> 
#include <time.h> 
#include "queue.h" 

//Generate primes up to this number. Note 78498 is the number of primes below 1000000; this is hard coded because the queue does not support growing yet. 
#define MAX 1000000 
#define QUEUE_SIZE 78498 
#define NUM_PRODUCER_PAIRS 3 
#define NUM_CONSUMERS 2 
//Every producer and consumer thread calculates the 0th through 3rd moments of the sequence of primes it sees, as well as testing them for primality. 
//The nth moment is the sum of the nth powers, thus, the order does not matter and if the primes are the same in both the producers and the consumers 
//then the sums of the moments will also be the same. I check that the 0th through 3rd moments match which means it is nearly certain the primes go through 
//the queue. 
#define NUM_MOMENTS 4 

//Deterministic Miller Rabin witnesses (see https://en.wikipedia.org/wiki/Miller–Rabin_primality_test) 
#define DMR_PRIMES (uint64_t[]){2, 13, 23, 1662803} 
#define DMR_PRIMES_C 4 

//Macro to split an integer into three parts. The first part has the 2**0, 2**3, 2**6, ..., 2**60 bits of the original and 0 elsewhere. 
//The second part has the 2**1, 2**4, 2**7, ..., 2**61 bits of the original and 0 elsewhere. The last part has the 2**2, ..., 2**62 bits. 
//The 2**63 bit is lost. The masks represent the sums of geometric sequences. The original number can be obtained by bitwise or or xor on the parts. 
//I spread the uint64_t's (which are unsigned long longs) over 3 uint64_t's so that they take up 24 bytes and memcpy'ing them happens in multiple steps. 
//This spreading is only done on primes that have been produced before they are put into the queue. The consumers then recombine and verify them. 
#define SPREAD_EMPLACE(n) ({__auto_type _n = (n); &(spread_integer){(_n)&(((1ULL<<60)-1)/7), (_n)&(((1ULL<<61)-2)/7), (_n)&(((1ULL<<62)-4)/7)};}) 

typedef struct{ 
    uint64_t x, y, z; 
} spread_integer; 

queue_ft spread_integer_ft = {.size= sizeof(spread_integer)}; 

queue_t Q; 
//Start producing count at 1 + (NUM_PRODUCING_THREADS << 1) because main generates 2 and 3 and reduce it by 1 every time a producer thread finishes 
int producing = 1 + (NUM_PRODUCER_PAIRS << 1); 

//Uses the binary algorithm for modular exponentiation (https://en.wikipedia.org/wiki/Exponentiation_by_squaring) 
//It is a helper function for isPrime 
uint64_t powmod(unsigned __int128 b, uint64_t e, uint64_t n){ 
    unsigned __int128 r = 1; 
    b %= n; 
    while(e){ 
     if(e&1){ 
      r = r*b%n; 
     } 
     e >>= 1; 
     b = b*b%n; 
    } 
    return (uint64_t)r; 
} 

//uses deterministic Miller Rabin primality test 
int isPrime(uint64_t n){ 
    uint64_t s, d;//s, d | 2^s*d = n - 1 
    if(n%2 == 0){ 
     return n == 2; 
    } 
    --n; 
    s = __builtin_ctzll(n); 
    d = n>>s; 
    ++n; 
    for(uint64_t i = 0, a, x; i < DMR_PRIMES_C; ++i){ 
     a = DMR_PRIMES[i]; 
     if(a >= n){ 
      break; 
     } 
     x = powmod(a, d, n); 
     if(x == 1 || x == n - 1){ 
      goto CONTINUE_WITNESSLOOP; 
     } 
     for(a = 0; a < s - 1; ++a){ 
      x = powmod(x, 2, n); 
      if(x == 1){ 
       return 0; 
      } 
      if(x == n - 1){ 
       goto CONTINUE_WITNESSLOOP; 
      } 
     } 
     return 0; 
     CONTINUE_WITNESSLOOP:; 
    } 
    return 1; 
} 

void *produce(void *_moments){ 
    uint64_t *moments = _moments, n = *moments;//the output argument for the 0th moment serves as the input argument for the number to start checking for primes at 
    *moments = 0; 
    for(; n < MAX; n += 6*NUM_PRODUCER_PAIRS){//the producers are paired so one of every pair generates primes equal to 1 mod 6 and the other equal to 5 mod 6. main generates 2 and 3 the only exceptions 
     if(isPrime(n)){ 
      for(uint64_t m = 1, i = 0; i < NUM_MOMENTS; m *= n, ++i){ 
       moments[i] += m; 
      } 
      if(!queue_insert(&Q, &spread_integer_ft, SPREAD_EMPLACE(n))){ 
       fprintf(stderr, "\e[1;31mERROR: Could not insert into queue.\e[0m\n"); 
       exit(EXIT_FAILURE); 
      } 
     } 
    } 
    __atomic_fetch_sub(&producing, 1, __ATOMIC_SEQ_CST);//this thread is done generating primes; reduce producing counter by 1 
    return moments; 
} 

void *consume(void *_moments){ 
    uint64_t *moments = _moments; 
    while(__atomic_load_n(&producing, __ATOMIC_SEQ_CST) || __atomic_load_n(&Q.full, __ATOMIC_SEQ_CST)){//busy loop while some threads are producing 
     spread_integer xyz; 
     if(queue_remove(&Q, &spread_integer_ft, &xyz)){ 
      uint64_t n = xyz.x | xyz.y | xyz.z; 
      if(isPrime(n)){ 
       for(uint64_t m = 1, i = 0; i < NUM_MOMENTS; m *= n, ++i){ 
        moments[i] += m; 
       } 
      }else{ 
       fprintf(stderr, "\e[1;31mERROR: Generated a prime that fails deterministic Miller Rabin.\e[0m\n"); 
       exit(EXIT_FAILURE); 
      } 
     } 
    } 
    return moments; 
} 

int main(void){ 
    if(!queue_init(&Q, &spread_integer_ft, QUEUE_SIZE)){ 
     fprintf(stderr, "\e[1;31mERROR: Could not initialize queue.\e[0m\n"); 
     exit(EXIT_FAILURE); 
    } 
    pthread_t producers[NUM_PRODUCER_PAIRS << 1], consumers[NUM_CONSUMERS]; 
    uint64_t moments[(NUM_PRODUCER_PAIRS << 1) + 1 + NUM_CONSUMERS + 1][NUM_MOMENTS] = {};//the 2 extras are because main produces the primes 2 and 3 and consumes primes the consumers leave behind 
    for(size_t i = 0; i < NUM_CONSUMERS; ++i){//create consumers first to increase likelihood of causing bugs 
     if(pthread_create(consumers + i, NULL, consume, moments[(NUM_PRODUCER_PAIRS << 1) + 1 + i])){ 
      fprintf(stderr, "\e[1;31mERROR: Could not create consumer thread.\e[0m\n"); 
      exit(EXIT_FAILURE); 
     } 
    } 
    for(size_t i = 0; i < NUM_PRODUCER_PAIRS; ++i){ 
     moments[i << 1][0] = 5 + 6*i; 
     if(pthread_create(producers + (i << 1), NULL, produce, moments[i << 1])){ 
      fprintf(stderr, "\e[1;31mERROR: Could not create producer thread.\e[0m\n"); 
      exit(EXIT_FAILURE); 
     } 
     moments[(i << 1) + 1][0] = 7 + 6*i; 
     if(pthread_create(producers + (i << 1) + 1, NULL, produce, moments[(i << 1) + 1])){ 
      fprintf(stderr, "\e[1;31mERROR: Could not create producer thread.\e[0m\n"); 
      exit(EXIT_FAILURE); 
     } 
    } 
    for(uint64_t n = 2; n < 4; ++n){ 
     for(uint64_t m = 1, i = 0; i < NUM_MOMENTS; m *= n, ++i){ 
      moments[NUM_PRODUCER_PAIRS << 1][i] += m; 
     } 
     if(!queue_insert(&Q, &spread_integer_ft, SPREAD_EMPLACE(n))){ 
      fprintf(stderr, "\e[1;31mERROR: Could not insert into queue.\e[0m\n"); 
      exit(EXIT_FAILURE); 
     } 
    } 
    __atomic_fetch_sub(&producing, 1, __ATOMIC_SEQ_CST); 
    uint64_t c = 0; 
    for(size_t i = 0; i < NUM_CONSUMERS; ++i){//join consumers first to bait bugs. Note consumers should not finish until the producing counter reaches 0 
     void *_c; 
     if(pthread_join(consumers[i], &_c)){ 
      fprintf(stderr, "\e[1;31mERROR: Could not join consumer thread.\e[0m\n"); 
      exit(EXIT_FAILURE); 
     } 
     c += (uintptr_t)_c; 
    } 
    for(size_t i = 0; i < NUM_PRODUCER_PAIRS << 1; ++i){ 
     if(pthread_join(producers[i], NULL)){ 
      fprintf(stderr, "\e[1;31mERROR: Could not join producer thread.\e[0m\n"); 
      exit(EXIT_FAILURE); 
     } 
    } 
    //this really should not be happening because the consumer threads only return after the producing counter reaches 0, 
    //which only happens after all of the producer threads are done inserting items into the queue. 
    if(Q.full){ 
     fprintf(stdout, "\e[1;31mWTF: Q.full != 0\nproducing == %d\e[0m\n", producing); 
    } 
    while(Q.full){ 
     spread_integer xyz; 
     if(!queue_remove(&Q, &spread_integer_ft, &xyz)){ 
      fprintf(stderr, "\e[1;31mERROR: Could not remove from non empty queue.\e[0m\n"); 
      exit(EXIT_FAILURE); 
     } 
     uint64_t n = xyz.x | xyz.y | xyz.z; 
     if(isPrime(n)){ 
      for(uint64_t m = 1, i = 0; i < NUM_MOMENTS; m *= n, ++i){ 
       moments[(NUM_PRODUCER_PAIRS << 1) + 1 + NUM_CONSUMERS][i] += m; 
      } 
     }else{ 
      fprintf(stderr, "\e[1;31mERROR: Generated a prime that fails deterministic Miller Rabin.\e[0m\n"); 
      exit(EXIT_FAILURE); 
     } 
    } 
    queue_destroy(&Q, &spread_integer_ft); 
    for(uint64_t i = 0, p, c, j; i < NUM_MOMENTS; ++i){ 
     for(j = p = 0; j < (NUM_PRODUCER_PAIRS << 1) + 1; ++j){ 
      p += moments[j][i]; 
     } 
     for(c = 0; j < (NUM_PRODUCER_PAIRS << 1) + 1 + NUM_CONSUMERS + 1; ++j){ 
      c += moments[j][i]; 
     } 
     printf("Moment %"PRIu64" %"PRIu64" -> %"PRIu64"\n", i, p, c); 
    } 
} 

私はその後

gcc -o test_queue_pc queue.c test_queue_pc.c -Wall -std=c99 -g -O0 -pthread -fuse-ld=gold -flto -lm 

でコンパイルします彼らがループするときに正しいことをしてくださいproducing || Q.full

+0

"構造体の位置初期化子を使用するのはひどいです"ので、gcc固有の機能を使用する方が良いですか? – Stargateur

+0

gcc固有の機能を使用して位置イニシャライザの使用に関する警告を強制するのは、単に位置イニシャライザを使用するよりもずっと悪くなりますが、指定されたイニシャライザはgcc固有の機能ではないC99機能です。私はこの属性を追加しました。なぜなら、私が使っているプログラムの中の私の構造体のいくつかはすでにパックされていて、とにかく組み込み関数を使う必要があるからです。 – hacatu

答えて

3

は、なぜ消費者のスレッドは、キューが空であるプロデューサーが行われるため、彼らは待っていても、ときだけ生産上の彼らはループの前に返しますが、生産上のときにループ正しいことを行うのですか|| Q.フルですか?

これ以上のプロデューサがないということは、新しいエントリがキューに追加されないことを意味するためです。それはではありませんキューが既に空であることを意味します。

生産者が消費者よりも速い場合を考えてみましょう。彼らは自分のものをキューに追加して終了します。この時点では、キューに項目がありますが、アクティブな生産者数はゼロです。コンシューマがアクティブなプロデューサが存在するかどうかだけを確認する場合、既にキューに入っているアイテムが欠落します。


チェック

if ((active producers) || (items in queue)) 

はここで、C99で正しいものであることに注意することが重要です。

アクティブなプロデューサのみをチェックすると、プロデューサが存在するケースが見つからない場合があります消費者よりも速く、キューに項目が残っている間に終了します。

キュー内の項目のみをチェックすると、プロデューサがまだキューに項目を追加していない場合があります。

キューが最初に空であるかどうかをチェックすると、レースウィンドウが開きます。消費者がキューが空であるかどうかをチェックした後、消費者がアクティブなプロデューサがあるかどうかをチェックする前に、プロデューサは1つまたは複数のアイテムをキューに追加して終了することができる。

アクティブなプロデューサが最初に存在するかどうかを確認する必要があります。アクティブなプロデューサが存在し、現在キューが空の場合、消費者は新しいアイテムがキューに到着するのを待つ必要があります(アクティブなプロデューサの数がゼロになるか、新しいアイテムがキューに到着するまで)。プロデューサの場合は、キューに項目があるかどうかを確認する必要があります。アクティブなプロデューサはキューに新しいアイテムが表示されないことを意味しますが、キューがすでに空であることを意味するわけではありません。

+0

ありがとうございました。フルチェック時の競合状態||なぜ消費者のdo whileループを使用しても動作しないのはなぜですか?これは、完全な||遅延が追加されています。 – hacatu

関連する問題