xref: /openbmc/sdbusplus/src/async/context.cpp (revision d6f15eeb)
1 #include <systemd/sd-bus.h>
2 
3 #include <sdbusplus/async/context.hpp>
4 
5 #include <chrono>
6 
7 namespace sdbusplus::async
8 {
9 
10 context::context(bus_t&& b) : bus(std::move(b))
11 {
12     dbus_source = event_loop.add_io(bus.get_fd(), EPOLLIN, dbus_event_handle,
13                                     this);
14 }
15 
16 namespace details
17 {
18 
19 /* The sd_bus_wait/process completion event.
20  *
21  *  The wait/process handshake is modelled as a Sender with the the worker
22  *  task `co_await`ing Senders over and over.  This class is the completion
23  *  handling for the Sender (to get it back to the Receiver, ie. the worker).
24  */
25 struct wait_process_completion : context_ref, bus::details::bus_friend
26 {
27     explicit wait_process_completion(context& ctx) : context_ref(ctx) {}
28     virtual ~wait_process_completion() = default;
29 
30     // Called by the `caller` to indicate the Sender is completed.
31     virtual void complete() noexcept = 0;
32     // Called by the `caller` to indicate the Sender should be stopped.
33     virtual void stop() noexcept = 0;
34 
35     // Arm the completion event.
36     void arm() noexcept;
37 
38     // Data to share with the worker.
39     event_t::time_resolution timeout{};
40 
41     static auto loop(context& ctx) -> task<>;
42     static void wait_once(context& ctx);
43 };
44 
45 /* The completion template based on receiver type.
46  *
47  * The type of the receivers (typically the co_awaiter) is only known by
48  * a template, so we need a sub-class of completion to hold the receiver.
49  */
50 template <execution::receiver R>
51 struct wait_process_operation : public wait_process_completion
52 {
53     wait_process_operation(context& ctx, R r) :
54         wait_process_completion(ctx), receiver(std::move(r))
55     {}
56 
57     wait_process_operation(wait_process_operation&&) = delete;
58 
59     void complete() noexcept override final
60     {
61         execution::set_value(std::move(this->receiver));
62     }
63 
64     void stop() noexcept override final
65     {
66         // Stop can be called when the context is shutting down,
67         // so treat it as if the receiver completed.
68         execution::set_value(std::move(this->receiver));
69     }
70 
71     friend void tag_invoke(execution::start_t,
72                            wait_process_operation& self) noexcept
73     {
74         self.arm();
75     }
76 
77     R receiver;
78 };
79 
80 /* The sender for the wait/process event. */
81 struct wait_process_sender : public context_ref
82 {
83     using is_sender = void;
84 
85     explicit wait_process_sender(context& ctx) : context_ref(ctx) {}
86 
87     friend auto tag_invoke(execution::get_completion_signatures_t,
88                            const wait_process_sender&, auto)
89         -> execution::completion_signatures<execution::set_value_t()>;
90 
91     template <execution::receiver R>
92     friend auto tag_invoke(execution::connect_t, wait_process_sender&& self,
93                            R r) -> wait_process_operation<R>
94     {
95         // Create the completion for the wait.
96         return {self.ctx, std::move(r)};
97     }
98 };
99 
100 auto wait_process_completion::loop(context& ctx) -> task<>
101 {
102     while (!ctx.final_stop.stop_requested())
103     {
104         // Handle the next sdbus event.
105         co_await wait_process_sender(ctx);
106 
107         // Completion likely happened on the context 'caller' thread, so
108         // we need to switch to the worker thread.
109         co_await execution::schedule(ctx.loop.get_scheduler());
110     }
111 
112     {
113         std::lock_guard lock{ctx.lock};
114         ctx.wait_process_stopped = true;
115     }
116 }
117 
118 } // namespace details
119 
120 context::~context() noexcept(false)
121 {
122     if (worker_thread.joinable())
123     {
124         throw std::logic_error(
125             "sdbusplus::async::context destructed without completion.");
126     }
127 }
128 
129 void context::run()
130 {
131     // Run the primary portion of the run-loop.
132     caller_run();
133 
134     // This should be final_stop...
135 
136     // We need to wait for the pending wait process and stop it.
137     wait_for_wait_process_stopped();
138 
139     // Wait for all the internal tasks to complete.
140     stdexec::sync_wait(internal_tasks.on_empty());
141 
142     // Finish up the loop and join the thread.
143     // (There shouldn't be anything going on by this point anyhow.)
144     loop.finish();
145     if (worker_thread.joinable())
146     {
147         worker_thread.join();
148     }
149 }
150 
151 void context::worker_run()
152 {
153     // Start the sdbus 'wait/process' loop; treat it as an internal task.
154     internal_tasks.spawn(details::wait_process_completion::loop(*this));
155 
156     // Run the execution::run_loop to handle all the tasks.
157     loop.run();
158 }
159 
160 void context::spawn_complete()
161 {
162     {
163         std::lock_guard l{lock};
164         spawn_watcher_running = false;
165     }
166 
167     if (stop_requested())
168     {
169         final_stop.request_stop();
170     }
171 
172     caller_wait.notify_one();
173     event_loop.break_run();
174 }
175 
176 void context::check_stop_requested()
177 {
178     if (stop_requested())
179     {
180         throw std::logic_error(
181             "sdbusplus::async::context spawn called while already stopped.");
182     }
183 }
184 
185 void context::spawn_watcher()
186 {
187     {
188         std::lock_guard l{lock};
189         if (spawn_watcher_running)
190         {
191             return;
192         }
193 
194         spawn_watcher_running = true;
195     }
196 
197     // Spawn the watch for completion / exceptions.
198     internal_tasks.spawn(pending_tasks.on_empty() |
199                          execution::then([this]() { spawn_complete(); }));
200 }
201 
202 void context::caller_run()
203 {
204     // We are able to run the loop until the context is requested to stop or
205     // we get an exception.
206     auto keep_running = [this]() {
207         std::lock_guard l{lock};
208         return !final_stop.stop_requested();
209     };
210 
211     // If we are suppose to keep running, start the run loop.
212     if (keep_running())
213     {
214         // Start up the worker thread.
215         if (!worker_thread.joinable())
216         {
217             worker_thread = std::thread{[this]() { worker_run(); }};
218         }
219         else
220         {
221             // We've already been running and there might a completion pending.
222             // Spawn a new watcher that checks for these.
223             spawn_watcher();
224         }
225 
226         while (keep_running())
227         {
228             // Handle waiting on all the sd-events.
229             details::wait_process_completion::wait_once(*this);
230         }
231     }
232     else
233     {
234         // There might be pending completions still, so spawn a watcher for
235         // them.
236         spawn_watcher();
237     }
238 }
239 
240 void context::wait_for_wait_process_stopped()
241 {
242     auto worker = std::exchange(pending, nullptr);
243     while (worker == nullptr)
244     {
245         std::lock_guard l{lock};
246         if (wait_process_stopped)
247         {
248             break;
249         }
250 
251         worker = std::exchange(staged, nullptr);
252         if (!worker)
253         {
254             std::this_thread::yield();
255         }
256     }
257     if (worker)
258     {
259         worker->stop();
260         wait_process_stopped = true;
261     }
262 }
263 
264 void details::wait_process_completion::arm() noexcept
265 {
266     // Call process.  True indicates something was handled and we do not
267     // need to `wait`, because there might be yet another pending operation
268     // to process, so immediately signal the operation as complete.
269     if (ctx.get_bus().process_discard())
270     {
271         this->complete();
272         return;
273     }
274 
275     // We need to call wait now, get the current timeout and stage ourselves
276     // as the next completion.
277 
278     // Get the bus' timeout.
279     uint64_t to_usec = 0;
280     sd_bus_get_timeout(get_busp(ctx), &to_usec);
281 
282     if (to_usec == UINT64_MAX)
283     {
284         // sd_bus_get_timeout returns UINT64_MAX to indicate 'wait forever'.
285         // Turn this into -1 for sd-event.
286         timeout = std::chrono::microseconds{-1};
287     }
288     else
289     {
290         timeout = std::chrono::microseconds(to_usec);
291     }
292 
293     // Assign ourselves as the pending completion and release the caller.
294     std::lock_guard lock{ctx.lock};
295     ctx.staged = this;
296     ctx.caller_wait.notify_one();
297 }
298 
299 void details::wait_process_completion::wait_once(context& ctx)
300 {
301     // Scope for lock.
302     {
303         std::unique_lock lock{ctx.lock};
304 
305         // If there isn't a completion waiting already, wait on the condition
306         // variable for one to show up (we can't call `poll` yet because we
307         // don't have the required parameters).
308         ctx.caller_wait.wait(lock, [&] {
309             return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
310                    ctx.final_stop.stop_requested();
311         });
312 
313         // Save the waiter as pending.
314         if (ctx.pending == nullptr)
315         {
316             ctx.pending = std::exchange(ctx.staged, nullptr);
317         }
318     }
319 
320     // Run the event loop to process one request.
321     // If the context has been requested to be stopped, skip the event loop.
322     if (!ctx.final_stop.stop_requested() && ctx.pending)
323     {
324         ctx.event_loop.run_one(ctx.pending->timeout);
325     }
326 }
327 
328 int context::dbus_event_handle(sd_event_source*, int, uint32_t, void* data)
329 {
330     auto self = static_cast<context*>(data);
331 
332     auto pending = std::exchange(self->pending, nullptr);
333     if (pending != nullptr)
334     {
335         pending->complete();
336     }
337 
338     return 0;
339 }
340 
341 } // namespace sdbusplus::async
342