2017-12-22 19 views
-1

クライアントに送信するときにネットワークサーバーのダブルバッファリングを実装しようとしています。アイデアは、私はそれを実行しようとすると、私はエラーを取得しています、それは見た目ほど素晴らしい、残念ながらboost :: asio double buffering

boost::asio::async_write - ensure only one outstanding call

から来ました。 AFAIK、私はイテレータを使用していないので、async_writeは完了しようとしていると仮定しますが、本当に何が問題を引き起こしているのか分かりません。

エラーは、最初のasync_writeが転記された後に発生します。

エラーがコールスタックがある

"ベクターイテレータdereferencableない" で、ベクトルの実装

reference operator*() const 
    { // return designated object 
#if _ITERATOR_DEBUG_LEVEL == 2 
    const auto _Mycont = static_cast<const _Myvec *>(this->_Getcont()); 
    if (_Mycont == 0 
     || _Ptr == _Tptr() 
     || _Ptr < _Mycont->_Myfirst 
     || _Mycont->_Mylast <= _Ptr) 
     { // report error 
     _DEBUG_ERROR("vector iterator not dereferencable"); 
     _SCL_SECURE_OUT_OF_RANGE; 
     } 

にこのソースから来ている:

msvcp140d.dll!00007ff9f9f40806() Unknown 
ConsoleApplication2.exe!std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > >::operator*() Line 74 C++ 
ConsoleApplication2.exe!boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > >::operator()() Line 532 C++ 
ConsoleApplication2.exe!std::_Invoker_functor::_Call<boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > > & __ptr64>(boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > > & _Obj) Line 1377 C++ 
ConsoleApplication2.exe!std::invoke<boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > > & __ptr64>(boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > > & _Obj) Line 1445 C++ 
ConsoleApplication2.exe!std::_Invoke_ret<void,boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > > & __ptr64>(std::_Forced<void,1> __formal, boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > > & <_Vals_0>) Line 1462 C++ 
ConsoleApplication2.exe!std::_Func_impl<boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > >,std::allocator<int>,void>::_Do_call() Line 214 C++ 
ConsoleApplication2.exe!std::_Func_class<void>::operator()() Line 280 C++ 
ConsoleApplication2.exe!boost::asio::detail::buffer_cast_helper(const boost::asio::const_buffer & b) Line 276 C++ 
ConsoleApplication2.exe!boost::asio::buffer_cast<void const * __ptr64>(const boost::asio::const_buffer & b) Line 435 C++ 
ConsoleApplication2.exe!boost::asio::buffer(const boost::asio::const_buffer & b, unsigned __int64 max_size_in_bytes) Line 751 C++ 
ConsoleApplication2.exe!boost::asio::detail::consuming_buffers_iterator<boost::asio::const_buffer,std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<boost::asio::const_buffer> > > >::consuming_buffers_iterator<boost::asio::const_buffer,std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<boost::asio::const_buffer> > > >(bool at_end, const boost::asio::const_buffer & first, std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<boost::asio::const_buffer> > > begin_remainder, std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<boost::asio::const_buffer> > > end_remainder, unsigned __int64 max_size) Line 62 C++ 
ConsoleApplication2.exe!boost::asio::detail::consuming_buffers<boost::asio::const_buffer,std::vector<boost::asio::const_buffer,std::allocator<boost::asio::const_buffer> > >::begin() Line 210 C++ 
ConsoleApplication2.exe!boost::asio::detail::buffer_sequence_adapter<boost::asio::const_buffer,boost::asio::detail::consuming_buffers<boost::asio::const_buffer,std::vector<boost::asio::const_buffer,std::allocator<boost::asio::const_buffer> > > >::validate(const boost::asio::detail::consuming_buffers<boost::asio::const_buffer,std::vector<boost::asio::const_buffer,std::allocator<boost::asio::const_buffer> > > & buffer_sequence) Line 145 C++ 
ConsoleApplication2.exe!boost::asio::detail::win_iocp_socket_send_op<boost::asio::detail::consuming_buffers<boost::asio::const_buffer,std::vector<boost::asio::const_buffer,std::allocator<boost::asio::const_buffer> > >,boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::vector<boost::asio::const_buffer,std::allocator<boost::asio::const_buffer> >,boost::asio::detail::transfer_all_t,void <lambda>(const boost::system::error_code &, unsigned __int64) > >::do_complete(boost::asio::detail::win_iocp_io_service * owner, boost::asio::detail::win_iocp_operation * base, const boost::system::error_code & result_ec, unsigned __int64 bytes_transferred) Line 74 C++ 
ConsoleApplication2.exe!boost::asio::detail::win_iocp_operation::complete(boost::asio::detail::win_iocp_io_service & owner, const boost::system::error_code & ec, unsigned __int64 bytes_transferred) Line 47 C++ 
ConsoleApplication2.exe!boost::asio::detail::win_iocp_io_service::do_one(bool block, boost::system::error_code & ec) Line 406 C++ 
ConsoleApplication2.exe!boost::asio::detail::win_iocp_io_service::run(boost::system::error_code & ec) Line 164 C++ 
ConsoleApplication2.exe!boost::asio::io_service::run() Line 59 C++ 
ConsoleApplication2.exe!ConnectionManager::IoServiceThreadProc() Line 83 C++ 
[External Code] 

マイ最小限のコンパイル例で、そのタイマーで「ビープ音#」を送信しようとしたばかりのネットワークサーバー:

Connection.h

#pragma once 

#include <boost/asio.hpp> 

#include <atomic> 
#include <condition_variable> 
#include <memory> 
#include <mutex> 

//-------------------------------------------------------------------- 
class ConnectionManager; 

//-------------------------------------------------------------------- 
class Connection : public std::enable_shared_from_this<Connection> 
{ 
public: 

    typedef std::shared_ptr<Connection> SharedPtr; 

    // Ensure all instances are created as shared_ptr in order to fulfill requirements for shared_from_this 
    static Connection::SharedPtr Create(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket & socket); 

    Connection(const Connection &) = delete; 
    Connection(Connection &&) = delete; 
    Connection & operator = (const Connection &) = delete; 
    Connection & operator = (Connection &&) = delete; 
    ~Connection(); 

    // We have to defer the start until we are fully constructed because we share_from_this() 
    void Start(); 
    void Stop(); 

    void Send(const std::vector<char> & data); 

private: 

    ConnectionManager *          m_owner; 
    boost::asio::ip::tcp::socket       m_socket; 
    std::atomic<bool>          m_stopped; 
    boost::asio::streambuf         m_receiveBuffer; 
    mutable std::mutex          m_sendMutex; 
    std::vector<boost::asio::const_buffer>     m_sendBuffers[2]; // Double buffer 
    int              m_activeSendBufferIndex; 
    bool             m_sending; 

    std::vector<char>          m_allReadData; 

    Connection(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket socket); 

    void DoReceive(); 
    void DoSend(); 
}; 

//-------------------------------------------------------------------- 

Connection.cpp

#include "Connection.h" 
#include "ConnectionManager.h" 
#include "Logger.h" 

#include <boost/bind.hpp> 

#include <algorithm> 
#include <cstdio> 

//-------------------------------------------------------------------- 
Connection::SharedPtr Connection::Create(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket & socket) 
{ 
    return Connection::SharedPtr(new Connection(connectionManager, std::move(socket))); 
} 

//-------------------------------------------------------------------- 
Connection::Connection(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket socket) 
    : 
    m_owner        (connectionManager) 
    , m_socket       (std::move(socket)) 
    , m_stopped       (false) 
    , m_receiveBuffer     () 
    , m_sendMutex      () 
    , m_sendBuffers      () 
    , m_activeSendBufferIndex    (0) 
    , m_sending       (false) 
    , m_allReadData      () 
{ 
} 

//-------------------------------------------------------------------- 
Connection::~Connection() 
{ 
    // Boost uses RAII, so we don't have anything to do. Let thier destructors take care of business 
} 

//-------------------------------------------------------------------- 
void Connection::Start() 
{ 
    DoReceive(); 
} 

//-------------------------------------------------------------------- 
void Connection::Stop() 
{ 
    // The entire connection class is only kept alive, because it is a shared pointer and always has a ref count 
    // as a consequence of the outstanding async receive call that gets posted every time we receive. 
    // Once we stop posting another receive in the receive handler and once our owner release any references to 
    // us, we will get destroyed. 
    m_stopped = true; 
    m_owner->OnConnectionClosed(shared_from_this()); 
} 

//-------------------------------------------------------------------- 
void Connection::Send(const std::vector<char> & data) 
{ 
    std::lock_guard<std::mutex> lock(m_sendMutex); 

    // Append to the inactive buffer 
    m_sendBuffers[m_activeSendBufferIndex^1].push_back(boost::asio::buffer(data)); 

    // 
    DoSend(); 
} 

//-------------------------------------------------------------------- 
void Connection::DoSend() 
{ 
    // Check if there is an async send in progress 
    // An empty active buffer indicates there is no outstanding send 
    if (m_sendBuffers[m_activeSendBufferIndex].empty()) 
    { 
     m_activeSendBufferIndex ^= 1; 

     boost::asio::async_write(m_socket, m_sendBuffers[m_activeSendBufferIndex], 
      [this](const boost::system::error_code & errorCode, size_t bytesTransferred) 
      { 
       std::lock_guard<std::mutex> lock(m_sendMutex); 

       m_sendBuffers[m_activeSendBufferIndex].clear(); 

       if (errorCode) 
       { 
        printf("An error occured while attemping to send data to a connection. Error Code: %d", errorCode.value()); 

        // An error occurred 
        // We do not stop or close on sends, but instead let the receive error out and then close 
        return; 
       } 

       // Check if there is more to send that has been queued up on the inactive buffer, 
       // while we were sending what was on the active buffer 
       if (!m_sendBuffers[m_activeSendBufferIndex^1].empty()) 
       { 
        DoSend(); 
       } 
      }); 
    } 
} 

//-------------------------------------------------------------------- 
void Connection::DoReceive() 
{ 
    auto self(shared_from_this()); 

    boost::asio::async_read_until(m_socket, m_receiveBuffer, '#', 
     [self](const boost::system::error_code & errorCode, size_t bytesRead) 
     { 
      if (errorCode) 
      { 
       printf("An error occured while attemping to receive data from connection. Error Code: %d", errorCode.value()); 

       // Notify our masters that we are ready to be destroyed 
       self->m_owner->OnConnectionClosed(self); 

       // An error occured 
       return; 
      } 

      // Grab the read data 
      std::istream stream(&self->m_receiveBuffer); 
      std::string data; 
      std::getline(stream, data, '#'); 
      data += "#"; 

      printf("Received data from client: %s", data.c_str()); 

      // Issue the next receive 
      if (!self->m_stopped) 
      { 
       self->DoReceive(); 
      } 
     }); 
} 

//-------------------------------------------------------------------- 

ConnectionManager.h

#pragma once 

#include "Connection.h" 

// Boost Includes 
#include <boost/asio.hpp> 

// Standard Includes 
#include <thread> 
#include <vector> 

//-------------------------------------------------------------------- 
class ConnectionManager 
{ 
public: 

    ConnectionManager(unsigned port, size_t numThreads); 
    ConnectionManager(const ConnectionManager &) = delete; 
    ConnectionManager(ConnectionManager &&) = delete; 
    ConnectionManager & operator = (const ConnectionManager &) = delete; 
    ConnectionManager & operator = (ConnectionManager &&) = delete; 
    ~ConnectionManager(); 

    void Start(); 
    void Stop(); 

    void OnConnectionClosed(Connection::SharedPtr connection); 

protected: 

    boost::asio::io_service   m_io_service; 
    boost::asio::ip::tcp::acceptor  m_acceptor; 
    boost::asio::ip::tcp::socket  m_listenSocket; 
    std::vector<std::thread>   m_threads; 

    mutable std::mutex     m_connectionsMutex; 
    std::vector<Connection::SharedPtr> m_connections; 

    boost::asio::deadline_timer  m_timer; 

    void IoServiceThreadProc(); 

    void DoAccept(); 
    void DoTimer(); 
}; 

//-------------------------------------------------------------------- 

ConnectionManager.cpp

#include "ConnectionManager.h" 

#include "Logger.h" 

#include <boost/bind.hpp> 
#include <boost/date_time/posix_time/posix_time.hpp> 

#include<cstdio> 
#include <system_error> 

//------------------------------------------------------------------------------ 
ConnectionManager::ConnectionManager(unsigned port, size_t numThreads) 
    : 
    m_io_service () 
    , m_acceptor (m_io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)) 
    , m_listenSocket(m_io_service) 
    , m_threads  (numThreads) 
    , m_timer  (m_io_service) 
{ 
} 

//------------------------------------------------------------------------------ 
ConnectionManager::~ConnectionManager() 
{ 
    Stop(); 
} 

//------------------------------------------------------------------------------ 
void ConnectionManager::Start() 
{ 
    if (m_io_service.stopped()) 
    { 
     m_io_service.reset(); 
    } 

    DoAccept(); 

    for (auto & thread : m_threads) 
    { 
     if (!thread.joinable()) 
     { 
      thread.swap(std::thread(&ConnectionManager::IoServiceThreadProc, this)); 
     } 
    } 

    DoTimer(); 
} 

//------------------------------------------------------------------------------ 
void ConnectionManager::Stop() 
{ 
    { 
     std::lock_guard<std::mutex> lock(m_connectionsMutex); 
     m_connections.clear(); 
    } 

    // TODO - Will the stopping of the io_service be enough to kill all the connections and ultimately have them get destroyed? 
    //  Because remember they have outstanding ref count to thier shared_ptr in the async handlers 
    m_io_service.stop(); 

    for (auto & thread : m_threads) 
    { 
     if (thread.joinable()) 
     { 
      thread.join(); 
     } 
    } 
} 

//------------------------------------------------------------------------------ 
void ConnectionManager::IoServiceThreadProc() 
{ 
    try 
    { 
     // Log that we are starting the io_service thread 
     { 
      printf("io_service socket thread starting."); 
     } 

     // Run the asynchronous callbacks from the socket on this thread 
     // Until the io_service is stopped from another thread 
     m_io_service.run(); 
    } 
    catch (std::system_error & e) 
    { 
     printf("System error caught in io_service socket thread. Error Code: %d", e.code().value()); 
    } 
    catch (std::exception & e) 
    { 
     printf("Standard exception caught in io_service socket thread. Exception: %s", e.what()); 
    } 
    catch (...) 
    { 
     printf("Unhandled exception caught in io_service socket thread."); 
    } 

    { 
     printf("io_service socket thread exiting."); 
    } 
} 

//------------------------------------------------------------------------------ 
void ConnectionManager::DoAccept() 
{ 
    m_acceptor.async_accept(m_listenSocket, 
     [this](const boost::system::error_code errorCode) 
     { 
      if (errorCode) 
      { 
       printf(, "An error occured while attemping to accept connections. Error Code: %d", errorCode.value()); 
       return; 
      } 

      // Create the connection from the connected socket 
      std::lock_guard<std::mutex> lock(m_connectionsMutex); 
      Connection::SharedPtr connection = Connection::Create(this, m_listenSocket); 
      m_connections.push_back(connection); 
      connection->Start(); 

      DoAccept(); 
     }); 
} 

//------------------------------------------------------------------------------ 
void ConnectionManager::OnConnectionClosed(Connection::SharedPtr connection) 
{ 
    std::lock_guard<std::mutex> lock(m_connectionsMutex); 

    auto itConnection = std::find(m_connections.begin(), m_connections.end(), connection); 
    if (itConnection != m_connections.end()) 
    { 
     m_connections.erase(itConnection); 
    } 
} 

//------------------------------------------------------------------------------ 
void ConnectionManager::DoTimer() 
{ 
    if (!m_io_service.stopped()) 
    { 
     // Send messages every second 
     m_timer.expires_from_now(boost::posix_time::seconds(30)); 
     m_timer.async_wait(
      [this](const boost::system::error_code & errorCode) 
      { 
       std::lock_guard<std::mutex> lock(m_connectionsMutex); 
       for (auto connection : m_connections) 
       { 
        connection->Send(std::vector<char>{'b', 'e', 'e', 'p', '#'}); 
       } 

       DoTimer(); 
      }); 
    } 
} 

main.cppに

#include "ConnectionManager.h" 

#include <cstring> 
#include <iostream> 
#include <string> 

int main() 
{ 
    ConnectionManager connectionManager(4000, 2); 
    connectionManager.Start(); 

    std::this_thread::sleep_for(std::chrono::minutes(5)); 

    connectionManager.Stop(); 

    return 0; 
} 

私はバッファのベクトルを持っているし、それらの2があり、一つはアクティブで、一度非アクティブです。未処理の非同期書き込みが投稿されている間、非アクティブが追加されます。その後、非同期書き込みのハンドラは、送信されたはずのアクティブバッファをクリアします。それは私にとっては大丈夫です。私は昨日一日中それを見てきました。

誰かが私が間違っていたことを知っていますか?これらのバッファーを適切に使用する方法については、まったく知らない。

答えて

1

問題は、あなたがconst参照によってSendメソッドに渡されたベクトルの一時オブジェクトを作成

connection->Send(std::vector<char>{'b', 'e', 'e', 'p', '#'}); 

この行であってもよいが、あなたのSend方法であなたはboost::asio::buffer(data)しかしbufferが唯一のポインタが含まれているオブジェクトを作成し、使用していますデータとデータのサイズ。 Sendメソッドを呼び出すと、DoSendメソッドでデータを送信するときに、async_writeを使用すると、一時的なベクターが削除され、バッファが無効になります。

+0

これは、「ビープ音」を含むベクターがバッファーを意味し、ハンドラーが呼び出されるまで手を触れないようにする必要があることを意味します。データのコピーが含まれていない場合、std :: vector バッファのポイントは何ですか?私はそれがダブルバッファーを実装する全体のポイントだと思った。 それでは、私のコードはどのように見えますか?今私はさらに混乱していますが、この問題を解決することに興奮しています。 –

+0

私は問題がどこにあるのかを暗示しています。私はあなたの問題の完全な解決策を持っていません。あなたはベクトルのコピーを作ってそれを送信し、次にデータが存在することを確信しているか、または同期的にデータを送信します。 – rafix07

+0

Boost Asioのバッファーコンセプト_never_はデータをコピーします。それはまさに[定義]です(http://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/overview/core/buffers.html)。これは、C++があなたに必要な支払いをしようとしているからです(コピーが必要な場合は、自分で管理できます)。 – sehe

関連する問題