xref: /openbmc/sdbusplus/src/async/context.cpp (revision bc1399744a317307a3f9061de32b27681721248b)
1 #include <systemd/sd-bus.h>
2 
3 #include <sdbusplus/async/context.hpp>
4 #include <sdbusplus/async/task.hpp>
5 #include <sdbusplus/async/timer.hpp>
6 
7 #include <chrono>
8 
9 namespace sdbusplus::async
10 {
11 
context(bus_t && b)12 context::context(bus_t&& b) : bus(std::move(b))
13 {
14     dbus_source =
15         event_loop.add_io(bus.get_fd(), EPOLLIN, dbus_event_handle, this);
16 }
17 
18 namespace details
19 {
20 
21 /* The sd_bus_wait/process completion event.
22  *
23  *  The wait/process handshake is modelled as a Sender with the the worker
24  *  task `co_await`ing Senders over and over.  This class is the completion
25  *  handling for the Sender (to get it back to the Receiver, ie. the worker).
26  */
27 struct wait_process_completion : context_ref, bus::details::bus_friend
28 {
wait_process_completionsdbusplus::async::details::wait_process_completion29     explicit wait_process_completion(context& ctx) : context_ref(ctx) {}
30     virtual ~wait_process_completion() = default;
31 
32     // Called by the `caller` to indicate the Sender is completed.
33     virtual void complete() noexcept = 0;
34     // Called by the `caller` to indicate the Sender should be stopped.
35     virtual void stop() noexcept = 0;
36 
37     // Arm the completion event.
38     void arm() noexcept;
39 
40     // Data to share with the worker.
41     event_t::time_resolution timeout{};
42 
43     static auto loop(context& ctx) -> task<>;
44     static void wait_once(context& ctx);
45 };
46 
47 /* The completion template based on receiver type.
48  *
49  * The type of the receivers (typically the co_awaiter) is only known by
50  * a template, so we need a sub-class of completion to hold the receiver.
51  */
52 template <execution::receiver R>
53 struct wait_process_operation : public wait_process_completion
54 {
wait_process_operationsdbusplus::async::details::wait_process_operation55     wait_process_operation(context& ctx, R r) :
56         wait_process_completion(ctx), receiver(std::move(r))
57     {}
58 
59     wait_process_operation(wait_process_operation&&) = delete;
60 
completesdbusplus::async::details::wait_process_operation61     void complete() noexcept override final
62     {
63         execution::set_value(std::move(this->receiver));
64     }
65 
stopsdbusplus::async::details::wait_process_operation66     void stop() noexcept override final
67     {
68         // Stop can be called when the context is shutting down,
69         // so treat it as if the receiver completed.
70         execution::set_value(std::move(this->receiver));
71     }
72 
tag_invoke(execution::start_t,wait_process_operation & self)73     friend void tag_invoke(execution::start_t,
74                            wait_process_operation& self) noexcept
75     {
76         self.arm();
77     }
78 
79     R receiver;
80 };
81 
82 /* The sender for the wait/process event. */
83 struct wait_process_sender : public context_ref
84 {
85     using is_sender = void;
86 
wait_process_sendersdbusplus::async::details::wait_process_sender87     explicit wait_process_sender(context& ctx) : context_ref(ctx) {}
88 
89     friend auto tag_invoke(execution::get_completion_signatures_t,
90                            const wait_process_sender&, auto)
91         -> execution::completion_signatures<execution::set_value_t()>;
92 
93     template <execution::receiver R>
tag_invoke(execution::connect_t,wait_process_sender && self,R r)94     friend auto tag_invoke(execution::connect_t, wait_process_sender&& self,
95                            R r) -> wait_process_operation<R>
96     {
97         // Create the completion for the wait.
98         return {self.ctx, std::move(r)};
99     }
100 };
101 
loop(context & ctx)102 auto wait_process_completion::loop(context& ctx) -> task<>
103 {
104     while (!ctx.final_stop.stop_requested())
105     {
106         // Handle the next sdbus event.  Completion likely happened on a
107         // different thread so we need to transfer back to the worker thread.
108         co_await execution::continues_on(wait_process_sender(ctx),
109                                          ctx.loop.get_scheduler());
110     }
111 
112     {
113         std::lock_guard lock{ctx.lock};
114         ctx.wait_process_stopped = true;
115     }
116 }
117 
118 } // namespace details
119 
~context()120 context::~context() noexcept(false)
121 {
122     if (worker_thread.joinable())
123     {
124         throw std::logic_error(
125             "sdbusplus::async::context destructed without completion.");
126     }
127 }
128 
run()129 void context::run()
130 {
131     // Run the primary portion of the run-loop.
132     caller_run();
133 
134     // This should be final_stop...
135 
136     // We need to wait for the pending wait process and stop it.
137     wait_for_wait_process_stopped();
138 
139     // Wait for all the internal tasks to complete.
140     stdexec::sync_wait(internal_tasks.on_empty());
141 
142     // Finish up the loop and join the thread.
143     // (There shouldn't be anything going on by this point anyhow.)
144     loop.finish();
145     if (worker_thread.joinable())
146     {
147         worker_thread.join();
148     }
149 }
150 
watchdog_loop(sdbusplus::async::context & ctx)151 static auto watchdog_loop(sdbusplus::async::context& ctx) -> task<>
152 {
153     auto watchdog_time =
154         std::chrono::microseconds(ctx.get_bus().watchdog_enabled());
155     if (watchdog_time.count() == 0)
156     {
157         co_return;
158     }
159 
160     // Recommended interval is half of WATCHDOG_USEC
161     watchdog_time /= 2;
162 
163     while (!ctx.stop_requested())
164     {
165         ctx.get_bus().watchdog_pet();
166         co_await sleep_for(ctx, watchdog_time);
167     }
168 }
169 
worker_run()170 void context::worker_run()
171 {
172     internal_tasks.spawn(watchdog_loop(*this));
173 
174     // Start the sdbus 'wait/process' loop; treat it as an internal task.
175     internal_tasks.spawn(details::wait_process_completion::loop(*this));
176 
177     // Run the execution::run_loop to handle all the tasks.
178     loop.run();
179 }
180 
spawn_complete()181 void context::spawn_complete()
182 {
183     {
184         std::lock_guard l{lock};
185         spawn_watcher_running = false;
186     }
187 
188     if (stop_requested())
189     {
190         final_stop.request_stop();
191     }
192 
193     caller_wait.notify_one();
194     event_loop.break_run();
195 }
196 
check_stop_requested()197 void context::check_stop_requested()
198 {
199     if (stop_requested())
200     {
201         throw std::logic_error(
202             "sdbusplus::async::context spawn called while already stopped.");
203     }
204 }
205 
spawn_watcher()206 void context::spawn_watcher()
207 {
208     {
209         std::lock_guard l{lock};
210         if (spawn_watcher_running)
211         {
212             return;
213         }
214 
215         spawn_watcher_running = true;
216     }
217 
218     // Spawn the watch for completion / exceptions.
219     internal_tasks.spawn(pending_tasks.on_empty() |
220                          execution::then([this]() { spawn_complete(); }));
221 }
222 
caller_run()223 void context::caller_run()
224 {
225     // We are able to run the loop until the context is requested to stop or
226     // we get an exception.
227     auto keep_running = [this]() {
228         std::lock_guard l{lock};
229         return !final_stop.stop_requested();
230     };
231 
232     // If we are suppose to keep running, start the run loop.
233     if (keep_running())
234     {
235         // Start up the worker thread.
236         if (!worker_thread.joinable())
237         {
238             worker_thread = std::thread{[this]() { worker_run(); }};
239         }
240         else
241         {
242             // We've already been running and there might a completion pending.
243             // Spawn a new watcher that checks for these.
244             spawn_watcher();
245         }
246 
247         while (keep_running())
248         {
249             // Handle waiting on all the sd-events.
250             details::wait_process_completion::wait_once(*this);
251         }
252     }
253     else
254     {
255         // There might be pending completions still, so spawn a watcher for
256         // them.
257         spawn_watcher();
258     }
259 }
260 
wait_for_wait_process_stopped()261 void context::wait_for_wait_process_stopped()
262 {
263     auto worker = std::exchange(pending, nullptr);
264     while (worker == nullptr)
265     {
266         std::lock_guard l{lock};
267         if (wait_process_stopped)
268         {
269             break;
270         }
271 
272         worker = std::exchange(staged, nullptr);
273         if (!worker)
274         {
275             std::this_thread::yield();
276         }
277     }
278     if (worker)
279     {
280         worker->stop();
281         wait_process_stopped = true;
282     }
283 }
284 
arm()285 void details::wait_process_completion::arm() noexcept
286 {
287     // Call process.  True indicates something was handled and we do not
288     // need to `wait`, because there might be yet another pending operation
289     // to process, so immediately signal the operation as complete.
290     if (ctx.get_bus().process_discard())
291     {
292         this->complete();
293         return;
294     }
295 
296     // We need to call wait now, get the current timeout and stage ourselves
297     // as the next completion.
298 
299     // Get the bus' timeout.
300     uint64_t to_usec = 0;
301     sd_bus_get_timeout(get_busp(ctx), &to_usec);
302 
303     if (to_usec == UINT64_MAX)
304     {
305         // sd_bus_get_timeout returns UINT64_MAX to indicate 'wait forever'.
306         // Turn this into -1 for sd-event.
307         timeout = std::chrono::microseconds{-1};
308     }
309     else
310     {
311         timeout = std::chrono::microseconds(to_usec);
312     }
313 
314     // Assign ourselves as the pending completion and release the caller.
315     std::lock_guard lock{ctx.lock};
316     ctx.staged = this;
317     ctx.caller_wait.notify_one();
318 }
319 
wait_once(context & ctx)320 void details::wait_process_completion::wait_once(context& ctx)
321 {
322     // Scope for lock.
323     {
324         std::unique_lock lock{ctx.lock};
325 
326         // If there isn't a completion waiting already, wait on the condition
327         // variable for one to show up (we can't call `poll` yet because we
328         // don't have the required parameters).
329         ctx.caller_wait.wait(lock, [&] {
330             return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
331                    ctx.final_stop.stop_requested();
332         });
333 
334         // Save the waiter as pending.
335         if (ctx.pending == nullptr)
336         {
337             ctx.pending = std::exchange(ctx.staged, nullptr);
338         }
339     }
340 
341     // Run the event loop to process one request.
342     // If the context has been requested to be stopped, skip the event loop.
343     if (!ctx.final_stop.stop_requested() && ctx.pending)
344     {
345         ctx.event_loop.run_one(ctx.pending->timeout);
346     }
347 }
348 
dbus_event_handle(sd_event_source *,int,uint32_t,void * data)349 int context::dbus_event_handle(sd_event_source*, int, uint32_t, void* data)
350 {
351     auto self = static_cast<context*>(data);
352 
353     auto pending = std::exchange(self->pending, nullptr);
354     if (pending != nullptr)
355     {
356         pending->complete();
357     }
358 
359     return 0;
360 }
361 
362 } // namespace sdbusplus::async
363