私は最近、有界ロックフリーキューを作成し、いくつかのテストを行っていました。テストでは、いくつかのスレッドは素数を生成します(いくつかの数から始まり、プロデューサスレッドの数の6倍をカウントし、Deterministic Miller Rabinテストを使用してすべての数値をチェックし、素数をキューに挿入します)素数を消費する(キューから要素を削除し、素数であるかどうかをチェックする)。プロデューサスレッドは、各ペアに1つずつ、1つのmod 6に等しい素数を生成し、もう1つは5 mod 6に等しい(2、3、または4 mod 6に等しいすべての数は、2および3)では、メインスレッドは2と3を生成します。生成されていないスレッドの数はグローバルカウンタがあります。プロデューサスレッドまたはメインが素数を生成するたびに、このカウンタが原子的に減少します。消費者スレッドはループしていないときにループする。プロデューサスレッドが完了する前にコンシューマスレッドが停止するのはなぜですか?
実際には素数がキューを通過するかどうかを判断するために、すべてのスレッドが生成して消費する素数の0番目から3番目の瞬間を計算し、プロデューサスレッドのモーメントの合計は、コンシューマスレッドのモーメントの合計に等しい。 n番目の瞬間はn乗の和です。したがって、これは素数の数、それらの和、それらの平方和、それらの立方体の合計、すべての一致を意味します。シーケンスが互いの順列である場合、すべての瞬間が一致するので、最初のnをチェックして、長さnのシーケンスが実際に順列であることを確認する必要がありますが、最初の4つのマッチングは、一致しないシーケンスの確率が非常に小さいことを意味します。
私のロックフリーキューは実際には動作しますが、何らかの理由でコンシューマスレッドがキューにまだ要素がある間にすべて停止します。プロデューサスレッドは、すべての素数をキューに挿入した後、プロデューサスレッドが生成カウンタを減分するだけで、生成するすべてのスレッドが減少した後は、生成カウンタが0にしかならないため、理由はわかりません。したがって、生成カウンタが0のときは常に、すべての要素がキューに挿入されています。しかし、消費者が要素を削除しようとすると、queue.full(キュー内の要素の数)が0の場合にremoveが失敗するだけで成功するはずです。したがって、生成カウンタが0の場合、消費者は正常に消費することができます。 queue.fullは0であり、生成カウンタをチェックしてキューが使い果たされるまで戻るべきではありません。削除が失敗した場合(消費者がプロデューサよりも速く、キューを空にしている場合)にのみ、生成カウンタをチェックします。
しかし、whileループを作成すると、生成カウンタに加えてcheck queue.fullが削除されるため、消費者は早期に返されません。私はそれだけで動作します
__atomic_load_n(&producing, __ATOMIC_SEQ_CST) || __atomic_load_n(&Q.full, __ATOMIC_SEQ_CST)
に
__atomic_load_n(&producing, __ATOMIC_SEQ_CST)
を変更する場合には、あります。私のコードでは、属性、__atomic組み込み関数、__auto_type、文式、128ビット整数、__builtin_ctzll、 '\ e'などのgcc拡張機能、指定された初期化子や複合リテラルなどのC99機能、pthreadsなどのgcc拡張機能を使用しています。私はまた、私がこの問題を抱えている間に問題が発生しないようにするために、弱いバージョンが動作するはずですが、シーケンシャルな一貫性のあるメモリ順序と強力な比較とスワップを使用しています。ここでは、ヘッダqueue.hは次のとおりです。ここで
#ifndef __QUEUE_H__
#define __QUEUE_H__
#include <stddef.h>
#include <inttypes.h>
typedef struct __attribute__((__designated_init__)){//using positional initializers for a struct is terrible
void *buf;
uint8_t *flags;//insert started, insert complete, remove started
size_t cap, full;
uint64_t a, b;
} queue_t;
typedef struct __attribute__((__designated_init__)){
size_t size;
} queue_ft;//this struct serves as a class for queue objects: any data specific to the object goes in the queue_t struct and any shared data goes here
int queue_insert(queue_t*, const queue_ft*, void *elem);
int queue_remove(queue_t*, const queue_ft*, void *out);
int queue_init(queue_t*, const queue_ft*, size_t reserve);
void queue_destroy(queue_t*, const queue_ft*);
#endif
は、ライブラリのソースqueue.cです:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include "queue.h"
int queue_insert(queue_t *self, const queue_ft *ft, void *elem){
uint64_t i;
while(1){
uint8_t flag = 0;
if(__atomic_load_n(&self->full, __ATOMIC_SEQ_CST) == self->cap){
return 0;
}
i = __atomic_load_n(&self->b, __ATOMIC_SEQ_CST);
if(__atomic_compare_exchange_n(self->flags + i, &flag, 0x80, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)){//set the insert started flag if all flags are clear
break;
}
}
__atomic_fetch_add(&self->full, 1, __ATOMIC_SEQ_CST);
uint64_t b = i;
while(!__atomic_compare_exchange_n(&self->b, &b, (b + 1)%self->cap, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));//increase the b endpoint of the queue with wraparaound
memcpy(self->buf + i*ft->size, elem, ft->size);//actually insert the item. accesses to the buffer mirror accesses to the flags so this is safe
__atomic_thread_fence(memory_order_seq_cst);
__atomic_store_n(self->flags + i, 0xc0, __ATOMIC_SEQ_CST);//set the insert completed flag
return 1;
}
int queue_remove(queue_t *self, const queue_ft *ft, void *out){
uint64_t i;
while(1){
uint8_t flag = 0xc0;
if(!__atomic_load_n(&self->full, __ATOMIC_SEQ_CST)){
return 0;
}
i = __atomic_load_n(&self->a, __ATOMIC_SEQ_CST);
if(__atomic_compare_exchange_n(self->flags + i, &flag, 0xe0, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)){//set the remove started flag if insert started and insert completed are set but the other flags are clear
break;
}
}
__atomic_fetch_sub(&self->full, 1, __ATOMIC_SEQ_CST);
uint64_t a = i;
while(!__atomic_compare_exchange_n(&self->a, &a, (a + 1)%self->cap, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));//increase the a endpoint of the queue with wraparound
memcpy(out, self->buf + i*ft->size, ft->size);//actually remove the item.
__atomic_thread_fence(__ATOMIC_SEQ_CST);
__atomic_store_n(self->flags + i, 0x00, __ATOMIC_SEQ_CST);//clear all the flags to mark the remove as completed
return 1;
}
int queue_init(queue_t *self, const queue_ft *ft, size_t reserve){
void *buf = malloc(reserve*ft->size);
if(!buf){
return 0;
}
uint8_t *flags = calloc(reserve, sizeof(uint8_t));
if(!flags){
free(buf);
return 0;
}
*self = (queue_t){
.buf=buf,
.flags=flags,
.cap=reserve,
.full=0,
.a=0,.b=0
};
return 1;
}
void queue_destroy(queue_t *self, const queue_ft *ft){
free(self->buf);
free(self->flags);
}
そしてここでは、テストプログラムのソースtest_queue_pcです。C:消費者のスレッドがキューが空になる前に、ちょうどproducing
のときにループ、プロデューサーが行われるため、彼らは待っていても返しますが、なぜ
#define _POSIX_C_SOURCE 201612UL
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include <pthread.h>
#include <math.h>
#include <time.h>
#include "queue.h"
//Generate primes up to this number. Note 78498 is the number of primes below 1000000; this is hard coded because the queue does not support growing yet.
#define MAX 1000000
#define QUEUE_SIZE 78498
#define NUM_PRODUCER_PAIRS 3
#define NUM_CONSUMERS 2
//Every producer and consumer thread calculates the 0th through 3rd moments of the sequence of primes it sees, as well as testing them for primality.
//The nth moment is the sum of the nth powers, thus, the order does not matter and if the primes are the same in both the producers and the consumers
//then the sums of the moments will also be the same. I check that the 0th through 3rd moments match which means it is nearly certain the primes go through
//the queue.
#define NUM_MOMENTS 4
//Deterministic Miller Rabin witnesses (see https://en.wikipedia.org/wiki/Miller–Rabin_primality_test)
#define DMR_PRIMES (uint64_t[]){2, 13, 23, 1662803}
#define DMR_PRIMES_C 4
//Macro to split an integer into three parts. The first part has the 2**0, 2**3, 2**6, ..., 2**60 bits of the original and 0 elsewhere.
//The second part has the 2**1, 2**4, 2**7, ..., 2**61 bits of the original and 0 elsewhere. The last part has the 2**2, ..., 2**62 bits.
//The 2**63 bit is lost. The masks represent the sums of geometric sequences. The original number can be obtained by bitwise or or xor on the parts.
//I spread the uint64_t's (which are unsigned long longs) over 3 uint64_t's so that they take up 24 bytes and memcpy'ing them happens in multiple steps.
//This spreading is only done on primes that have been produced before they are put into the queue. The consumers then recombine and verify them.
#define SPREAD_EMPLACE(n) ({__auto_type _n = (n); &(spread_integer){(_n)&(((1ULL<<60)-1)/7), (_n)&(((1ULL<<61)-2)/7), (_n)&(((1ULL<<62)-4)/7)};})
typedef struct{
uint64_t x, y, z;
} spread_integer;
queue_ft spread_integer_ft = {.size= sizeof(spread_integer)};
queue_t Q;
//Start producing count at 1 + (NUM_PRODUCING_THREADS << 1) because main generates 2 and 3 and reduce it by 1 every time a producer thread finishes
int producing = 1 + (NUM_PRODUCER_PAIRS << 1);
//Uses the binary algorithm for modular exponentiation (https://en.wikipedia.org/wiki/Exponentiation_by_squaring)
//It is a helper function for isPrime
uint64_t powmod(unsigned __int128 b, uint64_t e, uint64_t n){
unsigned __int128 r = 1;
b %= n;
while(e){
if(e&1){
r = r*b%n;
}
e >>= 1;
b = b*b%n;
}
return (uint64_t)r;
}
//uses deterministic Miller Rabin primality test
int isPrime(uint64_t n){
uint64_t s, d;//s, d | 2^s*d = n - 1
if(n%2 == 0){
return n == 2;
}
--n;
s = __builtin_ctzll(n);
d = n>>s;
++n;
for(uint64_t i = 0, a, x; i < DMR_PRIMES_C; ++i){
a = DMR_PRIMES[i];
if(a >= n){
break;
}
x = powmod(a, d, n);
if(x == 1 || x == n - 1){
goto CONTINUE_WITNESSLOOP;
}
for(a = 0; a < s - 1; ++a){
x = powmod(x, 2, n);
if(x == 1){
return 0;
}
if(x == n - 1){
goto CONTINUE_WITNESSLOOP;
}
}
return 0;
CONTINUE_WITNESSLOOP:;
}
return 1;
}
void *produce(void *_moments){
uint64_t *moments = _moments, n = *moments;//the output argument for the 0th moment serves as the input argument for the number to start checking for primes at
*moments = 0;
for(; n < MAX; n += 6*NUM_PRODUCER_PAIRS){//the producers are paired so one of every pair generates primes equal to 1 mod 6 and the other equal to 5 mod 6. main generates 2 and 3 the only exceptions
if(isPrime(n)){
for(uint64_t m = 1, i = 0; i < NUM_MOMENTS; m *= n, ++i){
moments[i] += m;
}
if(!queue_insert(&Q, &spread_integer_ft, SPREAD_EMPLACE(n))){
fprintf(stderr, "\e[1;31mERROR: Could not insert into queue.\e[0m\n");
exit(EXIT_FAILURE);
}
}
}
__atomic_fetch_sub(&producing, 1, __ATOMIC_SEQ_CST);//this thread is done generating primes; reduce producing counter by 1
return moments;
}
void *consume(void *_moments){
uint64_t *moments = _moments;
while(__atomic_load_n(&producing, __ATOMIC_SEQ_CST) || __atomic_load_n(&Q.full, __ATOMIC_SEQ_CST)){//busy loop while some threads are producing
spread_integer xyz;
if(queue_remove(&Q, &spread_integer_ft, &xyz)){
uint64_t n = xyz.x | xyz.y | xyz.z;
if(isPrime(n)){
for(uint64_t m = 1, i = 0; i < NUM_MOMENTS; m *= n, ++i){
moments[i] += m;
}
}else{
fprintf(stderr, "\e[1;31mERROR: Generated a prime that fails deterministic Miller Rabin.\e[0m\n");
exit(EXIT_FAILURE);
}
}
}
return moments;
}
int main(void){
if(!queue_init(&Q, &spread_integer_ft, QUEUE_SIZE)){
fprintf(stderr, "\e[1;31mERROR: Could not initialize queue.\e[0m\n");
exit(EXIT_FAILURE);
}
pthread_t producers[NUM_PRODUCER_PAIRS << 1], consumers[NUM_CONSUMERS];
uint64_t moments[(NUM_PRODUCER_PAIRS << 1) + 1 + NUM_CONSUMERS + 1][NUM_MOMENTS] = {};//the 2 extras are because main produces the primes 2 and 3 and consumes primes the consumers leave behind
for(size_t i = 0; i < NUM_CONSUMERS; ++i){//create consumers first to increase likelihood of causing bugs
if(pthread_create(consumers + i, NULL, consume, moments[(NUM_PRODUCER_PAIRS << 1) + 1 + i])){
fprintf(stderr, "\e[1;31mERROR: Could not create consumer thread.\e[0m\n");
exit(EXIT_FAILURE);
}
}
for(size_t i = 0; i < NUM_PRODUCER_PAIRS; ++i){
moments[i << 1][0] = 5 + 6*i;
if(pthread_create(producers + (i << 1), NULL, produce, moments[i << 1])){
fprintf(stderr, "\e[1;31mERROR: Could not create producer thread.\e[0m\n");
exit(EXIT_FAILURE);
}
moments[(i << 1) + 1][0] = 7 + 6*i;
if(pthread_create(producers + (i << 1) + 1, NULL, produce, moments[(i << 1) + 1])){
fprintf(stderr, "\e[1;31mERROR: Could not create producer thread.\e[0m\n");
exit(EXIT_FAILURE);
}
}
for(uint64_t n = 2; n < 4; ++n){
for(uint64_t m = 1, i = 0; i < NUM_MOMENTS; m *= n, ++i){
moments[NUM_PRODUCER_PAIRS << 1][i] += m;
}
if(!queue_insert(&Q, &spread_integer_ft, SPREAD_EMPLACE(n))){
fprintf(stderr, "\e[1;31mERROR: Could not insert into queue.\e[0m\n");
exit(EXIT_FAILURE);
}
}
__atomic_fetch_sub(&producing, 1, __ATOMIC_SEQ_CST);
uint64_t c = 0;
for(size_t i = 0; i < NUM_CONSUMERS; ++i){//join consumers first to bait bugs. Note consumers should not finish until the producing counter reaches 0
void *_c;
if(pthread_join(consumers[i], &_c)){
fprintf(stderr, "\e[1;31mERROR: Could not join consumer thread.\e[0m\n");
exit(EXIT_FAILURE);
}
c += (uintptr_t)_c;
}
for(size_t i = 0; i < NUM_PRODUCER_PAIRS << 1; ++i){
if(pthread_join(producers[i], NULL)){
fprintf(stderr, "\e[1;31mERROR: Could not join producer thread.\e[0m\n");
exit(EXIT_FAILURE);
}
}
//this really should not be happening because the consumer threads only return after the producing counter reaches 0,
//which only happens after all of the producer threads are done inserting items into the queue.
if(Q.full){
fprintf(stdout, "\e[1;31mWTF: Q.full != 0\nproducing == %d\e[0m\n", producing);
}
while(Q.full){
spread_integer xyz;
if(!queue_remove(&Q, &spread_integer_ft, &xyz)){
fprintf(stderr, "\e[1;31mERROR: Could not remove from non empty queue.\e[0m\n");
exit(EXIT_FAILURE);
}
uint64_t n = xyz.x | xyz.y | xyz.z;
if(isPrime(n)){
for(uint64_t m = 1, i = 0; i < NUM_MOMENTS; m *= n, ++i){
moments[(NUM_PRODUCER_PAIRS << 1) + 1 + NUM_CONSUMERS][i] += m;
}
}else{
fprintf(stderr, "\e[1;31mERROR: Generated a prime that fails deterministic Miller Rabin.\e[0m\n");
exit(EXIT_FAILURE);
}
}
queue_destroy(&Q, &spread_integer_ft);
for(uint64_t i = 0, p, c, j; i < NUM_MOMENTS; ++i){
for(j = p = 0; j < (NUM_PRODUCER_PAIRS << 1) + 1; ++j){
p += moments[j][i];
}
for(c = 0; j < (NUM_PRODUCER_PAIRS << 1) + 1 + NUM_CONSUMERS + 1; ++j){
c += moments[j][i];
}
printf("Moment %"PRIu64" %"PRIu64" -> %"PRIu64"\n", i, p, c);
}
}
私はその後
gcc -o test_queue_pc queue.c test_queue_pc.c -Wall -std=c99 -g -O0 -pthread -fuse-ld=gold -flto -lm
でコンパイルします彼らがループするときに正しいことをしてくださいproducing || Q.full
?
"構造体の位置初期化子を使用するのはひどいです"ので、gcc固有の機能を使用する方が良いですか? – Stargateur
gcc固有の機能を使用して位置イニシャライザの使用に関する警告を強制するのは、単に位置イニシャライザを使用するよりもずっと悪くなりますが、指定されたイニシャライザはgcc固有の機能ではないC99機能です。私はこの属性を追加しました。なぜなら、私が使っているプログラムの中の私の構造体のいくつかはすでにパックされていて、とにかく組み込み関数を使う必要があるからです。 – hacatu