2013-08-13 18 views
15

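Is there any way to asynchronously wait for a future in Boost Asio?

My problem is the following. I start several operations asynchronously, and I want to continue only once all of them have finished. Using Boost Asio, the most straightforward way to do this is the following. Suppose tasks is some kind of container of objects that support some asynchronous operation.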

tasksToGo = tasks.size(); 
for (auto task: tasks) { 
    task.async_do_something([](const boost::system::error_code& ec) 
    { 
     if (ec) { 
      // handle error 
     } else { 
      if (--tasksToGo == 0) { 
       tasksFinished(); 
      } 
     } 
    }); 
} 
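
The countdown approach above can be tried without Boost. The following is a minimal sketch, assuming a hypothetical toy_loop class that merely queues handlers and runs them later (a stand-in for io_service, not its API). start_tasks and run_countdown_example are likewise illustrative names. The counter lives in a shared_ptr so that every completion handler shares it, and the final callback fires only when the last handler runs.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical stand-in for an event loop: handlers are queued by post() and
// executed later by run(), loosely mimicking io_service::post()/run().
class toy_loop {
public:
    void post(std::function<void()> handler) { handlers_.push(std::move(handler)); }

    void run() {
        while (!handlers_.empty()) {
            std::function<void()> handler = std::move(handlers_.front());
            handlers_.pop();
            handler();
        }
    }

private:
    std::queue<std::function<void()>> handlers_;
};

// Starts `count` fake asynchronous tasks. Each completion handler decrements
// a shared counter; the handler that brings it to zero calls `on_all_done`.
void start_tasks(toy_loop& loop, std::size_t count,
                 std::function<void()> on_all_done) {
    auto remaining = std::make_shared<std::size_t>(count);
    for (std::size_t i = 0; i < count; ++i) {
        loop.post([remaining, on_all_done] {
            if (--*remaining == 0) on_all_done();
        });
    }
}

// Illustrative entry point: returns how many times the final callback fired.
int run_countdown_example() {
    toy_loop loop;
    int fired = 0;
    start_tasks(loop, 3, [&fired] { ++fired; });
    loop.run();
    return fired;
}
```

With three tasks the final callback fires once. With zero tasks it never fires, so that case would need separate handling. With several threads running the loop, the counter would also need to be atomic or protected by a mutex.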

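The problem with this solution is that it feels like a workaround. In Boost 1.54 I can do it with futures, but I can only wait synchronously, which is only possible from a thread other than the one in which run() was called.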

for (auto task: tasks) { 
    futures.push_back(task.async_do_something(boost::asio::use_future)); 
} 

for (auto& future: futures) { 
    future.wait(); 
} 

This code is much clearer than the previous one, but it needs a separate thread, which I do not want. I want something that can be used like this:

for (auto task: tasks) { 
    futures.push_back(task.async_do_something(boost::asio::use_future)); 
} 

boost::asio::spawn(ioService, [](boost::asio::yield_context yield) 
{ 
    for (auto& future: futures) { 
     future.async_wait(yield); 
    } 
    tasksFinished(); 
}); 

Is there anything that can be used in a similar way?

+2

Isn't future.then() what you are looking for?

Answers

17

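As far as I know, there is currently no first-class support for this. However, given the direction of the library, I would be surprised if this functionality were not available in the future.

A few papers have been proposed to add support for this type of functionality: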

  • N3558 - A Standardized Representation of Asynchronous Operations is particularly interesting. It proposes when_all(futures) and future.then(). If it is implemented, it would be possible to represent the asynchronous chain as:

    for (auto task: tasks) { 
        futures.push_back(task.async_do_something(boost::asio::use_future)); 
    } 
    when_all(futures).then(&tasksFinished); 
    
  • N3562 - Executors and schedulers introduces executors. These can be used to provide finer control over the context in which an async operation can execute. For Boost.Asio, this would require providing an executor backed by the io_service.

These papers are still in progress, but it may be worth periodically checking the Conformance and Extension page of Boost.Thread and the Boost.Asio github for early adoptions of these proposals.


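I needed this functionality a year ago, with a much earlier version of Boost, so I worked on my own solution. There are still some rough areas regarding the semantics, but it may be helpful as reference material until something official is adopted. Before I provide the code, here is a sample application based on your question: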

#include <iostream> 
#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include "async_ops.hpp" 

void handle_timer(const boost::system::error_code& error, int x) 
{ 
    std::cout << "in handle timer: " << x << " : " 
      << error.message() << std::endl; 
} 

void a() { std::cout << "a" << std::endl; } 
void b() { std::cout << "b" << std::endl; } 
void c() { std::cout << "c" << std::endl; } 

int main() 
{ 
    boost::asio::io_service io_service; 
    boost::asio::deadline_timer timer1(io_service); 
    boost::asio::deadline_timer timer2(io_service); 

    // Create a chain that will continue once 2 handlers have been executed. 
    chain all_expired = when_all(io_service, 2); 

    all_expired.then(&a) // Once 2 handlers finish, run a within io_service. 
      .then(&b) // Once a has finished, run b within io_service. 
      .then(&c); // Once b has finished, run c within io_service. 

    // Set expiration times for timers. 
    timer1.expires_from_now(boost::posix_time::seconds(2)); 
    timer2.expires_from_now(boost::posix_time::seconds(5)); 

    // Asynchronously wait for the timers, wrapping the handlers with the chain. 
    timer1.async_wait(all_expired.wrap(
     boost::bind(&handle_timer, boost::asio::placeholders::error, 1))); 
    timer2.async_wait(all_expired.wrap(
     boost::bind(&handle_timer, boost::asio::placeholders::error, 2))); 

    // Run the io_service. 
    io_service.run(); 
} 

Which produces the following output:

in handle timer: 1 : Success 
in handle timer: 2 : Success 
a 
b 
c 
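
Before the full header, the core of wrap() and then() can be reduced to a shared counter plus a list of continuations. The following single-threaded sketch is not the implementation from this answer: tiny_chain and run_tiny_chain_example are hypothetical names, continuations are invoked directly instead of being posted into an io_service, and there is no locking or chain nesting. It assumes C++14 or later for the generic lambda.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical reduction of the wrap()/then() idea to its bookkeeping.
class tiny_chain {
    struct state {
        explicit state(std::size_t r) : required(r) {}

        // Called after each wrapped handler. Runs the continuations, in
        // order, when the count reaches zero; later calls do nothing.
        void complete() {
            if (required == 0 || --required != 0) return;
            for (auto& continuation : continuations) continuation();
        }

        std::size_t required;
        std::vector<std::function<void()>> continuations;
    };

public:
    // `required` is the number of wrapped handlers that must be invoked
    // before the continuations run (N for "all", 1 for "any").
    explicit tiny_chain(std::size_t required)
        : state_(std::make_shared<state>(required)) {}

    // Register a continuation to run once the criteria has been met.
    tiny_chain& then(std::function<void()> continuation) {
        state_->continuations.push_back(std::move(continuation));
        return *this;
    }

    // Wrap a handler: the result forwards its arguments to `handler`,
    // then notifies the shared state. The copy of `s` keeps the state alive.
    template <typename Handler>
    auto wrap(Handler handler) {
        std::shared_ptr<state> s = state_;
        return [s, handler](auto&&... args) mutable {
            handler(std::forward<decltype(args)>(args)...);
            s->complete();
        };
    }

private:
    std::shared_ptr<state> state_;
};

// Illustrative entry point mirroring the timer example: two handlers,
// then a, then b. Returns the order in which everything ran.
std::string run_tiny_chain_example() {
    std::string trace;
    tiny_chain all_done(2);
    all_done.then([&trace] { trace += 'a'; })
            .then([&trace] { trace += 'b'; });

    auto h1 = all_done.wrap([&trace](int id) { trace += std::to_string(id); });
    auto h2 = all_done.wrap([&trace](int id) { trace += std::to_string(id); });
    h1(1);
    h2(2);
    return trace;
}
```

Constructing tiny_chain with 1 instead of 2 gives the "any" behaviour: the continuations run after the first wrapped handler, and the second handler still runs but no longer triggers anything.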

And here is async_ops.hpp:

#include <algorithm> 
#include <cstddef> 
#include <vector> 
#include <boost/array.hpp> 
#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/bind/protect.hpp> 
#include <boost/enable_shared_from_this.hpp> 
#include <boost/foreach.hpp> 
#include <boost/function.hpp> 
#include <boost/make_shared.hpp> 
#include <boost/range/iterator_range.hpp> 
#include <boost/shared_ptr.hpp> 
#include <boost/thread/locks.hpp> 
#include <boost/thread/mutex.hpp> 
#include <boost/type_traits/is_integral.hpp> 
#include <boost/type_traits/remove_reference.hpp> 
#include <boost/utility/enable_if.hpp> 

class chain; 

namespace detail { 

/// @brief Chained handler connects two handlers together that will 
///  be called sequentially. 
/// 
/// @note Type erasure is not performed on Handler1 to allow resolving 
///  to the correct asio_handler_invoke via ADL. 
template <typename Handler1> 
class chained_handler 
{ 
public: 
    template <typename Handler2> 
    chained_handler(Handler1 handler1, Handler2 handler2) 
    : handler1_(handler1), 
     handler2_(handler2) 
    {} 

    void operator()() 
    { 
    handler1_(); 
    handler2_(); 
    } 

    template <typename Arg1> 
    void operator()(const Arg1& a1) 
    { 
    handler1_(a1); 
    handler2_(); 
    } 

    template <typename Arg1, typename Arg2> 
    void operator()(const Arg1& a1, const Arg2& a2) 
    { 
    handler1_(a1, a2); 
    handler2_(); 
    } 

//private: 
    Handler1 handler1_; 
    boost::function<void()> handler2_; 
}; 

/// @brief Hook that allows the chained_handler to be invoked 
///  within specific context based on the handler's type. 
template <typename Function, 
      typename Handler> 
void asio_handler_invoke(
    Function function, 
    chained_handler<Handler>* handler) 
{ 
    boost_asio_handler_invoke_helpers::invoke(
    function, handler->handler1_); 
} 

/// @brief No operation. 
void noop() {} 

/// @brief io_service_executor is used to wrap handlers, providing a 
///  deferred posting to an io_service. This allows for chains 
///  to inherit io_services from other chains. 
class io_service_executor 
    : public boost::enable_shared_from_this<io_service_executor> 
{ 
public: 
    /// @brief Constructor. 
    explicit 
    io_service_executor(boost::asio::io_service* io_service) 
    : io_service_(io_service) 
    {} 

    /// @brief Wrap a handler, returning a functor that will post the 
    ///  provided handler into the io_service. 
    /// 
    /// @param handler Handler to be wrapped for deferred posting. 
    /// @return Functor that will post handler into io_service. 
    template <typename Handler> 
    boost::function<void()> wrap(Handler handler) 
    { 
    // By binding to the io_service_executor's post, the io_service 
    // into which the handler can be posted can be specified at a later 
    // point in time. 
    return boost::bind(&io_service_executor::post<Handler>, 
         shared_from_this(), handler); 
    } 

    /// @brief Set the io_service. 
    void io_service(boost::asio::io_service* io_service) 
    { 
    io_service_ = io_service; 
    } 

    /// @brief Get the io_service. 
    boost::asio::io_service* io_service() 
    { 
    return io_service_; 
    } 

private: 

    /// @brief Post handler into the io_service. 
    /// 
    /// @param handler The handler to post. 
    template <typename Handler> 
    void post(Handler handler) 
    { 
    io_service_->post(handler); 
    } 

private: 
    boost::asio::io_service* io_service_; 
}; 

/// @brief chain_impl is an implementation for a chain. It is responsible 
///  for lifetime management, tracking posting and wrapped functions, 
///  as well as determining when run criteria has been satisfied. 
class chain_impl 
    : public boost::enable_shared_from_this<chain_impl> 
{ 
public: 

    /// @brief Constructor. 
    chain_impl(boost::shared_ptr<io_service_executor> executor, 
      std::size_t required) 
    : executor_(executor), 
     required_(required) 
    {} 

    /// @brief Destructor will invoke all posted handlers. 
    ~chain_impl() 
    { 
    run(); 
    } 

    /// @brief Post a handler that will be posted into the executor 
    ///  after run criteria has been satisfied. 
    template <typename Handler> 
    void post(const Handler& handler) 
    { 
    deferred_handlers_.push_back(executor_->wrap(handler)); 
    } 

    /// @brief Wrap a handler, returning a chained_handler. The returned 
    ///  handler will notify the impl when it has been invoked. 
    template <typename Handler> 
    chained_handler<Handler> wrap(const Handler& handler) 
    { 
    return chained_handler<Handler>(
     handler,             // handler1 
     boost::bind(&chain_impl::complete, shared_from_this())); // handler2 
    } 

    /// @brief Force run of posted handlers. 
    void run() 
    { 
    boost::unique_lock<boost::mutex> guard(mutex_); 
    run(guard); 
    } 

    /// @brief Get the executor. 
    boost::shared_ptr<io_service_executor> executor() { return executor_; } 

private: 

    /// @brief Completion handler invoked when a wrapped handler has been 
    ///  invoked. 
    void complete() 
    { 
    boost::unique_lock<boost::mutex> guard(mutex_); 

    // Update tracking. 
    if (required_) 
     --required_; 

    // If criteria has not been met, then return early. 
    if (required_) return; 

    // Otherwise, run the handlers. 
    run(guard);  
    } 

    /// @brief Run handlers. 
    void run(boost::unique_lock<boost::mutex>& guard) 
    { 
    // While locked, swap handlers into a temporary. 
    std::vector<boost::function<void()> > handlers; 
    using std::swap; 
    swap(handlers, deferred_handlers_); 

    // Run handlers without mutex. 
    guard.unlock(); 
    BOOST_FOREACH(boost::function<void()>& handler, handlers) 
     handler(); 
    guard.lock(); 
    } 

private: 
    boost::shared_ptr<io_service_executor> executor_; 
    boost::mutex mutex_; 
    std::size_t required_; 
    std::vector<boost::function<void()> > deferred_handlers_; 
}; 

/// @brief Functor used to wrap and post handlers or chains between two 
///  implementations. 
struct wrap_and_post 
{ 
    wrap_and_post(
    boost::shared_ptr<detail::chain_impl> current, 
    boost::shared_ptr<detail::chain_impl> next 
) 
    : current_(current), 
     next_(next) 
    {} 

    /// @brief Wrap a handler with next, then post into current. 
    template <typename Handler> 
    void operator()(Handler handler) 
    { 
    // Wrap the handler with the next implementation, then post into the 
    // current. The wrapped handler will keep next alive, and posting into 
    // current will cause next::complete to be invoked when current is run. 
    current_->post(next_->wrap(handler)); 
    } 

    /// @brief Wrap an entire chain, posting into the current. 
    void operator()(chain chain); 

private: 
    boost::shared_ptr<detail::chain_impl> current_; 
    boost::shared_ptr<detail::chain_impl> next_; 
}; 

} // namespace detail 

/// @brief Used to indicate that a chain will inherit its service from an 
///  outer chain. 
class inherit_service_type {}; 
inherit_service_type inherit_service; 

/// @brief Chain represents an asynchronous call chain, allowing the overall 
///  chain to be constructed in a verbose and explicit manner. 
class chain 
{ 
public: 

    /// @brief Constructor. 
    /// 
    /// @param io_service The io_service in which the chain will run. 
    explicit 
    chain(boost::asio::io_service& io_service) 
    : impl_(boost::make_shared<detail::chain_impl>(
       boost::make_shared<detail::io_service_executor>(&io_service), 
       0)), 
     root_impl_(impl_) 
    {} 

    /// @brief Constructor. The chain will inherit its io_service from an 
    ///  outer chain. 
    explicit 
    chain(inherit_service_type) 
    : impl_(boost::make_shared<detail::chain_impl>(
       boost::make_shared<detail::io_service_executor>(
       static_cast<boost::asio::io_service*>(NULL)), 
       0)), 
     root_impl_(impl_) 
    {} 

    /// @brief Force run posted handlers. 
    void run() 
    { 
    root_impl_->run(); 
    } 

    /// @brief Chain link that will complete when the amount of wrapped 
    ///  handlers is equal to required. 
    /// 
    /// @param required The amount of handlers required to be complete. 
    template <typename T> 
    typename boost::enable_if<boost::is_integral< 
    typename boost::remove_reference<T>::type>, chain>::type 
    any(std::size_t required = 1) 
    { 
    return chain(root_impl_, required); 
    } 

    /// @brief Chain link that wraps all handlers in container, and will 
    ///  be complete when the amount of wrapped handlers is equal to 
    ///  required. 
    /// 
    /// @param Container of handlers to wrap. 
    /// @param required The amount of handlers required to be complete. 
    template <typename Container> 
    typename boost::disable_if<boost::is_integral< 
    typename boost::remove_reference<Container>::type>, chain>::type 
    any(const Container& container, 
     std::size_t required = 1) 
    { 
    return post(container, required); 
    } 

    /// @brief Chain link that wraps all handlers in iterator range, and will 
    ///  be complete when the amount of wrapped handlers is equal to 
    ///  required. 
    /// 
    /// @param Container of handlers to wrap. 
    /// @param required The amount of handlers required to be complete. 
    template <typename Iterator> 
    chain any(Iterator begin, Iterator end, 
      std::size_t required = 1) 
    { 
    return any(boost::make_iterator_range(begin, end), required); 
    } 

    /// @brief Chain link that will complete when the amount of wrapped 
    ///  handlers is equal to required. 
    /// 
    /// @param required The amount of handlers required to be complete. 
    template <typename T> 
    typename boost::enable_if<boost::is_integral< 
    typename boost::remove_reference<T>::type>, chain>::type 
    all(T required) 
    { 
    return any<T>(required); 
    } 

    /// @brief Chain link that wraps all handlers in container, and will 
    ///  be complete when all wrapped handlers from the container 
    ///  have been executed. 
    /// 
    /// @param Container of handlers to wrap. 
    template <typename Container> 
    typename boost::disable_if<boost::is_integral< 
    typename boost::remove_reference<Container>::type>, chain>::type 
    all(const Container& container) 
    { 
    return any(container, container.size()); 
    } 

    /// @brief Chain link that wraps all handlers in iterator range, and will 
    ///  be complete when all wrapped handlers from the iterator range 
    ///  have been executed. 
    /// 
    /// @param Container of handlers to wrap. 
    template <typename Iterator> 
    chain all(Iterator begin, Iterator end) 
    { 
    return all(boost::make_iterator_range(begin, end)); 
    } 

    /// @brief Chain link that represents a single sequential link. 
    template <typename Handler> 
    chain then(const Handler& handler) 
    { 
    boost::array<Handler, 1> handlers = {{handler}}; 
    return all(handlers); 
    } 

    /// @brief Wrap a handler, returning a chained_handler. 
    template <typename Handler> 
    detail::chained_handler<Handler> wrap(const Handler& handler) 
    { 
    return impl_->wrap(handler); 
    } 

    /// @brief Set the executor. 
    void executor(boost::asio::io_service& io_service) 
    { 
    impl_->executor()->io_service(&io_service); 
    } 

    /// @brief Check if this chain should inherit its executor. 
    bool inherits_executor() 
    { 
    return !impl_->executor()->io_service(); 
    } 

private: 

    /// @brief Private constructor used to create links in the chain. 
    /// 
    /// @note All links maintain a handle to the root impl. When constructing a 
    ///  chain, this allows for links later in the chain to be stored as 
    ///  non-temporaries. 
    chain(boost::shared_ptr<detail::chain_impl> root_impl, 
     std::size_t required) 
    : impl_(boost::make_shared<detail::chain_impl>(
       root_impl->executor(), required)), 
     root_impl_(root_impl) 
    {} 

    /// @brief Create a new chain link, wrapping handlers and posting into 
    ///  the current chain. 
    template <typename Container> 
    chain post(const Container& container, 
      std::size_t required) 
    { 
    // Create next chain. 
    chain next(root_impl_, required); 

    // Wrap handlers from the next chain, and post into the current chain. 
    std::for_each(container.begin(), container.end(), 
        detail::wrap_and_post(impl_, next.impl_)); 

    return next; 
    } 

private: 
    boost::shared_ptr<detail::chain_impl> impl_; 
    boost::shared_ptr<detail::chain_impl> root_impl_; 
}; 

void detail::wrap_and_post::operator()(chain c) 
{ 
    // If next does not have an executor, then inherit from current. 
    if (c.inherits_executor()) 
     c.executor(*current_->executor()->io_service()); 

    // When current completes, start the chain. 
    current_->post(boost::protect(boost::bind(&chain::run, c))); 

    // The next impl needs to be aware of when the chain stops, so 
    // wrap a noop and append it to the end of the chain. 
    c.then(next_->wrap(&detail::noop)); 
} 

// Convenience functions. 
template <typename T, typename Handler> 
chain async(T& t, const Handler& handler) 
{ 
    return chain(t).then(handler); 
} 

template <typename T, 
      typename Container> 
chain when_all(T& t, const Container& container) 
{ 
    return chain(t).all(container); 
} 

template <typename T, 
      typename Iterator> 
chain when_all(T& t, Iterator begin, Iterator end) 
{ 
    return chain(t).all(begin, end); 
} 

template <typename T, 
      typename Container> 
chain when_any(T& t, const Container& container) 
{ 
    return chain(t).any(container); 
} 

template <typename T, 
      typename Iterator> 
chain when_any(T& t, Iterator begin, Iterator end) 
{ 
    return chain(t).any(begin, end); 
} 

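Here are a few examples that use the above code with two threads. My notation:

  • a -> b expresses a, then b.
  • (a | b) expresses a or b. Thus, (a | b) -> c means that c runs once either a or b has finished.
  • (a & b) expresses a and b. Thus, (a & b) -> c means that c runs once both a and b have finished.

Before each case, I print the chain's notation. Additionally, each function prints an uppercase letter on entry and the corresponding lowercase letter on exit.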

#include <cctype> 
#include <iostream> 
#include <boost/asio.hpp> 
#include <boost/assign.hpp> 
#include <boost/thread.hpp> 
#include "async_ops.hpp" 

/// @brief Print identifiers when entering and exiting scope, 
///  sleeping between. 
void print_and_sleep(char id, unsigned int sleep_time) 
{ 
    std::cout << char(toupper(id)); 
    boost::this_thread::sleep_for(boost::chrono::milliseconds(sleep_time)); 
    std::cout << char(tolower(id)); 
    std::cout.flush(); 
} 

/// @brief Convenience function to create functors. 
boost::function<void()> make_fn(char id, unsigned int sleep_time) 
{ 
    return boost::bind(&print_and_sleep, id, sleep_time); 
} 

/// @brief Run an io_service with multiple threads. 
void run_service(boost::asio::io_service& io_service) 
{ 
    boost::thread_group threads; 
    threads.create_thread(boost::bind(
    &boost::asio::io_service::run, &io_service)); 
    io_service.run(); 
    threads.join_all(); 
} 

int main() 
{ 
    boost::function<void()> a = make_fn('a', 500); 
    boost::function<void()> b = make_fn('b', 1000); 
    boost::function<void()> c = make_fn('c', 500); 
    boost::function<void()> d = make_fn('d', 1000); 
    boost::function<void()> e = make_fn('e', 500); 

    { 
    std::cout << "a -> b -> c\n" 
       " "; 
    boost::asio::io_service io_service; 
    async(io_service, a) 
     .then(b) 
     .then(c); 
    run_service(io_service); 
    std::cout << std::endl; 
    } 

    { 
    std::cout << "(a & b) -> c\n" 
       " "; 
    boost::asio::io_service io_service; 
    when_all(io_service, boost::assign::list_of(a)(b)) 
     .then(c); 
    run_service(io_service); 
    std::cout << std::endl; 
    } 

    { 
    std::cout << "(a | b) -> c\n" 
       " "; 
    boost::asio::io_service io_service; 
    when_any(io_service, boost::assign::list_of(a)(b)) 
     .then(c); 
    run_service(io_service); 
    std::cout << std::endl; 
    } 

    { 
    std::cout << "(a & b) -> (c & d)\n" 
       " "; 
    boost::asio::io_service io_service; 
    when_all(io_service, boost::assign::list_of(a)(b)) 
     .all(boost::assign::list_of(c)(d)); 
    run_service(io_service); 
    std::cout << std::endl; 
    } 

    { 
    std::cout << "(a & b) -> c -> (d & e)\n" 
       " "; 
    boost::asio::io_service io_service; 
    when_all(io_service, boost::assign::list_of(a)(b)) 
     .then(c) 
     .all(boost::assign::list_of(d)(e)); 
    run_service(io_service); 
    std::cout << std::endl; 
    } 

    std::cout << "(a & b) -> (c & d) -> e\n" 
       " "; 
    { 
    boost::asio::io_service io_service; 
    when_all(io_service, boost::assign::list_of(a)(b)) 
     .all(boost::assign::list_of(c)(d)) 
     .then(e); 
    run_service(io_service); 
    std::cout << std::endl; 
    } 

    std::cout << "(a | b) -> (c | d) -> e\n" 
       " "; 
    { 
    boost::asio::io_service io_service; 
    when_any(io_service, boost::assign::list_of(a)(b)) 
     .any(boost::assign::list_of(c)(d)) 
     .then(e); 
    run_service(io_service); 
    std::cout << std::endl; 
    } 

    std::cout << "(a | b) -> (c & d) -> e\n" 
       " "; 
    { 
    boost::asio::io_service io_service; 
    when_any(io_service, boost::assign::list_of(a)(b)) 
     .all(boost::assign::list_of(c)(d)) 
     .then(e); 
    run_service(io_service); 
    std::cout << std::endl; 
    } 

    { 
    std::cout << "a -> ((b -> d) | c) -> e\n" 
       " "; 
    boost::asio::io_service io_service; 
    async(io_service, a) 
     .any(boost::assign::list_of 
      (async(io_service, b).then(d)) 
      (async(inherit_service, c))) 
     .then(e); 
    run_service(io_service); 
    std::cout << std::endl; 
    } 
} 

This produces the following output:

a -> b -> c 
    AaBbCc 
(a & b) -> c 
    ABabCc 
(a | b) -> c 
    ABaCbc 
(a & b) -> (c & d) 
    ABabCDcd 
(a & b) -> c -> (d & e) 
    ABabCcDEed 
(a & b) -> (c & d) -> e 
    ABabCDcdEe 
(a | b) -> (c | d) -> e 
    ABaCbDcEed 
(a | b) -> (c & d) -> e 
    ABaCbDcdEe 
a -> ((b -> d) | c) -> e 
    AaBCcEbDed 
-1

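What about using the work concept? io_service::run keeps running as long as work is available, and returns as soon as the work is deleted and there are no unfinished tasks.

Before calling run, you create a work instance: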

boost::shared_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(io_service)); 

Then, from your other thread, as soon as you want to allow io_service::run to terminate, you call:

work.reset(); 

http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.stopping_the_io_service_from_running_out_of_work
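
The effect of a work object on run() can be modelled in a few lines. This is only a toy, assuming a hypothetical single-threaded toy_service whose nested work class counts live tokens. Neither class is Boost's. Because the toy cannot block, its run() returns false at the point where a real io_service::run() would keep waiting.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical model of an event loop with a work guard.
class toy_service {
public:
    // RAII token: while at least one is alive, the service is considered
    // to have outstanding work.
    class work {
    public:
        explicit work(toy_service& service) : service_(service) {
            ++service_.work_count_;
        }
        ~work() { --service_.work_count_; }
        work(const work&) = delete;
        work& operator=(const work&) = delete;

    private:
        toy_service& service_;
    };

    void post(std::function<void()> handler) { handlers_.push(std::move(handler)); }

    // Runs all queued handlers. Returns true if no work token is alive
    // afterwards, meaning the loop may legitimately finish. Returns false if
    // a token is still alive; a real io_service::run() would block here.
    bool run() {
        while (!handlers_.empty()) {
            std::function<void()> handler = std::move(handlers_.front());
            handlers_.pop();
            handler();
        }
        return work_count_ == 0;
    }

private:
    std::queue<std::function<void()>> handlers_;
    std::size_t work_count_ = 0;
};

// Illustrative entry point: the guard keeps the loop "alive" until the last
// handler releases it with reset().
bool run_work_example() {
    toy_service service;
    auto guard = std::make_shared<toy_service::work>(service);

    bool kept_alive = !service.run();           // no handlers, guard held
    service.post([&guard] { guard.reset(); });  // final task drops the guard
    bool finished = service.run();              // guard gone, loop may end

    return kept_alive && finished;
}
```

The design point carried over from the answer is that whichever handler knows the tasks are complete is the one that resets the guard, so the guard has to be shared with it.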

+0

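You would be better off using a pos_infin timer that you can reset when the future is ready. However, this requires passing (sharing) a reference to the timer, and it is an intrusive method that does not really scale well – sehe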
