バッファにデータをプッシュできるバッファを作成しようとしています。バッファが完全かどうかチェックし、必要に応じてバッファを交換します。別のスレッドは、ファイル出力用のバッファを取得できます。原子インデックスを持つmutex配列からmutexをロックする
私はバッファを正常に実装しましたが、不完全なバッファを強制的にスワップして不完全バッファからデータを返すForceSwapBufferメソッドを追加したかったのです。これを行うために、読み書きバッファが同じであるかどうかをチェックします(書き込み可能な他のフルバッファがある間にバッファにスワップを強制してファイルに書き込もうとしても使用しません)。 このメソッドをGetBufferメソッドと並行して実行できるようにしたい(本当に必要ではありませんが、私はこのメソッドを試してみたいと思っていました)。
GetBufferがブロックし、ForceSwapBufferが終了すると、ForceSwapBufferではアトム_read_buffer_indexが変更されるため、新しいバッファがいっぱいになるまでブロックされます。これはいつもうまくいくのだろうか? GetBufferのブロッキングロックは、アトミックread_buffer_indexの変更を検出し、ロックしようとしているミューテックスを変更するか、ロックの開始時にチェックしますか?ロックする必要のあるミューテックスと、同じミューテックスをロックしようとしています変更?
/* selection of member data */
unsigned int _size, _count;
std::atomic<unsigned int> _write_buffer_index, _read_buffer_index;
unsigned int _index;
std::unique_ptr< std::unique_ptr<T[]>[] > _buffers;
std::unique_ptr<std::mutex[]> _mutexes;
std::recursive_mutex _force_swap_buffer;
/* selection of implementation of member functions */
template<typename T> // included to show the use of the recursive_mutex
void Buffer<T>::Push(T *data, unsigned int length) {
std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer);
if (_index + length <= _size) {
memcpy(&_buffers[_write_buffer_index][_index], data, length*sizeof(T));
_index += length;
} else {
memcpy(&_buffers[_write_buffer_index][_index], data, (_size - _index)*sizeof(T));
unsigned int t_index = _index;
SwapBuffer();
Push(&data[_size - t_index], length - (_size - t_index));
}
}
template<typename T>
std::unique_ptr<T[]> Buffer<T>::GetBuffer() {
std::lock_guard<std::mutex> lock(_mutexes[_read_buffer_index]); // where the magic should happen
std::unique_ptr<T[]> result(new T[_size]);
memcpy(result.get(), _buffers[_read_buffer_index].get(), _size*sizeof(T));
_read_buffer_index = (_read_buffer_index + 1) % _count;
return std::move(result);
}
template<typename T>
std::unique_ptr<T[]> Buffer<T>::ForceSwapBuffer() {
std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer); // lock that forbids pushing and force swapping at the same time
if (_write_buffer_index != _read_buffer_index)
return nullptr;
std::unique_ptr<T[]> result(new T[_index]);
memcpy(result.get(), _buffers[_read_buffer_index].get(), _index*sizeof(T));
unsigned int next = (_write_buffer_index + 1) % _count;
_mutexes[next].lock();
_read_buffer_index = next; // changing the read_index while the other thread it blocked, the new mutex is already locked so the other thread should remain locked
_mutexes[_write_buffer_index].unlock();
_write_buffer_index = next;
_index = 0;
return result;
}
アトミック変数についてのあなたのコメントを確かにチェックしますが、私の場合はそれに書き込むスレッドが1つしかありません。アトミックは別のスレッドの読み込み動作を確実にするためのものです。 – aavdiere
私が言及したことを忘れたのは、 '_write_buffer_index == _read_buffer_index'のときGetBufferは常にロックされるということです。なぜなら、別のスレッドがまだバッファに書き込んでいるときに書き込みバッファを取得できないからです。バッファ)ので、私は '_write_buffer_index == _read_buffer_index'仮定が無効であるとは思わない。 – aavdiere
私はコメントを編集できないので、 'Push 'によって呼び出される通常の' SwapBuffer() 'は新しいバッファが常にロックされるようにします。だから、 'GetBuffer()'が呼び出され、フルバッファが利用できない場合( '_write_buffer_index == _read_buffer_index')、新しいバッファが利用可能になるまでブロックされます。 ( 'write_buffer_index'と' read_buffer_index'を同時にインクリメントして)一つのバッファをスキップしたいだけですが、他のスレッドのlock(_mutexes [_read_buffer_index])がブロックしているmutexを変更するかどうかは分かりません変数が変更されました。 – aavdiere