xref: /openbmc/sdbusplus/src/async/context.cpp (revision 5e36f4ab9b19cebc8a1e9be82ae0c3363838b4ce)
1 #include <systemd/sd-bus.h>
2 
3 #include <sdbusplus/async/context.hpp>
4 #include <sdbusplus/async/task.hpp>
5 #include <sdbusplus/async/timer.hpp>
6 
7 #include <chrono>
8 
9 namespace sdbusplus::async
10 {
11 
context(bus_t && b)12 context::context(bus_t&& b) : bus(std::move(b))
13 {
14     dbus_source =
15         event_loop.add_io(bus.get_fd(), EPOLLIN, dbus_event_handle, this);
16 }
17 
18 namespace details
19 {
20 
21 /* The sd_bus_wait/process completion event.
22  *
23  *  The wait/process handshake is modelled as a Sender with the the worker
24  *  task `co_await`ing Senders over and over.  This class is the completion
25  *  handling for the Sender (to get it back to the Receiver, ie. the worker).
26  */
27 struct wait_process_completion : context_ref, bus::details::bus_friend
28 {
wait_process_completionsdbusplus::async::details::wait_process_completion29     explicit wait_process_completion(context& ctx) : context_ref(ctx) {}
30     virtual ~wait_process_completion() = default;
31 
32     // Called by the `caller` to indicate the Sender is completed.
33     virtual void complete() noexcept = 0;
34     // Called by the `caller` to indicate the Sender should be stopped.
35     virtual void stop() noexcept = 0;
36 
37     void start() noexcept;
38 
39     // Data to share with the worker.
40     event_t::time_resolution timeout{};
41 
42     static auto loop(context& ctx) -> task<>;
43     static void wait_once(context& ctx);
44 };
45 
46 /* The completion template based on receiver type.
47  *
48  * The type of the receivers (typically the co_awaiter) is only known by
49  * a template, so we need a sub-class of completion to hold the receiver.
50  */
51 template <execution::receiver R>
52 struct wait_process_operation : public wait_process_completion
53 {
wait_process_operationsdbusplus::async::details::wait_process_operation54     wait_process_operation(context& ctx, R r) :
55         wait_process_completion(ctx), receiver(std::move(r))
56     {}
57 
58     wait_process_operation(wait_process_operation&&) = delete;
59 
completesdbusplus::async::details::wait_process_operation60     void complete() noexcept override final
61     {
62         execution::set_value(std::move(this->receiver));
63     }
64 
stopsdbusplus::async::details::wait_process_operation65     void stop() noexcept override final
66     {
67         // Stop can be called when the context is shutting down,
68         // so treat it as if the receiver completed.
69         execution::set_value(std::move(this->receiver));
70     }
71 
72     R receiver;
73 };
74 
75 /* The sender for the wait/process event. */
76 struct wait_process_sender : public context_ref
77 {
78     using sender_concept = execution::sender_t;
79 
wait_process_sendersdbusplus::async::details::wait_process_sender80     explicit wait_process_sender(context& ctx) : context_ref(ctx) {}
81 
82     template <typename Self, class... Env>
83     static constexpr auto get_completion_signatures(Self&&, Env&&...)
84         -> execution::completion_signatures<execution::set_value_t()>;
85 
86     template <execution::receiver R>
connectsdbusplus::async::details::wait_process_sender87     auto connect(R r) -> wait_process_operation<R>
88     {
89         // Create the completion for the wait.
90         return {ctx, std::move(r)};
91     }
92 };
93 
loop(context & ctx)94 auto wait_process_completion::loop(context& ctx) -> task<>
95 {
96     while (!ctx.final_stop.stop_requested())
97     {
98         // Handle the next sdbus event.  Completion likely happened on a
99         // different thread so we need to transfer back to the worker thread.
100         co_await execution::continues_on(wait_process_sender(ctx),
101                                          ctx.loop.get_scheduler());
102     }
103 
104     {
105         std::lock_guard lock{ctx.lock};
106         ctx.wait_process_stopped = true;
107     }
108 }
109 
110 } // namespace details
111 
~context()112 context::~context() noexcept(false)
113 {
114     if (worker_thread.joinable())
115     {
116         throw std::logic_error(
117             "sdbusplus::async::context destructed without completion.");
118     }
119 }
120 
run()121 void context::run()
122 {
123     // Run the primary portion of the run-loop.
124     caller_run();
125 
126     // This should be final_stop...
127 
128     // We need to wait for the pending wait process and stop it.
129     wait_for_wait_process_stopped();
130 
131     // Wait for all the internal tasks to complete.
132     stdexec::sync_wait(internal_tasks.on_empty());
133 
134     // Finish up the loop and join the thread.
135     // (There shouldn't be anything going on by this point anyhow.)
136     loop.finish();
137     if (worker_thread.joinable())
138     {
139         worker_thread.join();
140     }
141 }
142 
watchdog_loop(sdbusplus::async::context & ctx)143 static auto watchdog_loop(sdbusplus::async::context& ctx) -> task<>
144 {
145     auto watchdog_time =
146         std::chrono::microseconds(ctx.get_bus().watchdog_enabled());
147     if (watchdog_time.count() == 0)
148     {
149         co_return;
150     }
151 
152     // Recommended interval is half of WATCHDOG_USEC
153     watchdog_time /= 2;
154 
155     while (!ctx.stop_requested())
156     {
157         ctx.get_bus().watchdog_pet();
158         co_await sleep_for(ctx, watchdog_time);
159     }
160 }
161 
worker_run()162 void context::worker_run()
163 {
164     internal_tasks.spawn(watchdog_loop(*this));
165 
166     // Start the sdbus 'wait/process' loop; treat it as an internal task.
167     internal_tasks.spawn(details::wait_process_completion::loop(*this));
168 
169     // Run the execution::run_loop to handle all the tasks.
170     loop.run();
171 }
172 
spawn_complete()173 void context::spawn_complete()
174 {
175     {
176         std::lock_guard l{lock};
177         spawn_watcher_running = false;
178     }
179 
180     if (stop_requested())
181     {
182         final_stop.request_stop();
183     }
184 
185     caller_wait.notify_one();
186     event_loop.break_run();
187 }
188 
check_stop_requested()189 void context::check_stop_requested()
190 {
191     if (stop_requested())
192     {
193         throw std::logic_error(
194             "sdbusplus::async::context spawn called while already stopped.");
195     }
196 }
197 
spawn_watcher()198 void context::spawn_watcher()
199 {
200     {
201         std::lock_guard l{lock};
202         if (spawn_watcher_running)
203         {
204             return;
205         }
206 
207         spawn_watcher_running = true;
208     }
209 
210     // Spawn the watch for completion / exceptions.
211     internal_tasks.spawn(pending_tasks.on_empty() |
212                          execution::then([this]() { spawn_complete(); }));
213 }
214 
caller_run()215 void context::caller_run()
216 {
217     // We are able to run the loop until the context is requested to stop or
218     // we get an exception.
219     auto keep_running = [this]() {
220         std::lock_guard l{lock};
221         return !final_stop.stop_requested();
222     };
223 
224     // If we are suppose to keep running, start the run loop.
225     if (keep_running())
226     {
227         // Start up the worker thread.
228         if (!worker_thread.joinable())
229         {
230             worker_thread = std::thread{[this]() { worker_run(); }};
231         }
232         else
233         {
234             // We've already been running and there might a completion pending.
235             // Spawn a new watcher that checks for these.
236             spawn_watcher();
237         }
238 
239         while (keep_running())
240         {
241             // Handle waiting on all the sd-events.
242             details::wait_process_completion::wait_once(*this);
243         }
244     }
245     else
246     {
247         // There might be pending completions still, so spawn a watcher for
248         // them.
249         spawn_watcher();
250     }
251 }
252 
wait_for_wait_process_stopped()253 void context::wait_for_wait_process_stopped()
254 {
255     auto worker = std::exchange(pending, nullptr);
256     while (worker == nullptr)
257     {
258         std::lock_guard l{lock};
259         if (wait_process_stopped)
260         {
261             break;
262         }
263 
264         worker = std::exchange(staged, nullptr);
265         if (!worker)
266         {
267             std::this_thread::yield();
268         }
269     }
270     if (worker)
271     {
272         worker->stop();
273         wait_process_stopped = true;
274     }
275 }
276 
start()277 void details::wait_process_completion::start() noexcept
278 {
279     // Call process.  True indicates something was handled and we do not
280     // need to `wait`, because there might be yet another pending operation
281     // to process, so immediately signal the operation as complete.
282     if (ctx.get_bus().process_discard())
283     {
284         this->complete();
285         return;
286     }
287 
288     // We need to call wait now, get the current timeout and stage ourselves
289     // as the next completion.
290 
291     // Get the bus' timeout.
292     uint64_t to_usec = 0;
293     sd_bus_get_timeout(get_busp(ctx), &to_usec);
294 
295     if (to_usec == UINT64_MAX)
296     {
297         // sd_bus_get_timeout returns UINT64_MAX to indicate 'wait forever'.
298         // Turn this into -1 for sd-event.
299         timeout = std::chrono::microseconds{-1};
300     }
301     else
302     {
303         timeout = std::chrono::microseconds(to_usec);
304     }
305 
306     // Assign ourselves as the pending completion and release the caller.
307     std::lock_guard lock{ctx.lock};
308     ctx.staged = this;
309     ctx.caller_wait.notify_one();
310 }
311 
wait_once(context & ctx)312 void details::wait_process_completion::wait_once(context& ctx)
313 {
314     // Scope for lock.
315     {
316         std::unique_lock lock{ctx.lock};
317 
318         // If there isn't a completion waiting already, wait on the condition
319         // variable for one to show up (we can't call `poll` yet because we
320         // don't have the required parameters).
321         ctx.caller_wait.wait(lock, [&] {
322             return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
323                    ctx.final_stop.stop_requested();
324         });
325 
326         // Save the waiter as pending.
327         if (ctx.pending == nullptr)
328         {
329             ctx.pending = std::exchange(ctx.staged, nullptr);
330         }
331     }
332 
333     // Run the event loop to process one request.
334     // If the context has been requested to be stopped, skip the event loop.
335     if (!ctx.final_stop.stop_requested() && ctx.pending)
336     {
337         ctx.event_loop.run_one(ctx.pending->timeout);
338     }
339 }
340 
dbus_event_handle(sd_event_source *,int,uint32_t,void * data)341 int context::dbus_event_handle(sd_event_source*, int, uint32_t, void* data)
342 {
343     auto self = static_cast<context*>(data);
344 
345     auto pending = std::exchange(self->pending, nullptr);
346     if (pending != nullptr)
347     {
348         pending->complete();
349     }
350 
351     return 0;
352 }
353 
354 } // namespace sdbusplus::async
355