xref: /openbmc/sdbusplus/src/async/context.cpp (revision 73e278b5)
1 #include <systemd/sd-bus.h>
2 
3 #include <sdbusplus/async/context.hpp>
4 
5 #include <chrono>
6 
7 namespace sdbusplus::async
8 {
9 
10 context::context(bus_t&& b) : bus(std::move(b))
11 {
12     dbus_source = event_loop.add_io(bus.get_fd(), EPOLLIN, dbus_event_handle,
13                                     this);
14 }
15 
16 namespace details
17 {
18 
19 /* The sd_bus_wait/process completion event.
20  *
21  *  The wait/process handshake is modelled as a Sender with the the worker
22  *  task `co_await`ing Senders over and over.  This class is the completion
23  *  handling for the Sender (to get it back to the Receiver, ie. the worker).
24  */
25 struct wait_process_completion : bus::details::bus_friend
26 {
27     explicit wait_process_completion(context& ctx) : ctx(ctx) {}
28     virtual ~wait_process_completion() = default;
29 
30     // Called by the `caller` to indicate the Sender is completed.
31     virtual void complete() noexcept = 0;
32     // Called by the `caller` to indicate the Sender should be stopped.
33     virtual void stop() noexcept = 0;
34 
35     // Arm the completion event.
36     void arm() noexcept;
37 
38     // Data to share with the worker.
39     context& ctx;
40     std::chrono::microseconds timeout{};
41 
42     static task<> loop(context& ctx);
43     static void wait_once(context& ctx);
44 };
45 
46 /* The completion template based on receiver type.
47  *
48  * The type of the receivers (typically the co_awaiter) is only known by
49  * a template, so we need a sub-class of completion to hold the receiver.
50  */
51 template <execution::receiver R>
52 struct wait_process_operation : public wait_process_completion
53 {
54     wait_process_operation(context& ctx, R r) :
55         wait_process_completion(ctx), receiver(std::move(r))
56     {}
57 
58     wait_process_operation(wait_process_operation&&) = delete;
59 
60     void complete() noexcept override final
61     {
62         execution::set_value(std::move(this->receiver));
63     }
64 
65     void stop() noexcept override final
66     {
67         // Stop can be called when the context is shutting down,
68         // so treat it as if the receiver completed.
69         execution::set_value(std::move(this->receiver));
70     }
71 
72     friend void tag_invoke(execution::start_t,
73                            wait_process_operation& self) noexcept
74     {
75         self.arm();
76     }
77 
78     R receiver;
79 };
80 
81 /* The sender for the wait/process event. */
82 struct wait_process_sender
83 {
84     explicit wait_process_sender(context& ctx) : ctx(ctx){};
85 
86     friend auto tag_invoke(execution::get_completion_signatures_t,
87                            const wait_process_sender&, auto)
88         -> execution::completion_signatures<execution::set_value_t()>;
89 
90     template <execution::receiver R>
91     friend auto tag_invoke(execution::connect_t, wait_process_sender&& self,
92                            R r) -> wait_process_operation<R>
93     {
94         // Create the completion for the wait.
95         return {self.ctx, std::move(r)};
96     }
97 
98   private:
99     context& ctx;
100 };
101 
102 task<> wait_process_completion::loop(context& ctx)
103 {
104     while (!ctx.stop_requested())
105     {
106         // Handle the next sdbus event.
107         co_await wait_process_sender(ctx);
108 
109         // Completion likely happened on the context 'caller' thread, so
110         // we need to switch to the worker thread.
111         co_await execution::schedule(ctx.loop.get_scheduler());
112     }
113 }
114 
115 } // namespace details
116 
117 context::~context() noexcept(false)
118 {
119     if (worker_thread.joinable())
120     {
121         throw std::logic_error(
122             "sdbusplus::async::context destructed without completion.");
123     }
124 }
125 
126 bool context::request_stop() noexcept
127 {
128     auto first_stop = stop.request_stop();
129 
130     if (first_stop)
131     {
132         caller_wait.notify_one();
133         event_loop.break_run();
134     }
135 
136     return first_stop;
137 }
138 
139 void context::caller_run(task<> startup)
140 {
141     // Start up the worker thread.
142     worker_thread = std::thread{[this, startup = std::move(startup)]() mutable {
143         worker_run(std::move(startup));
144     }};
145 
146     // Run until the context requested to stop.
147     while (!stop_requested())
148     {
149         // Handle waiting on all the sd-events.
150         details::wait_process_completion::wait_once(*this);
151     }
152 
153     // Stop has been requested, so finish up the loop.
154     loop.finish();
155     if (worker_thread.joinable())
156     {
157         worker_thread.join();
158     }
159 }
160 
161 void context::worker_run(task<> startup)
162 {
163     // Begin the 'startup' task.
164     // This shouldn't start detached because we want to be able to forward
165     // failures back to the 'run'.  execution::ensure_started isn't
166     // implemented yet, so we don't have a lot of other options.
167     execution::start_detached(std::move(startup));
168 
169     // Also start up the sdbus 'wait/process' loop.
170     execution::start_detached(details::wait_process_completion::loop(*this));
171 
172     // Run the execution::run_loop to handle all the tasks.
173     loop.run();
174 }
175 
176 void details::wait_process_completion::arm() noexcept
177 {
178     // Call process.  True indicates something was handled and we do not
179     // need to `wait`, because there might be yet another pending operation
180     // to process, so immediately signal the operation as complete.
181     if (ctx.get_bus().process_discard())
182     {
183         this->complete();
184         return;
185     }
186 
187     // We need to call wait now, get the current timeout and stage ourselves
188     // as the next completion.
189 
190     // Get the bus' timeout.
191     uint64_t to_usec = 0;
192     sd_bus_get_timeout(get_busp(ctx.get_bus()), &to_usec);
193 
194     if (to_usec == UINT64_MAX)
195     {
196         // sd_bus_get_timeout returns UINT64_MAX to indicate 'wait forever'.
197         // Turn this into -1 for sd-event.
198         timeout = std::chrono::microseconds{-1};
199     }
200     else
201     {
202         timeout = std::chrono::microseconds(to_usec);
203     }
204 
205     // Assign ourselves as the pending completion and release the caller.
206     std::lock_guard lock{ctx.lock};
207     ctx.staged = this;
208     ctx.caller_wait.notify_one();
209 }
210 
211 void details::wait_process_completion::wait_once(context& ctx)
212 {
213     // Scope for lock.
214     {
215         std::unique_lock lock{ctx.lock};
216 
217         // If there isn't a completion waiting already, wait on the condition
218         // variable for one to show up (we can't call `poll` yet because we
219         // don't have the required parameters).
220         ctx.caller_wait.wait(lock, [&] {
221             return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
222                    (ctx.stop_requested());
223         });
224 
225         // Save the waiter as pending.
226         if (ctx.pending == nullptr)
227         {
228             ctx.pending = std::exchange(ctx.staged, nullptr);
229         }
230     }
231 
232     // If the context has been requested to be stopped, exit now instead of
233     // running the context event loop.
234     if (ctx.stop_requested())
235     {
236         return;
237     }
238 
239     // Run the event loop to process one request.
240     ctx.event_loop.run_one(ctx.pending->timeout);
241 
242     // If there is a stop requested, we need to stop the pending operation.
243     if (ctx.stop_requested())
244     {
245         decltype(ctx.pending) pending = nullptr;
246 
247         {
248             std::lock_guard lock{ctx.lock};
249             pending = std::exchange(ctx.pending, nullptr);
250         }
251 
252         // Do the stop outside the lock to prevent potential deadlocks due to
253         // the stop handler running.
254         if (pending != nullptr)
255         {
256             pending->stop();
257         }
258     }
259 }
260 
261 int context::dbus_event_handle(sd_event_source*, int, uint32_t, void* data)
262 {
263     auto self = static_cast<context*>(data);
264 
265     decltype(self->pending) pending = nullptr;
266     {
267         std::lock_guard lock{self->lock};
268         pending = std::exchange(self->pending, nullptr);
269     }
270 
271     // Outside the lock complete the pending operation.
272     //
273     // This can cause the Receiver task (the worker) to start executing (on
274     // this thread!), hence we do not want the lock held in order to avoid
275     // deadlocks.
276     if (pending != nullptr)
277     {
278         if (self->stop_requested())
279         {
280             pending->stop();
281         }
282         else
283         {
284             pending->complete();
285         }
286     }
287 
288     return 0;
289 }
290 
291 } // namespace sdbusplus::async
292