1 // Copyright (c) Benjamin Kietzman (github.com/bkietz) 2 // 3 // Distributed under the Boost Software License, Version 1.0. (See accompanying 4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5 6 #ifndef DBUS_QUEUE_HPP 7 #define DBUS_QUEUE_HPP 8 9 #include <deque> 10 #include <functional> 11 #include <boost/asio.hpp> 12 #include <boost/asio/detail/mutex.hpp> 13 14 namespace dbus { 15 namespace detail { 16 17 template <typename Message> 18 class queue { 19 public: 20 typedef ::boost::asio::detail::mutex mutex_type; 21 typedef Message message_type; 22 typedef std::function<void(boost::system::error_code, Message)> handler_type; 23 24 private: 25 boost::asio::io_service& io; 26 mutex_type mutex; 27 std::deque<message_type> messages; 28 std::deque<handler_type> handlers; 29 30 public: 31 queue(boost::asio::io_service& io_service) : io(io_service) {} 32 33 queue(const queue<Message>& m) 34 : io(m.io), messages(m.messages), handlers(m.handlers) { 35 //TODO(ed) acquire the lock before copying messages and handlers 36 } 37 38 private: 39 class closure { 40 handler_type handler_; 41 message_type message_; 42 boost::system::error_code error_; 43 44 public: 45 void operator()() { handler_(error_, message_); } 46 closure(handler_type h, Message m, 47 boost::system::error_code e = boost::system::error_code()) 48 : handler_(h), message_(m), error_(e) {} 49 }; 50 51 public: 52 void push(message_type m) { 53 mutex_type::scoped_lock lock(mutex); 54 if (handlers.empty()) 55 messages.push_back(m); 56 else { 57 handler_type h = handlers.front(); 58 handlers.pop_front(); 59 60 lock.unlock(); 61 62 io.post(closure(h, m)); 63 } 64 } 65 66 template <typename MessageHandler> 67 inline BOOST_ASIO_INITFN_RESULT_TYPE(MessageHandler, 68 void(boost::system::error_code, 69 message_type)) 70 async_pop(BOOST_ASIO_MOVE_ARG(MessageHandler) h) { 71 typedef ::boost::asio::detail::async_result_init< 72 MessageHandler, void(boost::system::error_code, message_type)> 73 init_type; 74 75 mutex_type::scoped_lock lock(mutex); 76 if (messages.empty()) { 77 init_type init(BOOST_ASIO_MOVE_CAST(MessageHandler)(h)); 78 79 handlers.push_back(init.handler); 80 81 lock.unlock(); 82 83 return init.result.get(); 84 85 } else { 86 message_type m = messages.front(); 87 messages.pop_front(); 88 89 lock.unlock(); 90 91 init_type init(BOOST_ASIO_MOVE_CAST(MessageHandler)(h)); 92 93 io.post(closure(init.handler, m)); 94 95 return init.result.get(); 96 } 97 } 98 }; 99 100 } // namespace detail 101 } // namespace dbus 102 103 #endif // DBUS_QUEUE_HPP 104