1 // Copyright (c) Benjamin Kietzman (github.com/bkietz)
2 //
3 // Distributed under the Boost Software License, Version 1.0. (See accompanying
4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 #ifndef DBUS_QUEUE_HPP
7 #define DBUS_QUEUE_HPP
8 
9 #include <deque>
10 #include <functional>
11 #include <boost/asio.hpp>
12 #include <boost/asio/detail/mutex.hpp>
13 
14 namespace dbus {
15 namespace detail {
16 
17 template <typename Message>
18 class queue {
19  public:
20   typedef ::boost::asio::detail::mutex mutex_type;
21   typedef Message message_type;
22   typedef std::function<void(boost::system::error_code, Message)> handler_type;
23 
24  private:
25   boost::asio::io_service& io;
26   mutex_type mutex;
27   std::deque<message_type> messages;
28   std::deque<handler_type> handlers;
29 
30  public:
31   queue(boost::asio::io_service& io_service) : io(io_service) {}
32 
33   queue(const queue<Message>& m)
34       : io(m.io), messages(m.messages), handlers(m.handlers) {
35         //TODO(ed) acquire the lock before copying messages and handlers
36       }
37 
38  private:
39   class closure {
40     handler_type handler_;
41     message_type message_;
42     boost::system::error_code error_;
43 
44    public:
45     void operator()() { handler_(error_, message_); }
46     closure(handler_type h, Message m,
47             boost::system::error_code e = boost::system::error_code())
48         : handler_(h), message_(m), error_(e) {}
49   };
50 
51  public:
52   void push(message_type m) {
53     mutex_type::scoped_lock lock(mutex);
54     if (handlers.empty())
55       messages.push_back(m);
56     else {
57       handler_type h = handlers.front();
58       handlers.pop_front();
59 
60       lock.unlock();
61 
62       io.post(closure(h, m));
63     }
64   }
65 
66   template <typename MessageHandler>
67   inline BOOST_ASIO_INITFN_RESULT_TYPE(MessageHandler,
68                                        void(boost::system::error_code,
69                                             message_type))
70       async_pop(BOOST_ASIO_MOVE_ARG(MessageHandler) h) {
71     typedef ::boost::asio::detail::async_result_init<
72         MessageHandler, void(boost::system::error_code, message_type)>
73         init_type;
74 
75     mutex_type::scoped_lock lock(mutex);
76     if (messages.empty()) {
77       init_type init(BOOST_ASIO_MOVE_CAST(MessageHandler)(h));
78 
79       handlers.push_back(init.handler);
80 
81       lock.unlock();
82 
83       return init.result.get();
84 
85     } else {
86       message_type m = messages.front();
87       messages.pop_front();
88 
89       lock.unlock();
90 
91       init_type init(BOOST_ASIO_MOVE_CAST(MessageHandler)(h));
92 
93       io.post(closure(init.handler, m));
94 
95       return init.result.get();
96     }
97   }
98 };
99 
100 }  // namespace detail
101 }  // namespace dbus
102 
103 #endif  // DBUS_QUEUE_HPP
104