xref: /openbmc/sdbusplus/src/async/context.cpp (revision 4a9e4221)
1 #include <systemd/sd-bus.h>
2 
3 #include <sdbusplus/async/context.hpp>
4 
5 #include <chrono>
6 
7 namespace sdbusplus::async
8 {
9 
10 context::context(bus_t&& b) : bus(std::move(b))
11 {
12     dbus_source = event_loop.add_io(bus.get_fd(), EPOLLIN, dbus_event_handle,
13                                     this);
14 }
15 
16 namespace details
17 {
18 
19 /* The sd_bus_wait/process completion event.
20  *
21  *  The wait/process handshake is modelled as a Sender with the the worker
22  *  task `co_await`ing Senders over and over.  This class is the completion
23  *  handling for the Sender (to get it back to the Receiver, ie. the worker).
24  */
25 struct wait_process_completion : context_ref, bus::details::bus_friend
26 {
27     explicit wait_process_completion(context& ctx) : context_ref(ctx) {}
28     virtual ~wait_process_completion() = default;
29 
30     // Called by the `caller` to indicate the Sender is completed.
31     virtual void complete() noexcept = 0;
32     // Called by the `caller` to indicate the Sender should be stopped.
33     virtual void stop() noexcept = 0;
34 
35     // Arm the completion event.
36     void arm() noexcept;
37 
38     // Data to share with the worker.
39     event_t::time_resolution timeout{};
40 
41     // TODO: It seems like we should be able to do a normal `task<>` here
42     //       but spawn on it compile fails.
43     static exec::basic_task<void, exec::__task::__raw_task_context<void>>
44         loop(context& ctx);
45     static void wait_once(context& ctx);
46 };
47 
48 /* The completion template based on receiver type.
49  *
50  * The type of the receivers (typically the co_awaiter) is only known by
51  * a template, so we need a sub-class of completion to hold the receiver.
52  */
53 template <execution::receiver R>
54 struct wait_process_operation : public wait_process_completion
55 {
56     wait_process_operation(context& ctx, R r) :
57         wait_process_completion(ctx), receiver(std::move(r))
58     {}
59 
60     wait_process_operation(wait_process_operation&&) = delete;
61 
62     void complete() noexcept override final
63     {
64         execution::set_value(std::move(this->receiver));
65     }
66 
67     void stop() noexcept override final
68     {
69         // Stop can be called when the context is shutting down,
70         // so treat it as if the receiver completed.
71         execution::set_value(std::move(this->receiver));
72     }
73 
74     friend void tag_invoke(execution::start_t,
75                            wait_process_operation& self) noexcept
76     {
77         self.arm();
78     }
79 
80     R receiver;
81 };
82 
83 /* The sender for the wait/process event. */
84 struct wait_process_sender : public context_ref
85 {
86     using is_sender = void;
87 
88     explicit wait_process_sender(context& ctx) : context_ref(ctx) {}
89 
90     friend auto tag_invoke(execution::get_completion_signatures_t,
91                            const wait_process_sender&, auto)
92         -> execution::completion_signatures<execution::set_value_t()>;
93 
94     template <execution::receiver R>
95     friend auto tag_invoke(execution::connect_t, wait_process_sender&& self,
96                            R r) -> wait_process_operation<R>
97     {
98         // Create the completion for the wait.
99         return {self.ctx, std::move(r)};
100     }
101 };
102 
103 exec::basic_task<void, exec::__task::__raw_task_context<void>>
104     wait_process_completion::loop(context& ctx)
105 {
106     while (!ctx.final_stop.stop_requested())
107     {
108         // Handle the next sdbus event.
109         co_await wait_process_sender(ctx);
110 
111         // Completion likely happened on the context 'caller' thread, so
112         // we need to switch to the worker thread.
113         co_await execution::schedule(ctx.loop.get_scheduler());
114     }
115 
116     {
117         std::lock_guard lock{ctx.lock};
118         ctx.wait_process_stopped = true;
119     }
120 }
121 
122 } // namespace details
123 
124 context::~context() noexcept(false)
125 {
126     if (worker_thread.joinable())
127     {
128         throw std::logic_error(
129             "sdbusplus::async::context destructed without completion.");
130     }
131 }
132 
133 void context::run()
134 {
135     // Run the primary portion of the run-loop.
136     caller_run();
137 
138     // This should be final_stop...
139 
140     // We need to wait for the pending wait process and stop it.
141     wait_for_wait_process_stopped();
142 
143     // Wait for all the internal tasks to complete.
144     stdexec::sync_wait(internal_tasks.on_empty());
145 
146     // Finish up the loop and join the thread.
147     // (There shouldn't be anything going on by this point anyhow.)
148     loop.finish();
149     if (worker_thread.joinable())
150     {
151         worker_thread.join();
152     }
153 }
154 
155 void context::worker_run()
156 {
157     // Start the sdbus 'wait/process' loop; treat it as an internal task.
158     internal_tasks.spawn(details::wait_process_completion::loop(*this));
159 
160     // Run the execution::run_loop to handle all the tasks.
161     loop.run();
162 }
163 
164 void context::spawn_complete()
165 {
166     {
167         std::lock_guard l{lock};
168         spawn_watcher_running = false;
169     }
170 
171     if (stop_requested())
172     {
173         final_stop.request_stop();
174     }
175 
176     caller_wait.notify_one();
177     event_loop.break_run();
178 }
179 
180 void context::check_stop_requested()
181 {
182     if (stop_requested())
183     {
184         throw std::logic_error(
185             "sdbusplus::async::context spawn called while already stopped.");
186     }
187 }
188 
189 void context::spawn_watcher()
190 {
191     {
192         std::lock_guard l{lock};
193         if (spawn_watcher_running)
194         {
195             return;
196         }
197 
198         spawn_watcher_running = true;
199     }
200 
201     // Spawn the watch for completion / exceptions.
202     internal_tasks.spawn(pending_tasks.on_empty() |
203                          execution::then([this]() { spawn_complete(); }));
204 }
205 
206 void context::caller_run()
207 {
208     // We are able to run the loop until the context is requested to stop or
209     // we get an exception.
210     auto keep_running = [this]() {
211         std::lock_guard l{lock};
212         return !final_stop.stop_requested();
213     };
214 
215     // If we are suppose to keep running, start the run loop.
216     if (keep_running())
217     {
218         // Start up the worker thread.
219         if (!worker_thread.joinable())
220         {
221             worker_thread = std::thread{[this]() { worker_run(); }};
222         }
223         else
224         {
225             // We've already been running and there might a completion pending.
226             // Spawn a new watcher that checks for these.
227             spawn_watcher();
228         }
229 
230         while (keep_running())
231         {
232             // Handle waiting on all the sd-events.
233             details::wait_process_completion::wait_once(*this);
234         }
235     }
236     else
237     {
238         // There might be pending completions still, so spawn a watcher for
239         // them.
240         spawn_watcher();
241     }
242 }
243 
244 void context::wait_for_wait_process_stopped()
245 {
246     auto worker = std::exchange(pending, nullptr);
247     while (worker == nullptr)
248     {
249         std::lock_guard l{lock};
250         if (wait_process_stopped)
251         {
252             break;
253         }
254 
255         worker = std::exchange(staged, nullptr);
256         if (!worker)
257         {
258             std::this_thread::yield();
259         }
260     }
261     if (worker)
262     {
263         worker->stop();
264         wait_process_stopped = true;
265     }
266 }
267 
268 void details::wait_process_completion::arm() noexcept
269 {
270     // Call process.  True indicates something was handled and we do not
271     // need to `wait`, because there might be yet another pending operation
272     // to process, so immediately signal the operation as complete.
273     if (ctx.get_bus().process_discard())
274     {
275         this->complete();
276         return;
277     }
278 
279     // We need to call wait now, get the current timeout and stage ourselves
280     // as the next completion.
281 
282     // Get the bus' timeout.
283     uint64_t to_usec = 0;
284     sd_bus_get_timeout(get_busp(ctx.get_bus()), &to_usec);
285 
286     if (to_usec == UINT64_MAX)
287     {
288         // sd_bus_get_timeout returns UINT64_MAX to indicate 'wait forever'.
289         // Turn this into -1 for sd-event.
290         timeout = std::chrono::microseconds{-1};
291     }
292     else
293     {
294         timeout = std::chrono::microseconds(to_usec);
295     }
296 
297     // Assign ourselves as the pending completion and release the caller.
298     std::lock_guard lock{ctx.lock};
299     ctx.staged = this;
300     ctx.caller_wait.notify_one();
301 }
302 
303 void details::wait_process_completion::wait_once(context& ctx)
304 {
305     // Scope for lock.
306     {
307         std::unique_lock lock{ctx.lock};
308 
309         // If there isn't a completion waiting already, wait on the condition
310         // variable for one to show up (we can't call `poll` yet because we
311         // don't have the required parameters).
312         ctx.caller_wait.wait(lock, [&] {
313             return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
314                    ctx.final_stop.stop_requested();
315         });
316 
317         // Save the waiter as pending.
318         if (ctx.pending == nullptr)
319         {
320             ctx.pending = std::exchange(ctx.staged, nullptr);
321         }
322     }
323 
324     // Run the event loop to process one request.
325     // If the context has been requested to be stopped, skip the event loop.
326     if (!ctx.final_stop.stop_requested() && ctx.pending)
327     {
328         ctx.event_loop.run_one(ctx.pending->timeout);
329     }
330 }
331 
332 int context::dbus_event_handle(sd_event_source*, int, uint32_t, void* data)
333 {
334     auto self = static_cast<context*>(data);
335 
336     auto pending = std::exchange(self->pending, nullptr);
337     if (pending != nullptr)
338     {
339         pending->complete();
340     }
341 
342     return 0;
343 }
344 
345 } // namespace sdbusplus::async
346