xref: /openbmc/sdbusplus/src/async/context.cpp (revision 97c31c82)
1 #include <systemd/sd-bus.h>
2 
3 #include <sdbusplus/async/context.hpp>
4 
5 #include <chrono>
6 
7 namespace sdbusplus::async
8 {
9 
10 context::context(bus_t&& b) : bus(std::move(b))
11 {
12     dbus_source = event_loop.add_io(bus.get_fd(), EPOLLIN, dbus_event_handle,
13                                     this);
14 }
15 
16 namespace details
17 {
18 
19 /* The sd_bus_wait/process completion event.
20  *
21  *  The wait/process handshake is modelled as a Sender with the the worker
22  *  task `co_await`ing Senders over and over.  This class is the completion
23  *  handling for the Sender (to get it back to the Receiver, ie. the worker).
24  */
25 struct wait_process_completion : bus::details::bus_friend
26 {
27     explicit wait_process_completion(context& ctx) : ctx(ctx) {}
28     virtual ~wait_process_completion() = default;
29 
30     // Called by the `caller` to indicate the Sender is completed.
31     virtual void complete() noexcept = 0;
32     // Called by the `caller` to indicate the Sender should be stopped.
33     virtual void stop() noexcept = 0;
34 
35     // Arm the completion event.
36     void arm() noexcept;
37 
38     // Data to share with the worker.
39     context& ctx;
40     event_t::time_resolution timeout{};
41 
42     // TODO: It seems like we should be able to do a normal `task<>` here
43     //       but spawn on it compile fails.
44     static exec::basic_task<void, exec::__task::__raw_task_context<void>>
45         loop(context& ctx);
46     static void wait_once(context& ctx);
47 };
48 
49 /* The completion template based on receiver type.
50  *
51  * The type of the receivers (typically the co_awaiter) is only known by
52  * a template, so we need a sub-class of completion to hold the receiver.
53  */
54 template <execution::receiver R>
55 struct wait_process_operation : public wait_process_completion
56 {
57     wait_process_operation(context& ctx, R r) :
58         wait_process_completion(ctx), receiver(std::move(r))
59     {}
60 
61     wait_process_operation(wait_process_operation&&) = delete;
62 
63     void complete() noexcept override final
64     {
65         execution::set_value(std::move(this->receiver));
66     }
67 
68     void stop() noexcept override final
69     {
70         // Stop can be called when the context is shutting down,
71         // so treat it as if the receiver completed.
72         execution::set_value(std::move(this->receiver));
73     }
74 
75     friend void tag_invoke(execution::start_t,
76                            wait_process_operation& self) noexcept
77     {
78         self.arm();
79     }
80 
81     R receiver;
82 };
83 
84 /* The sender for the wait/process event. */
85 struct wait_process_sender
86 {
87     explicit wait_process_sender(context& ctx) : ctx(ctx) {}
88 
89     friend auto tag_invoke(execution::get_completion_signatures_t,
90                            const wait_process_sender&, auto)
91         -> execution::completion_signatures<execution::set_value_t()>;
92 
93     template <execution::receiver R>
94     friend auto tag_invoke(execution::connect_t, wait_process_sender&& self,
95                            R r) -> wait_process_operation<R>
96     {
97         // Create the completion for the wait.
98         return {self.ctx, std::move(r)};
99     }
100 
101   private:
102     context& ctx;
103 };
104 
105 exec::basic_task<void, exec::__task::__raw_task_context<void>>
106     wait_process_completion::loop(context& ctx)
107 {
108     while (!ctx.final_stop.stop_requested())
109     {
110         // Handle the next sdbus event.
111         co_await wait_process_sender(ctx);
112 
113         // Completion likely happened on the context 'caller' thread, so
114         // we need to switch to the worker thread.
115         co_await execution::schedule(ctx.loop.get_scheduler());
116     }
117 
118     {
119         std::lock_guard lock{ctx.lock};
120         ctx.wait_process_stopped = true;
121     }
122 }
123 
124 } // namespace details
125 
126 context::~context() noexcept(false)
127 {
128     if (worker_thread.joinable())
129     {
130         throw std::logic_error(
131             "sdbusplus::async::context destructed without completion.");
132     }
133 }
134 
135 void context::run()
136 {
137     // Run the primary portion of the run-loop.
138     caller_run();
139 
140     // Rethrow the pending exception (if it exists).
141     rethrow_pending_exception();
142 
143     // Otherwise this should be final_stop...
144 
145     // We need to wait for the pending wait process and stop it.
146     wait_for_wait_process_stopped();
147 
148     // Wait for all the internal tasks to complete.
149     stdexec::sync_wait(internal_tasks.on_empty());
150 
151     // Finish up the loop and join the thread.
152     // (There shouldn't be anything going on by this point anyhow.)
153     loop.finish();
154     if (worker_thread.joinable())
155     {
156         worker_thread.join();
157     }
158 
159     // Check for one last exception.
160     rethrow_pending_exception();
161 }
162 
163 void context::worker_run()
164 {
165     // Start the sdbus 'wait/process' loop; treat it as an internal task.
166     internal_tasks.spawn(details::wait_process_completion::loop(*this));
167 
168     // Run the execution::run_loop to handle all the tasks.
169     loop.run();
170 }
171 
172 void context::spawn_complete(std::exception_ptr&& e)
173 {
174     {
175         std::lock_guard l{lock};
176         spawn_watcher_running = false;
177 
178         if (e)
179         {
180             pending_exceptions.emplace_back(std::move(e));
181         }
182     }
183 
184     if (stop_requested())
185     {
186         final_stop.request_stop();
187     }
188 
189     caller_wait.notify_one();
190     event_loop.break_run();
191 }
192 
193 void context::check_stop_requested()
194 {
195     if (stop_requested())
196     {
197         throw std::logic_error(
198             "sdbusplus::async::context spawn called while already stopped.");
199     }
200 }
201 
202 void context::spawn_watcher()
203 {
204     {
205         std::lock_guard l{lock};
206         if (spawn_watcher_running)
207         {
208             return;
209         }
210 
211         spawn_watcher_running = true;
212     }
213 
214     // Spawn the watch for completion / exceptions.
215     internal_tasks.spawn(pending_tasks.on_empty() |
216                          execution::then([this]() { spawn_complete(); }));
217 }
218 
219 void context::caller_run()
220 {
221     // We are able to run the loop until the context is requested to stop or
222     // we get an exception.
223     auto keep_running = [this]() {
224         std::lock_guard l{lock};
225         return !final_stop.stop_requested() && pending_exceptions.empty();
226     };
227 
228     // If we are suppose to keep running, start the run loop.
229     if (keep_running())
230     {
231         // Start up the worker thread.
232         if (!worker_thread.joinable())
233         {
234             worker_thread = std::thread{[this]() { worker_run(); }};
235         }
236         else
237         {
238             // We've already been running and there might an exception or
239             // completion pending.  Spawn a new watcher that checks for these.
240             spawn_watcher();
241         }
242 
243         while (keep_running())
244         {
245             // Handle waiting on all the sd-events.
246             details::wait_process_completion::wait_once(*this);
247         }
248     }
249     else
250     {
251         // There might be pending exceptions still, so spawn a watcher for them.
252         spawn_watcher();
253     }
254 }
255 
256 void context::wait_for_wait_process_stopped()
257 {
258     auto worker = std::exchange(pending, nullptr);
259     while (worker == nullptr)
260     {
261         std::lock_guard l{lock};
262         if (wait_process_stopped)
263         {
264             break;
265         }
266 
267         worker = std::exchange(staged, nullptr);
268         if (!worker)
269         {
270             std::this_thread::yield();
271         }
272     }
273     if (worker)
274     {
275         worker->stop();
276         wait_process_stopped = true;
277     }
278 }
279 
280 void context::rethrow_pending_exception()
281 {
282     {
283         std::lock_guard l{lock};
284         if (!pending_exceptions.empty())
285         {
286             auto e = pending_exceptions.front();
287             pending_exceptions.pop_front();
288             std::rethrow_exception(std::move(e));
289         }
290     }
291 }
292 
293 void details::wait_process_completion::arm() noexcept
294 {
295     // Call process.  True indicates something was handled and we do not
296     // need to `wait`, because there might be yet another pending operation
297     // to process, so immediately signal the operation as complete.
298     if (ctx.get_bus().process_discard())
299     {
300         this->complete();
301         return;
302     }
303 
304     // We need to call wait now, get the current timeout and stage ourselves
305     // as the next completion.
306 
307     // Get the bus' timeout.
308     uint64_t to_usec = 0;
309     sd_bus_get_timeout(get_busp(ctx.get_bus()), &to_usec);
310 
311     if (to_usec == UINT64_MAX)
312     {
313         // sd_bus_get_timeout returns UINT64_MAX to indicate 'wait forever'.
314         // Turn this into -1 for sd-event.
315         timeout = std::chrono::microseconds{-1};
316     }
317     else
318     {
319         timeout = std::chrono::microseconds(to_usec);
320     }
321 
322     // Assign ourselves as the pending completion and release the caller.
323     std::lock_guard lock{ctx.lock};
324     ctx.staged = this;
325     ctx.caller_wait.notify_one();
326 }
327 
328 void details::wait_process_completion::wait_once(context& ctx)
329 {
330     // Scope for lock.
331     {
332         std::unique_lock lock{ctx.lock};
333 
334         // If there isn't a completion waiting already, wait on the condition
335         // variable for one to show up (we can't call `poll` yet because we
336         // don't have the required parameters).
337         ctx.caller_wait.wait(lock, [&] {
338             return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
339                    ctx.final_stop.stop_requested() ||
340                    !ctx.pending_exceptions.empty();
341         });
342 
343         // Save the waiter as pending.
344         if (ctx.pending == nullptr)
345         {
346             ctx.pending = std::exchange(ctx.staged, nullptr);
347         }
348     }
349 
350     // Run the event loop to process one request.
351     // If the context has been requested to be stopped, skip the event loop.
352     if (!ctx.final_stop.stop_requested() && ctx.pending)
353     {
354         ctx.event_loop.run_one(ctx.pending->timeout);
355     }
356 }
357 
358 int context::dbus_event_handle(sd_event_source*, int, uint32_t, void* data)
359 {
360     auto self = static_cast<context*>(data);
361 
362     auto pending = std::exchange(self->pending, nullptr);
363     if (pending != nullptr)
364     {
365         pending->complete();
366     }
367 
368     return 0;
369 }
370 
371 } // namespace sdbusplus::async
372