xref: /openbmc/sdbusplus/src/async/context.cpp (revision f083bc1a)
1 #include <systemd/sd-bus.h>
2 
3 #include <sdbusplus/async/context.hpp>
4 
5 #include <chrono>
6 
7 namespace sdbusplus::async
8 {
9 
10 context::context(bus_t&& b) : bus(std::move(b))
11 {
12     dbus_source =
13         event_loop.add_io(bus.get_fd(), EPOLLIN, dbus_event_handle, this);
14 }
15 
16 namespace details
17 {
18 
19 /* The sd_bus_wait/process completion event.
20  *
21  *  The wait/process handshake is modelled as a Sender with the the worker
22  *  task `co_await`ing Senders over and over.  This class is the completion
23  *  handling for the Sender (to get it back to the Receiver, ie. the worker).
24  */
25 struct wait_process_completion : context_ref, bus::details::bus_friend
26 {
27     explicit wait_process_completion(context& ctx) : context_ref(ctx) {}
28     virtual ~wait_process_completion() = default;
29 
30     // Called by the `caller` to indicate the Sender is completed.
31     virtual void complete() noexcept = 0;
32     // Called by the `caller` to indicate the Sender should be stopped.
33     virtual void stop() noexcept = 0;
34 
35     // Arm the completion event.
36     void arm() noexcept;
37 
38     // Data to share with the worker.
39     event_t::time_resolution timeout{};
40 
41     static auto loop(context& ctx) -> task<>;
42     static void wait_once(context& ctx);
43 };
44 
45 /* The completion template based on receiver type.
46  *
47  * The type of the receivers (typically the co_awaiter) is only known by
48  * a template, so we need a sub-class of completion to hold the receiver.
49  */
50 template <execution::receiver R>
51 struct wait_process_operation : public wait_process_completion
52 {
53     wait_process_operation(context& ctx, R r) :
54         wait_process_completion(ctx), receiver(std::move(r))
55     {}
56 
57     wait_process_operation(wait_process_operation&&) = delete;
58 
59     void complete() noexcept override final
60     {
61         execution::set_value(std::move(this->receiver));
62     }
63 
64     void stop() noexcept override final
65     {
66         // Stop can be called when the context is shutting down,
67         // so treat it as if the receiver completed.
68         execution::set_value(std::move(this->receiver));
69     }
70 
71     friend void tag_invoke(execution::start_t,
72                            wait_process_operation& self) noexcept
73     {
74         self.arm();
75     }
76 
77     R receiver;
78 };
79 
80 /* The sender for the wait/process event. */
81 struct wait_process_sender : public context_ref
82 {
83     using is_sender = void;
84 
85     explicit wait_process_sender(context& ctx) : context_ref(ctx) {}
86 
87     friend auto tag_invoke(
88         execution::get_completion_signatures_t, const wait_process_sender&,
89         auto) -> execution::completion_signatures<execution::set_value_t()>;
90 
91     template <execution::receiver R>
92     friend auto tag_invoke(execution::connect_t, wait_process_sender&& self,
93                            R r) -> wait_process_operation<R>
94     {
95         // Create the completion for the wait.
96         return {self.ctx, std::move(r)};
97     }
98 };
99 
100 auto wait_process_completion::loop(context& ctx) -> task<>
101 {
102     while (!ctx.final_stop.stop_requested())
103     {
104         // Handle the next sdbus event.  Completion likely happened on a
105         // different thread so we need to transfer back to the worker thread.
106         co_await execution::continues_on(wait_process_sender(ctx),
107                                          ctx.loop.get_scheduler());
108     }
109 
110     {
111         std::lock_guard lock{ctx.lock};
112         ctx.wait_process_stopped = true;
113     }
114 }
115 
116 } // namespace details
117 
118 context::~context() noexcept(false)
119 {
120     if (worker_thread.joinable())
121     {
122         throw std::logic_error(
123             "sdbusplus::async::context destructed without completion.");
124     }
125 }
126 
127 void context::run()
128 {
129     // Run the primary portion of the run-loop.
130     caller_run();
131 
132     // This should be final_stop...
133 
134     // We need to wait for the pending wait process and stop it.
135     wait_for_wait_process_stopped();
136 
137     // Wait for all the internal tasks to complete.
138     stdexec::sync_wait(internal_tasks.on_empty());
139 
140     // Finish up the loop and join the thread.
141     // (There shouldn't be anything going on by this point anyhow.)
142     loop.finish();
143     if (worker_thread.joinable())
144     {
145         worker_thread.join();
146     }
147 }
148 
149 void context::worker_run()
150 {
151     // Start the sdbus 'wait/process' loop; treat it as an internal task.
152     internal_tasks.spawn(details::wait_process_completion::loop(*this));
153 
154     // Run the execution::run_loop to handle all the tasks.
155     loop.run();
156 }
157 
158 void context::spawn_complete()
159 {
160     {
161         std::lock_guard l{lock};
162         spawn_watcher_running = false;
163     }
164 
165     if (stop_requested())
166     {
167         final_stop.request_stop();
168     }
169 
170     caller_wait.notify_one();
171     event_loop.break_run();
172 }
173 
174 void context::check_stop_requested()
175 {
176     if (stop_requested())
177     {
178         throw std::logic_error(
179             "sdbusplus::async::context spawn called while already stopped.");
180     }
181 }
182 
183 void context::spawn_watcher()
184 {
185     {
186         std::lock_guard l{lock};
187         if (spawn_watcher_running)
188         {
189             return;
190         }
191 
192         spawn_watcher_running = true;
193     }
194 
195     // Spawn the watch for completion / exceptions.
196     internal_tasks.spawn(pending_tasks.on_empty() |
197                          execution::then([this]() { spawn_complete(); }));
198 }
199 
200 void context::caller_run()
201 {
202     // We are able to run the loop until the context is requested to stop or
203     // we get an exception.
204     auto keep_running = [this]() {
205         std::lock_guard l{lock};
206         return !final_stop.stop_requested();
207     };
208 
209     // If we are suppose to keep running, start the run loop.
210     if (keep_running())
211     {
212         // Start up the worker thread.
213         if (!worker_thread.joinable())
214         {
215             worker_thread = std::thread{[this]() { worker_run(); }};
216         }
217         else
218         {
219             // We've already been running and there might a completion pending.
220             // Spawn a new watcher that checks for these.
221             spawn_watcher();
222         }
223 
224         while (keep_running())
225         {
226             // Handle waiting on all the sd-events.
227             details::wait_process_completion::wait_once(*this);
228         }
229     }
230     else
231     {
232         // There might be pending completions still, so spawn a watcher for
233         // them.
234         spawn_watcher();
235     }
236 }
237 
238 void context::wait_for_wait_process_stopped()
239 {
240     auto worker = std::exchange(pending, nullptr);
241     while (worker == nullptr)
242     {
243         std::lock_guard l{lock};
244         if (wait_process_stopped)
245         {
246             break;
247         }
248 
249         worker = std::exchange(staged, nullptr);
250         if (!worker)
251         {
252             std::this_thread::yield();
253         }
254     }
255     if (worker)
256     {
257         worker->stop();
258         wait_process_stopped = true;
259     }
260 }
261 
262 void details::wait_process_completion::arm() noexcept
263 {
264     // Call process.  True indicates something was handled and we do not
265     // need to `wait`, because there might be yet another pending operation
266     // to process, so immediately signal the operation as complete.
267     if (ctx.get_bus().process_discard())
268     {
269         this->complete();
270         return;
271     }
272 
273     // We need to call wait now, get the current timeout and stage ourselves
274     // as the next completion.
275 
276     // Get the bus' timeout.
277     uint64_t to_usec = 0;
278     sd_bus_get_timeout(get_busp(ctx), &to_usec);
279 
280     if (to_usec == UINT64_MAX)
281     {
282         // sd_bus_get_timeout returns UINT64_MAX to indicate 'wait forever'.
283         // Turn this into -1 for sd-event.
284         timeout = std::chrono::microseconds{-1};
285     }
286     else
287     {
288         timeout = std::chrono::microseconds(to_usec);
289     }
290 
291     // Assign ourselves as the pending completion and release the caller.
292     std::lock_guard lock{ctx.lock};
293     ctx.staged = this;
294     ctx.caller_wait.notify_one();
295 }
296 
297 void details::wait_process_completion::wait_once(context& ctx)
298 {
299     // Scope for lock.
300     {
301         std::unique_lock lock{ctx.lock};
302 
303         // If there isn't a completion waiting already, wait on the condition
304         // variable for one to show up (we can't call `poll` yet because we
305         // don't have the required parameters).
306         ctx.caller_wait.wait(lock, [&] {
307             return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
308                    ctx.final_stop.stop_requested();
309         });
310 
311         // Save the waiter as pending.
312         if (ctx.pending == nullptr)
313         {
314             ctx.pending = std::exchange(ctx.staged, nullptr);
315         }
316     }
317 
318     // Run the event loop to process one request.
319     // If the context has been requested to be stopped, skip the event loop.
320     if (!ctx.final_stop.stop_requested() && ctx.pending)
321     {
322         ctx.event_loop.run_one(ctx.pending->timeout);
323     }
324 }
325 
326 int context::dbus_event_handle(sd_event_source*, int, uint32_t, void* data)
327 {
328     auto self = static_cast<context*>(data);
329 
330     auto pending = std::exchange(self->pending, nullptr);
331     if (pending != nullptr)
332     {
333         pending->complete();
334     }
335 
336     return 0;
337 }
338 
339 } // namespace sdbusplus::async
340