xref: /openbmc/sdbusplus/src/async/context.cpp (revision 9d8ba9473a5f118e854d0ec326a4275ce57b1e7b)
1 #include <systemd/sd-bus.h>
2 
3 #include <sdbusplus/async/context.hpp>
4 
5 #include <chrono>
6 
7 namespace sdbusplus::async
8 {
9 
10 context::context(bus_t&& b) : bus(std::move(b))
11 {
12     dbus_source = event_loop.add_io(bus.get_fd(), EPOLLIN, dbus_event_handle,
13                                     this);
14 }
15 
16 namespace details
17 {
18 
19 /* The sd_bus_wait/process completion event.
20  *
21  *  The wait/process handshake is modelled as a Sender with the the worker
22  *  task `co_await`ing Senders over and over.  This class is the completion
23  *  handling for the Sender (to get it back to the Receiver, ie. the worker).
24  */
25 struct wait_process_completion : bus::details::bus_friend
26 {
27     explicit wait_process_completion(context& ctx) : ctx(ctx) {}
28     virtual ~wait_process_completion() = default;
29 
30     // Called by the `caller` to indicate the Sender is completed.
31     virtual void complete() noexcept = 0;
32     // Called by the `caller` to indicate the Sender should be stopped.
33     virtual void stop() noexcept = 0;
34 
35     // Arm the completion event.
36     void arm() noexcept;
37 
38     // Data to share with the worker.
39     context& ctx;
40     event_t::time_resolution timeout{};
41 
42     // TODO: It seems like we should be able to do a normal `task<>` here
43     //       but spawn on it compile fails.
44     static exec::basic_task<void, exec::__task::__raw_task_context<void>>
45         loop(context& ctx);
46     static void wait_once(context& ctx);
47 };
48 
49 /* The completion template based on receiver type.
50  *
51  * The type of the receivers (typically the co_awaiter) is only known by
52  * a template, so we need a sub-class of completion to hold the receiver.
53  */
54 template <execution::receiver R>
55 struct wait_process_operation : public wait_process_completion
56 {
57     wait_process_operation(context& ctx, R r) :
58         wait_process_completion(ctx), receiver(std::move(r))
59     {}
60 
61     wait_process_operation(wait_process_operation&&) = delete;
62 
63     void complete() noexcept override final
64     {
65         execution::set_value(std::move(this->receiver));
66     }
67 
68     void stop() noexcept override final
69     {
70         // Stop can be called when the context is shutting down,
71         // so treat it as if the receiver completed.
72         execution::set_value(std::move(this->receiver));
73     }
74 
75     friend void tag_invoke(execution::start_t,
76                            wait_process_operation& self) noexcept
77     {
78         self.arm();
79     }
80 
81     R receiver;
82 };
83 
84 /* The sender for the wait/process event. */
85 struct wait_process_sender
86 {
87     using is_sender = void;
88 
89     explicit wait_process_sender(context& ctx) : ctx(ctx) {}
90 
91     friend auto tag_invoke(execution::get_completion_signatures_t,
92                            const wait_process_sender&, auto)
93         -> execution::completion_signatures<execution::set_value_t()>;
94 
95     template <execution::receiver R>
96     friend auto tag_invoke(execution::connect_t, wait_process_sender&& self,
97                            R r) -> wait_process_operation<R>
98     {
99         // Create the completion for the wait.
100         return {self.ctx, std::move(r)};
101     }
102 
103   private:
104     context& ctx;
105 };
106 
107 exec::basic_task<void, exec::__task::__raw_task_context<void>>
108     wait_process_completion::loop(context& ctx)
109 {
110     while (!ctx.final_stop.stop_requested())
111     {
112         // Handle the next sdbus event.
113         co_await wait_process_sender(ctx);
114 
115         // Completion likely happened on the context 'caller' thread, so
116         // we need to switch to the worker thread.
117         co_await execution::schedule(ctx.loop.get_scheduler());
118     }
119 
120     {
121         std::lock_guard lock{ctx.lock};
122         ctx.wait_process_stopped = true;
123     }
124 }
125 
126 } // namespace details
127 
128 context::~context() noexcept(false)
129 {
130     if (worker_thread.joinable())
131     {
132         throw std::logic_error(
133             "sdbusplus::async::context destructed without completion.");
134     }
135 }
136 
137 void context::run()
138 {
139     // Run the primary portion of the run-loop.
140     caller_run();
141 
142     // This should be final_stop...
143 
144     // We need to wait for the pending wait process and stop it.
145     wait_for_wait_process_stopped();
146 
147     // Wait for all the internal tasks to complete.
148     stdexec::sync_wait(internal_tasks.on_empty());
149 
150     // Finish up the loop and join the thread.
151     // (There shouldn't be anything going on by this point anyhow.)
152     loop.finish();
153     if (worker_thread.joinable())
154     {
155         worker_thread.join();
156     }
157 }
158 
159 void context::worker_run()
160 {
161     // Start the sdbus 'wait/process' loop; treat it as an internal task.
162     internal_tasks.spawn(details::wait_process_completion::loop(*this));
163 
164     // Run the execution::run_loop to handle all the tasks.
165     loop.run();
166 }
167 
168 void context::spawn_complete()
169 {
170     {
171         std::lock_guard l{lock};
172         spawn_watcher_running = false;
173     }
174 
175     if (stop_requested())
176     {
177         final_stop.request_stop();
178     }
179 
180     caller_wait.notify_one();
181     event_loop.break_run();
182 }
183 
184 void context::check_stop_requested()
185 {
186     if (stop_requested())
187     {
188         throw std::logic_error(
189             "sdbusplus::async::context spawn called while already stopped.");
190     }
191 }
192 
193 void context::spawn_watcher()
194 {
195     {
196         std::lock_guard l{lock};
197         if (spawn_watcher_running)
198         {
199             return;
200         }
201 
202         spawn_watcher_running = true;
203     }
204 
205     // Spawn the watch for completion / exceptions.
206     internal_tasks.spawn(pending_tasks.on_empty() |
207                          execution::then([this]() { spawn_complete(); }));
208 }
209 
210 void context::caller_run()
211 {
212     // We are able to run the loop until the context is requested to stop or
213     // we get an exception.
214     auto keep_running = [this]() {
215         std::lock_guard l{lock};
216         return !final_stop.stop_requested();
217     };
218 
219     // If we are suppose to keep running, start the run loop.
220     if (keep_running())
221     {
222         // Start up the worker thread.
223         if (!worker_thread.joinable())
224         {
225             worker_thread = std::thread{[this]() { worker_run(); }};
226         }
227         else
228         {
229             // We've already been running and there might a completion pending.
230             // Spawn a new watcher that checks for these.
231             spawn_watcher();
232         }
233 
234         while (keep_running())
235         {
236             // Handle waiting on all the sd-events.
237             details::wait_process_completion::wait_once(*this);
238         }
239     }
240     else
241     {
242         // There might be pending completions still, so spawn a watcher for
243         // them.
244         spawn_watcher();
245     }
246 }
247 
248 void context::wait_for_wait_process_stopped()
249 {
250     auto worker = std::exchange(pending, nullptr);
251     while (worker == nullptr)
252     {
253         std::lock_guard l{lock};
254         if (wait_process_stopped)
255         {
256             break;
257         }
258 
259         worker = std::exchange(staged, nullptr);
260         if (!worker)
261         {
262             std::this_thread::yield();
263         }
264     }
265     if (worker)
266     {
267         worker->stop();
268         wait_process_stopped = true;
269     }
270 }
271 
272 void details::wait_process_completion::arm() noexcept
273 {
274     // Call process.  True indicates something was handled and we do not
275     // need to `wait`, because there might be yet another pending operation
276     // to process, so immediately signal the operation as complete.
277     if (ctx.get_bus().process_discard())
278     {
279         this->complete();
280         return;
281     }
282 
283     // We need to call wait now, get the current timeout and stage ourselves
284     // as the next completion.
285 
286     // Get the bus' timeout.
287     uint64_t to_usec = 0;
288     sd_bus_get_timeout(get_busp(ctx.get_bus()), &to_usec);
289 
290     if (to_usec == UINT64_MAX)
291     {
292         // sd_bus_get_timeout returns UINT64_MAX to indicate 'wait forever'.
293         // Turn this into -1 for sd-event.
294         timeout = std::chrono::microseconds{-1};
295     }
296     else
297     {
298         timeout = std::chrono::microseconds(to_usec);
299     }
300 
301     // Assign ourselves as the pending completion and release the caller.
302     std::lock_guard lock{ctx.lock};
303     ctx.staged = this;
304     ctx.caller_wait.notify_one();
305 }
306 
307 void details::wait_process_completion::wait_once(context& ctx)
308 {
309     // Scope for lock.
310     {
311         std::unique_lock lock{ctx.lock};
312 
313         // If there isn't a completion waiting already, wait on the condition
314         // variable for one to show up (we can't call `poll` yet because we
315         // don't have the required parameters).
316         ctx.caller_wait.wait(lock, [&] {
317             return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
318                    ctx.final_stop.stop_requested();
319         });
320 
321         // Save the waiter as pending.
322         if (ctx.pending == nullptr)
323         {
324             ctx.pending = std::exchange(ctx.staged, nullptr);
325         }
326     }
327 
328     // Run the event loop to process one request.
329     // If the context has been requested to be stopped, skip the event loop.
330     if (!ctx.final_stop.stop_requested() && ctx.pending)
331     {
332         ctx.event_loop.run_one(ctx.pending->timeout);
333     }
334 }
335 
336 int context::dbus_event_handle(sd_event_source*, int, uint32_t, void* data)
337 {
338     auto self = static_cast<context*>(data);
339 
340     auto pending = std::exchange(self->pending, nullptr);
341     if (pending != nullptr)
342     {
343         pending->complete();
344     }
345 
346     return 0;
347 }
348 
349 } // namespace sdbusplus::async
350