xref: /openbmc/sdbusplus/src/async/context.cpp (revision 1837cd59)
1 #include <systemd/sd-bus.h>
2 
3 #include <sdbusplus/async/context.hpp>
4 
5 #include <chrono>
6 
7 namespace sdbusplus::async
8 {
9 
10 context::context(bus_t&& b) : bus(std::move(b))
11 {
12     dbus_source = event_loop.add_io(bus.get_fd(), EPOLLIN, dbus_event_handle,
13                                     this);
14 }
15 
16 namespace details
17 {
18 
19 /* The sd_bus_wait/process completion event.
20  *
21  *  The wait/process handshake is modelled as a Sender with the the worker
22  *  task `co_await`ing Senders over and over.  This class is the completion
23  *  handling for the Sender (to get it back to the Receiver, ie. the worker).
24  */
25 struct wait_process_completion : bus::details::bus_friend
26 {
27     explicit wait_process_completion(context& ctx) : ctx(ctx) {}
28     virtual ~wait_process_completion() = default;
29 
30     // Called by the `caller` to indicate the Sender is completed.
31     virtual void complete() noexcept = 0;
32     // Called by the `caller` to indicate the Sender should be stopped.
33     virtual void stop() noexcept = 0;
34 
35     // Arm the completion event.
36     void arm() noexcept;
37 
38     // Data to share with the worker.
39     context& ctx;
40     event_t::time_resolution timeout{};
41 
42     static task<> loop(context& ctx);
43     static void wait_once(context& ctx);
44 };
45 
46 /* The completion template based on receiver type.
47  *
48  * The type of the receivers (typically the co_awaiter) is only known by
49  * a template, so we need a sub-class of completion to hold the receiver.
50  */
51 template <execution::receiver R>
52 struct wait_process_operation : public wait_process_completion
53 {
54     wait_process_operation(context& ctx, R r) :
55         wait_process_completion(ctx), receiver(std::move(r))
56     {}
57 
58     wait_process_operation(wait_process_operation&&) = delete;
59 
60     void complete() noexcept override final
61     {
62         execution::set_value(std::move(this->receiver));
63     }
64 
65     void stop() noexcept override final
66     {
67         // Stop can be called when the context is shutting down,
68         // so treat it as if the receiver completed.
69         execution::set_value(std::move(this->receiver));
70     }
71 
72     friend void tag_invoke(execution::start_t,
73                            wait_process_operation& self) noexcept
74     {
75         self.arm();
76     }
77 
78     R receiver;
79 };
80 
81 /* The sender for the wait/process event. */
82 struct wait_process_sender
83 {
84     explicit wait_process_sender(context& ctx) : ctx(ctx) {}
85 
86     friend auto tag_invoke(execution::get_completion_signatures_t,
87                            const wait_process_sender&, auto)
88         -> execution::completion_signatures<execution::set_value_t()>;
89 
90     template <execution::receiver R>
91     friend auto tag_invoke(execution::connect_t, wait_process_sender&& self,
92                            R r) -> wait_process_operation<R>
93     {
94         // Create the completion for the wait.
95         return {self.ctx, std::move(r)};
96     }
97 
98   private:
99     context& ctx;
100 };
101 
102 task<> wait_process_completion::loop(context& ctx)
103 {
104     while (!ctx.final_stop.stop_requested())
105     {
106         // Handle the next sdbus event.
107         co_await wait_process_sender(ctx);
108 
109         // Completion likely happened on the context 'caller' thread, so
110         // we need to switch to the worker thread.
111         co_await execution::schedule(ctx.loop.get_scheduler());
112     }
113 
114     {
115         std::lock_guard lock{ctx.lock};
116         ctx.wait_process_stopped = true;
117     }
118 }
119 
120 } // namespace details
121 
122 context::~context() noexcept(false)
123 {
124     if (worker_thread.joinable())
125     {
126         throw std::logic_error(
127             "sdbusplus::async::context destructed without completion.");
128     }
129 }
130 
131 void context::run()
132 {
133     // Run the primary portion of the run-loop.
134     caller_run();
135 
136     // Rethrow the pending exception (if it exists).
137     rethrow_pending_exception();
138 
139     // Otherwise this should be final_stop...
140 
141     // We need to wait for the pending wait process and stop it.
142     wait_for_wait_process_stopped();
143 
144     // Wait for all the internal tasks to complete.
145     stdexec::this_thread::sync_wait(
146         internal_tasks.empty() | execution::upon_error([&](auto&& e) {
147             pending_exceptions.emplace_back(std::move(e));
148         }));
149 
150     // Finish up the loop and join the thread.
151     // (There shouldn't be anything going on by this point anyhow.)
152     loop.finish();
153     if (worker_thread.joinable())
154     {
155         worker_thread.join();
156     }
157 
158     // Check for one last exception.
159     rethrow_pending_exception();
160 }
161 
162 void context::worker_run()
163 {
164     // Start the sdbus 'wait/process' loop; treat it as an internal task.
165     internal_tasks.spawn(details::wait_process_completion::loop(*this) |
166                          execution::upon_stopped([]() {}));
167 
168     // Run the execution::run_loop to handle all the tasks.
169     loop.run();
170 }
171 
172 void context::spawn_complete(std::exception_ptr&& e)
173 {
174     {
175         std::lock_guard l{lock};
176         spawn_watcher_running = false;
177 
178         if (e)
179         {
180             pending_exceptions.emplace_back(std::move(e));
181         }
182     }
183 
184     if (stop_requested())
185     {
186         final_stop.request_stop();
187     }
188 
189     caller_wait.notify_one();
190     event_loop.break_run();
191 }
192 
193 void context::check_stop_requested()
194 {
195     if (stop_requested())
196     {
197         throw std::logic_error(
198             "sdbusplus::async::context spawn called while already stopped.");
199     }
200 }
201 
202 void context::spawn_watcher()
203 {
204     {
205         std::lock_guard l{lock};
206         if (spawn_watcher_running)
207         {
208             return;
209         }
210 
211         spawn_watcher_running = true;
212     }
213 
214     // Spawn the watch for completion / exceptions.
215     internal_tasks.spawn(pending_tasks.empty() |
216                          execution::then([this]() { spawn_complete(); }) |
217                          execution::upon_error([this](auto&& e) {
218                              spawn_complete(std::move(e));
219                          }));
220 }
221 
222 void context::caller_run()
223 {
224     // We are able to run the loop until the context is requested to stop or
225     // we get an exception.
226     auto keep_running = [this]() {
227         std::lock_guard l{lock};
228         return !final_stop.stop_requested() && pending_exceptions.empty();
229     };
230 
231     // If we are suppose to keep running, start the run loop.
232     if (keep_running())
233     {
234         // Start up the worker thread.
235         if (!worker_thread.joinable())
236         {
237             worker_thread = std::thread{[this]() { worker_run(); }};
238         }
239         else
240         {
241             // We've already been running and there might an exception or
242             // completion pending.  Spawn a new watcher that checks for these.
243             spawn_watcher();
244         }
245 
246         while (keep_running())
247         {
248             // Handle waiting on all the sd-events.
249             details::wait_process_completion::wait_once(*this);
250         }
251     }
252     else
253     {
254         // There might be pending exceptions still, so spawn a watcher for them.
255         spawn_watcher();
256     }
257 }
258 
259 void context::wait_for_wait_process_stopped()
260 {
261     auto worker = std::exchange(pending, nullptr);
262     while (worker == nullptr)
263     {
264         std::lock_guard l{lock};
265         if (wait_process_stopped)
266         {
267             break;
268         }
269 
270         worker = std::exchange(staged, nullptr);
271         if (!worker)
272         {
273             std::this_thread::yield();
274         }
275     }
276     if (worker)
277     {
278         worker->stop();
279         wait_process_stopped = true;
280     }
281 }
282 
283 void context::rethrow_pending_exception()
284 {
285     {
286         std::lock_guard l{lock};
287         if (!pending_exceptions.empty())
288         {
289             auto e = pending_exceptions.front();
290             pending_exceptions.pop_front();
291             std::rethrow_exception(std::move(e));
292         }
293     }
294 }
295 
296 void details::wait_process_completion::arm() noexcept
297 {
298     // Call process.  True indicates something was handled and we do not
299     // need to `wait`, because there might be yet another pending operation
300     // to process, so immediately signal the operation as complete.
301     if (ctx.get_bus().process_discard())
302     {
303         this->complete();
304         return;
305     }
306 
307     // We need to call wait now, get the current timeout and stage ourselves
308     // as the next completion.
309 
310     // Get the bus' timeout.
311     uint64_t to_usec = 0;
312     sd_bus_get_timeout(get_busp(ctx.get_bus()), &to_usec);
313 
314     if (to_usec == UINT64_MAX)
315     {
316         // sd_bus_get_timeout returns UINT64_MAX to indicate 'wait forever'.
317         // Turn this into -1 for sd-event.
318         timeout = std::chrono::microseconds{-1};
319     }
320     else
321     {
322         timeout = std::chrono::microseconds(to_usec);
323     }
324 
325     // Assign ourselves as the pending completion and release the caller.
326     std::lock_guard lock{ctx.lock};
327     ctx.staged = this;
328     ctx.caller_wait.notify_one();
329 }
330 
331 void details::wait_process_completion::wait_once(context& ctx)
332 {
333     // Scope for lock.
334     {
335         std::unique_lock lock{ctx.lock};
336 
337         // If there isn't a completion waiting already, wait on the condition
338         // variable for one to show up (we can't call `poll` yet because we
339         // don't have the required parameters).
340         ctx.caller_wait.wait(lock, [&] {
341             return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
342                    ctx.final_stop.stop_requested() ||
343                    !ctx.pending_exceptions.empty();
344         });
345 
346         // Save the waiter as pending.
347         if (ctx.pending == nullptr)
348         {
349             ctx.pending = std::exchange(ctx.staged, nullptr);
350         }
351     }
352 
353     // Run the event loop to process one request.
354     // If the context has been requested to be stopped, skip the event loop.
355     if (!ctx.final_stop.stop_requested() && ctx.pending)
356     {
357         ctx.event_loop.run_one(ctx.pending->timeout);
358     }
359 }
360 
361 int context::dbus_event_handle(sd_event_source*, int, uint32_t, void* data)
362 {
363     auto self = static_cast<context*>(data);
364 
365     auto pending = std::exchange(self->pending, nullptr);
366     if (pending != nullptr)
367     {
368         pending->complete();
369     }
370 
371     return 0;
372 }
373 
374 } // namespace sdbusplus::async
375