2016-12-20 5 views
0

バッファにデータをプッシュできるバッファを作成しようとしています。バッファが完全かどうかチェックし、必要に応じてバッファを交換します。別のスレッドは、ファイル出力用のバッファを取得できます。原子インデックスを持つmutex配列からmutexをロックする

私はバッファを正常に実装しましたが、不完全なバッファを強制的にスワップして不完全バッファからデータを返すForceSwapBufferメソッドを追加したかったのです。これを行うために、読み書きバッファが同じであるかどうかをチェックします(書き込み可能な他のフルバッファがある間にバッファにスワップを強制してファイルに書き込もうとしても使用しません)。 このメソッドをGetBufferメソッドと並行して実行できるようにしたい(本当に必要ではありませんが、私はこのメソッドを試してみたいと思っていました)。

GetBufferがブロックし、ForceSwapBufferが終了すると、ForceSwapBufferではアトム_read_buffer_indexが変更されるため、新しいバッファがいっぱいになるまでブロックされます。これはいつもうまくいくのだろうか? GetBufferのブロッキングロックは、アトミックread_buffer_indexの変更を検出し、ロックしようとしているミューテックスを変更するか、ロックの開始時にチェックしますか?ロックする必要のあるミューテックスと、同じミューテックスをロックしようとしています変更?

/* selection of member data */ 
unsigned int _size, _count; 

std::atomic<unsigned int> _write_buffer_index, _read_buffer_index; 
unsigned int _index; 

std::unique_ptr< std::unique_ptr<T[]>[] > _buffers; 
std::unique_ptr<std::mutex[]> _mutexes; 

std::recursive_mutex _force_swap_buffer; 

/* selection of implementation of member functions */ 
template<typename T> // included to show the use of the recursive_mutex 
void Buffer<T>::Push(T *data, unsigned int length) { 
    std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer); 
    if (_index + length <= _size) { 
     memcpy(&_buffers[_write_buffer_index][_index], data, length*sizeof(T)); 
     _index += length; 
    } else { 
     memcpy(&_buffers[_write_buffer_index][_index], data, (_size - _index)*sizeof(T)); 
     unsigned int t_index = _index; 
     SwapBuffer(); 
     Push(&data[_size - t_index], length - (_size - t_index)); 
    } 
} 

template<typename T> 
std::unique_ptr<T[]> Buffer<T>::GetBuffer() { 
    std::lock_guard<std::mutex> lock(_mutexes[_read_buffer_index]); // where the magic should happen 
    std::unique_ptr<T[]> result(new T[_size]); 
    memcpy(result.get(), _buffers[_read_buffer_index].get(), _size*sizeof(T)); 
    _read_buffer_index = (_read_buffer_index + 1) % _count; 
    return std::move(result); 
} 

template<typename T> 
std::unique_ptr<T[]> Buffer<T>::ForceSwapBuffer() { 
    std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer); // lock that forbids pushing and force swapping at the same time 

    if (_write_buffer_index != _read_buffer_index) 
     return nullptr; 

    std::unique_ptr<T[]> result(new T[_index]); 
    memcpy(result.get(), _buffers[_read_buffer_index].get(), _index*sizeof(T)); 

    unsigned int next = (_write_buffer_index + 1) % _count; 

    _mutexes[next].lock(); 
    _read_buffer_index = next; // changing the read_index while the other thread it blocked, the new mutex is already locked so the other thread should remain locked 
    _mutexes[_write_buffer_index].unlock(); 

    _write_buffer_index = next; 
    _index = 0; 

    return result; 
} 

答えて

0

コードにいくつかの問題があります。まず、原子変数を変更するときは注意してください。小さなセットの操作だけが実際にはアトミックであり(http://en.cppreference.com/w/cpp/atomic/atomic参照)、アトミック操作の組み合わせはアトミックではありません。次のようにしてください。

_read_buffer_index = (_read_buffer_index + 1) % _count; 

ここでは、変数、インクリメント、モジュロ演算、およびアトミックストアのアトミックリードがあります。しかし、文全体がでなく、アトミックです! _countが2の累乗である場合は、++オペレーターを使用することができます。そうでない場合には、を一時変数に読み込み、上記の計算を実行してから、の変数が変更されていない場合はcompare_exchange関数を使用して新しい値を保存する必要があります。明らかに後者は成功するまでループしなければならない。また、あるスレッドが2番目のスレッドのreadとcompare_exchangeの間で変数_countをインクリメントする可能性について心配する必要があります。その場合、2番目のスレッドは変数が変更されていないと誤って判断します。

第2の問題は、キャッシュラインバウンスです。同じキャッシュラインに複数のミューテックスがある場合、2つ以上のスレッドが同時にアクセスしようとすると、パフォーマンスが非常に悪くなります。キャッシュラインのサイズは、お使いのプラットフォームによって異なります。

主な問題は、ForceSwapBuffer()Push()両方が_force_swap_bufferミューテックスをロックしながら、GetBuffer()にはないということです。 ただし、変更は_read_buffer_indexです。 ForceSwapBuffer()中のSO:

std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer); 

if (_write_buffer_index != _read_buffer_index) 
    return nullptr; 

// another thread can call GetBuffer() here and change _read_buffer_index 

// rest of the code here 

_write_buffer_index == _read_buffer_indexif -statement後は、実際に無効であると仮定。

+0

アトミック変数についてのあなたのコメントを確かにチェックしますが、私の場合はそれに書き込むスレッドが1つしかありません。アトミックは別のスレッドの読み込み動作を確実にするためのものです。 – aavdiere

+0

私が言及したことを忘れたのは、 '_write_buffer_index == _read_buffer_index'のときGetBufferは常にロックされるということです。なぜなら、別のスレッドがまだバッファに書き込んでいるときに書き込みバッファを取得できないからです。バッファ)ので、私は '_write_buffer_index == _read_buffer_index'仮定が無効であるとは思わない。 – aavdiere

+0

私はコメントを編集できないので、 'Push 'によって呼び出される通常の' SwapBuffer() 'は新しいバッファが常にロックされるようにします。だから、 'GetBuffer()'が呼び出され、フルバッファが利用できない場合( '_write_buffer_index == _read_buffer_index')、新しいバッファが利用可能になるまでブロックされます。 ( 'write_buffer_index'と' read_buffer_index'を同時にインクリメントして)一つのバッファをスキップしたいだけですが、他のスレッドのlock(_mutexes [_read_buffer_index])がブロックしているmutexを変更するかどうかは分かりません変数が変更されました。 – aavdiere

関連する問題