一つのスレッドが大きなバッファにメッセージを読み込んで処理をスレッド群に分散させるというユースケースがあります。その後、バッファは複数のスレッドによって共有されます。読み取り専用で、最後のスレッドが終了したときにバッファを解放する必要があります。バッファは、ロックフリーのスラブアロケータから割り当てられます。boost :: intrusive_ptrを使った共有バッファ
私の最初の設計は、バッファにshared_ptrを使用することでした。しかし、バッファは異なるサイズにすることができます。それを取り巻く私のやり方は、このようなことでした。
struct SharedBuffer {
SharedBuffer (uint16_t len, std::shared_ptr<void> ptr)
: _length(len), _buf(std::move(ptr))
{
}
uint8_t data() { return (uint8_t *)_buf.get(); }
uint16_t length
std::shared_ptr<void> _buf; // type-erase the shared_ptr as the SharedBuffer
// need to stored in some other structs
};
今アロケータは、このようなshared_ptrを割り当てます:
SharedBuffer allocate (size_t size)
{
auto buf = std::allocate_shared<std::array<uint8_t, 16_K>>(myallocator);
return SharedBuffer{16_K, buf}; // type erase the std::array
}
そしてSharedBufferはそれを望んでいる各スレッドにエンキューされます。
私は不必要にたくさんのことをやっていると思います。私は以下のスキームでboost :: intrusive_ptrを実行します。私は可変サイズの配列を使用しているので、物事はビットC'ish-です。ここでは、簡略化のためにスラブアロケータをnew()という演算子で変更しました。私はこの実装が大丈夫かどうかを調べるためにそれを実行したかったのです。
template <typename T>
inline int atomicIncrement (T* t)
{
return __atomic_add_fetch(&t->_ref, 1, __ATOMIC_ACQUIRE);
}
template <typename T>
inline int atomicDecrement (T* t)
{
return __atomic_sub_fetch(&t->_ref, 1, __ATOMIC_RELEASE);
}
class SharedBuffer {
public:
friend int atomicIncrement<SharedBuffer>(SharedBuffer*);
friend int atomicDecrement<SharedBuffer>(SharedBuffer*);
SharedBuffer(uint16_t len) : _length(len) {}
uint8_t *data()
{
return &_data[0];
}
uint16_t length() const
{
return _length;
}
private:
int _ref{0};
const uint16_t _length;
uint8_t _data[];
};
using SharedBufferPtr = boost::intrusive_ptr<SharedBuffer>;
SharedBufferPtr allocate (size_t size)
{
// dummy implementation
void *p = ::operator new (size + sizeof(SharedBuffer));
// I am not explicitly constructing the array of uint8_t
return new (p) SharedBuffer(size);
}
void deallocate (SharedBuffer* sbuf)
{
sbuf->~SharedBuffer();
// dummy implementation
::operator delete ((void *)sbuf);
}
void intrusive_ptr_add_ref(SharedBuffer* sbuf)
{
atomicIncrement(sbuf);
}
void intrusive_ptr_release (SharedBuffer* sbuf)
{
if (atomicDecrement(sbuf) == 0) {
deallocate(sbuf);
}
}