1 #include <systemd/sd-bus.h>
2
3 #include <sdbusplus/async/context.hpp>
4 #include <sdbusplus/async/task.hpp>
5 #include <sdbusplus/async/timer.hpp>
6
7 #include <chrono>
8
9 namespace sdbusplus::async
10 {
11
context(bus_t && b)12 context::context(bus_t&& b) : bus(std::move(b))
13 {
14 dbus_source =
15 event_loop.add_io(bus.get_fd(), EPOLLIN, dbus_event_handle, this);
16 }
17
18 namespace details
19 {
20
21 /* The sd_bus_wait/process completion event.
22 *
23 * The wait/process handshake is modelled as a Sender with the the worker
24 * task `co_await`ing Senders over and over. This class is the completion
25 * handling for the Sender (to get it back to the Receiver, ie. the worker).
26 */
27 struct wait_process_completion : context_ref, bus::details::bus_friend
28 {
wait_process_completionsdbusplus::async::details::wait_process_completion29 explicit wait_process_completion(context& ctx) : context_ref(ctx) {}
30 virtual ~wait_process_completion() = default;
31
32 // Called by the `caller` to indicate the Sender is completed.
33 virtual void complete() noexcept = 0;
34 // Called by the `caller` to indicate the Sender should be stopped.
35 virtual void stop() noexcept = 0;
36
37 void start() noexcept;
38
39 // Data to share with the worker.
40 event_t::time_resolution timeout{};
41
42 static auto loop(context& ctx) -> task<>;
43 static void wait_once(context& ctx);
44 };
45
46 /* The completion template based on receiver type.
47 *
48 * The type of the receivers (typically the co_awaiter) is only known by
49 * a template, so we need a sub-class of completion to hold the receiver.
50 */
51 template <execution::receiver R>
52 struct wait_process_operation : public wait_process_completion
53 {
wait_process_operationsdbusplus::async::details::wait_process_operation54 wait_process_operation(context& ctx, R r) :
55 wait_process_completion(ctx), receiver(std::move(r))
56 {}
57
58 wait_process_operation(wait_process_operation&&) = delete;
59
completesdbusplus::async::details::wait_process_operation60 void complete() noexcept override final
61 {
62 execution::set_value(std::move(this->receiver));
63 }
64
stopsdbusplus::async::details::wait_process_operation65 void stop() noexcept override final
66 {
67 // Stop can be called when the context is shutting down,
68 // so treat it as if the receiver completed.
69 execution::set_value(std::move(this->receiver));
70 }
71
72 R receiver;
73 };
74
75 /* The sender for the wait/process event. */
76 struct wait_process_sender : public context_ref
77 {
78 using sender_concept = execution::sender_t;
79
wait_process_sendersdbusplus::async::details::wait_process_sender80 explicit wait_process_sender(context& ctx) : context_ref(ctx) {}
81
82 template <typename Self, class... Env>
83 static constexpr auto get_completion_signatures(Self&&, Env&&...)
84 -> execution::completion_signatures<execution::set_value_t()>;
85
86 template <execution::receiver R>
connectsdbusplus::async::details::wait_process_sender87 auto connect(R r) -> wait_process_operation<R>
88 {
89 // Create the completion for the wait.
90 return {ctx, std::move(r)};
91 }
92 };
93
loop(context & ctx)94 auto wait_process_completion::loop(context& ctx) -> task<>
95 {
96 while (!ctx.final_stop.stop_requested())
97 {
98 // Handle the next sdbus event. Completion likely happened on a
99 // different thread so we need to transfer back to the worker thread.
100 co_await execution::continues_on(wait_process_sender(ctx),
101 ctx.loop.get_scheduler());
102 }
103
104 {
105 std::lock_guard lock{ctx.lock};
106 ctx.wait_process_stopped = true;
107 }
108 }
109
110 } // namespace details
111
~context()112 context::~context() noexcept(false)
113 {
114 if (worker_thread.joinable())
115 {
116 throw std::logic_error(
117 "sdbusplus::async::context destructed without completion.");
118 }
119 }
120
run()121 void context::run()
122 {
123 // Run the primary portion of the run-loop.
124 caller_run();
125
126 // This should be final_stop...
127
128 // We need to wait for the pending wait process and stop it.
129 wait_for_wait_process_stopped();
130
131 // Wait for all the internal tasks to complete.
132 stdexec::sync_wait(internal_tasks.on_empty());
133
134 // Finish up the loop and join the thread.
135 // (There shouldn't be anything going on by this point anyhow.)
136 loop.finish();
137 if (worker_thread.joinable())
138 {
139 worker_thread.join();
140 }
141 }
142
watchdog_loop(sdbusplus::async::context & ctx)143 static auto watchdog_loop(sdbusplus::async::context& ctx) -> task<>
144 {
145 auto watchdog_time =
146 std::chrono::microseconds(ctx.get_bus().watchdog_enabled());
147 if (watchdog_time.count() == 0)
148 {
149 co_return;
150 }
151
152 // Recommended interval is half of WATCHDOG_USEC
153 watchdog_time /= 2;
154
155 while (!ctx.stop_requested())
156 {
157 ctx.get_bus().watchdog_pet();
158 co_await sleep_for(ctx, watchdog_time);
159 }
160 }
161
worker_run()162 void context::worker_run()
163 {
164 internal_tasks.spawn(watchdog_loop(*this));
165
166 // Start the sdbus 'wait/process' loop; treat it as an internal task.
167 internal_tasks.spawn(details::wait_process_completion::loop(*this));
168
169 // Run the execution::run_loop to handle all the tasks.
170 loop.run();
171 }
172
spawn_complete()173 void context::spawn_complete()
174 {
175 {
176 std::lock_guard l{lock};
177 spawn_watcher_running = false;
178 }
179
180 if (stop_requested())
181 {
182 final_stop.request_stop();
183 }
184
185 caller_wait.notify_one();
186 event_loop.break_run();
187 }
188
check_stop_requested()189 void context::check_stop_requested()
190 {
191 if (stop_requested())
192 {
193 throw std::logic_error(
194 "sdbusplus::async::context spawn called while already stopped.");
195 }
196 }
197
spawn_watcher()198 void context::spawn_watcher()
199 {
200 {
201 std::lock_guard l{lock};
202 if (spawn_watcher_running)
203 {
204 return;
205 }
206
207 spawn_watcher_running = true;
208 }
209
210 // Spawn the watch for completion / exceptions.
211 internal_tasks.spawn(pending_tasks.on_empty() |
212 execution::then([this]() { spawn_complete(); }));
213 }
214
caller_run()215 void context::caller_run()
216 {
217 // We are able to run the loop until the context is requested to stop or
218 // we get an exception.
219 auto keep_running = [this]() {
220 std::lock_guard l{lock};
221 return !final_stop.stop_requested();
222 };
223
224 // If we are suppose to keep running, start the run loop.
225 if (keep_running())
226 {
227 // Start up the worker thread.
228 if (!worker_thread.joinable())
229 {
230 worker_thread = std::thread{[this]() { worker_run(); }};
231 }
232 else
233 {
234 // We've already been running and there might a completion pending.
235 // Spawn a new watcher that checks for these.
236 spawn_watcher();
237 }
238
239 while (keep_running())
240 {
241 // Handle waiting on all the sd-events.
242 details::wait_process_completion::wait_once(*this);
243 }
244 }
245 else
246 {
247 // There might be pending completions still, so spawn a watcher for
248 // them.
249 spawn_watcher();
250 }
251 }
252
wait_for_wait_process_stopped()253 void context::wait_for_wait_process_stopped()
254 {
255 auto worker = std::exchange(pending, nullptr);
256 while (worker == nullptr)
257 {
258 std::lock_guard l{lock};
259 if (wait_process_stopped)
260 {
261 break;
262 }
263
264 worker = std::exchange(staged, nullptr);
265 if (!worker)
266 {
267 std::this_thread::yield();
268 }
269 }
270 if (worker)
271 {
272 worker->stop();
273 wait_process_stopped = true;
274 }
275 }
276
start()277 void details::wait_process_completion::start() noexcept
278 {
279 // Call process. True indicates something was handled and we do not
280 // need to `wait`, because there might be yet another pending operation
281 // to process, so immediately signal the operation as complete.
282 if (ctx.get_bus().process_discard())
283 {
284 this->complete();
285 return;
286 }
287
288 // We need to call wait now, get the current timeout and stage ourselves
289 // as the next completion.
290
291 // Get the bus' timeout.
292 uint64_t to_usec = 0;
293 sd_bus_get_timeout(get_busp(ctx), &to_usec);
294
295 if (to_usec == UINT64_MAX)
296 {
297 // sd_bus_get_timeout returns UINT64_MAX to indicate 'wait forever'.
298 // Turn this into -1 for sd-event.
299 timeout = std::chrono::microseconds{-1};
300 }
301 else
302 {
303 timeout = std::chrono::microseconds(to_usec);
304 }
305
306 // Assign ourselves as the pending completion and release the caller.
307 std::lock_guard lock{ctx.lock};
308 ctx.staged = this;
309 ctx.caller_wait.notify_one();
310 }
311
wait_once(context & ctx)312 void details::wait_process_completion::wait_once(context& ctx)
313 {
314 // Scope for lock.
315 {
316 std::unique_lock lock{ctx.lock};
317
318 // If there isn't a completion waiting already, wait on the condition
319 // variable for one to show up (we can't call `poll` yet because we
320 // don't have the required parameters).
321 ctx.caller_wait.wait(lock, [&] {
322 return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
323 ctx.final_stop.stop_requested();
324 });
325
326 // Save the waiter as pending.
327 if (ctx.pending == nullptr)
328 {
329 ctx.pending = std::exchange(ctx.staged, nullptr);
330 }
331 }
332
333 // Run the event loop to process one request.
334 // If the context has been requested to be stopped, skip the event loop.
335 if (!ctx.final_stop.stop_requested() && ctx.pending)
336 {
337 ctx.event_loop.run_one(ctx.pending->timeout);
338 }
339 }
340
dbus_event_handle(sd_event_source *,int,uint32_t,void * data)341 int context::dbus_event_handle(sd_event_source*, int, uint32_t, void* data)
342 {
343 auto self = static_cast<context*>(data);
344
345 auto pending = std::exchange(self->pending, nullptr);
346 if (pending != nullptr)
347 {
348 pending->complete();
349 }
350
351 return 0;
352 }
353
354 } // namespace sdbusplus::async
355