xref: /openbmc/sdbusplus/src/async/context.cpp (revision 3c242ba4c3839ea6dc64c67edb87d1a84d43968a)
1 #include <systemd/sd-bus.h>
2 
3 #include <sdbusplus/async/context.hpp>
4 
5 #include <chrono>
6 
7 namespace sdbusplus::async
8 {
9 
10 context::context(bus_t&& b) : bus(std::move(b))
11 {
12     dbus_source = event_loop.add_io(bus.get_fd(), EPOLLIN, dbus_event_handle,
13                                     this);
14 }
15 
16 namespace details
17 {
18 
19 /* The sd_bus_wait/process completion event.
20  *
21  *  The wait/process handshake is modelled as a Sender with the the worker
22  *  task `co_await`ing Senders over and over.  This class is the completion
23  *  handling for the Sender (to get it back to the Receiver, ie. the worker).
24  */
25 struct wait_process_completion : bus::details::bus_friend
26 {
27     explicit wait_process_completion(context& ctx) : ctx(ctx) {}
28     virtual ~wait_process_completion() = default;
29 
30     // Called by the `caller` to indicate the Sender is completed.
31     virtual void complete() noexcept = 0;
32     // Called by the `caller` to indicate the Sender should be stopped.
33     virtual void stop() noexcept = 0;
34 
35     // Arm the completion event.
36     void arm() noexcept;
37 
38     // Data to share with the worker.
39     context& ctx;
40     event_t::time_resolution timeout{};
41 
42     static task<> loop(context& ctx);
43     static void wait_once(context& ctx);
44 };
45 
46 /* The completion template based on receiver type.
47  *
48  * The type of the receivers (typically the co_awaiter) is only known by
49  * a template, so we need a sub-class of completion to hold the receiver.
50  */
51 template <execution::receiver R>
52 struct wait_process_operation : public wait_process_completion
53 {
54     wait_process_operation(context& ctx, R r) :
55         wait_process_completion(ctx), receiver(std::move(r))
56     {}
57 
58     wait_process_operation(wait_process_operation&&) = delete;
59 
60     void complete() noexcept override final
61     {
62         execution::set_value(std::move(this->receiver));
63     }
64 
65     void stop() noexcept override final
66     {
67         // Stop can be called when the context is shutting down,
68         // so treat it as if the receiver completed.
69         execution::set_value(std::move(this->receiver));
70     }
71 
72     friend void tag_invoke(execution::start_t,
73                            wait_process_operation& self) noexcept
74     {
75         self.arm();
76     }
77 
78     R receiver;
79 };
80 
81 /* The sender for the wait/process event. */
82 struct wait_process_sender
83 {
84     explicit wait_process_sender(context& ctx) : ctx(ctx){};
85 
86     friend auto tag_invoke(execution::get_completion_signatures_t,
87                            const wait_process_sender&, auto)
88         -> execution::completion_signatures<execution::set_value_t()>;
89 
90     template <execution::receiver R>
91     friend auto tag_invoke(execution::connect_t, wait_process_sender&& self,
92                            R r) -> wait_process_operation<R>
93     {
94         // Create the completion for the wait.
95         return {self.ctx, std::move(r)};
96     }
97 
98   private:
99     context& ctx;
100 };
101 
102 task<> wait_process_completion::loop(context& ctx)
103 {
104     while (!ctx.final_stop.stop_requested())
105     {
106         // Handle the next sdbus event.
107         co_await wait_process_sender(ctx);
108 
109         // Completion likely happened on the context 'caller' thread, so
110         // we need to switch to the worker thread.
111         co_await execution::schedule(ctx.loop.get_scheduler());
112     }
113 }
114 
115 } // namespace details
116 
117 context::~context() noexcept(false)
118 {
119     if (worker_thread.joinable())
120     {
121         throw std::logic_error(
122             "sdbusplus::async::context destructed without completion.");
123     }
124 }
125 
126 bool context::request_stop() noexcept
127 {
128     auto first_stop = initial_stop.request_stop();
129 
130     if (first_stop)
131     {
132         // Now that the workers have been requested to stop, we need to wait
133         // until they all drain and then stop the internal tasks.
134 
135         auto complete = [this]() {
136             final_stop.request_stop();
137             caller_wait.notify_one();
138             event_loop.break_run();
139         };
140 
141         internal_tasks.spawn(pending_tasks.empty() | execution::then(complete) |
142                              execution::upon_error([=](auto&& e) {
143                                  complete();
144                                  std::rethrow_exception(e);
145                              }));
146     }
147 
148     return first_stop;
149 }
150 
151 void context::run()
152 {
153     // Start up the worker thread.
154     worker_thread = std::thread{[this]() { worker_run(); }};
155 
156     // Run until the context requested to stop.
157     while (!final_stop.stop_requested())
158     {
159         // Handle waiting on all the sd-events.
160         details::wait_process_completion::wait_once(*this);
161     }
162 
163     std::exception_ptr pending_exception{};
164 
165     // Wait for all the internal tasks to complete.
166     std::this_thread::sync_wait(internal_tasks.empty() |
167                                 execution::upon_error([&](auto&& e) {
168                                     pending_exception = std::move(e);
169                                 }));
170 
171     // Stop has been requested, so finish up the loop.
172     loop.finish();
173     if (worker_thread.joinable())
174     {
175         worker_thread.join();
176     }
177 
178     // If there was an exception inside the context, rethrow it.
179     if (pending_exception)
180     {
181         std::rethrow_exception(std::move(pending_exception));
182     }
183 }
184 
185 void context::worker_run()
186 {
187     // Start the sdbus 'wait/process' loop; treat it as an internal task.
188     internal_tasks.spawn(details::wait_process_completion::loop(*this));
189 
190     // Run the execution::run_loop to handle all the tasks.
191     loop.run();
192 }
193 
194 void details::wait_process_completion::arm() noexcept
195 {
196     // Call process.  True indicates something was handled and we do not
197     // need to `wait`, because there might be yet another pending operation
198     // to process, so immediately signal the operation as complete.
199     if (ctx.get_bus().process_discard())
200     {
201         this->complete();
202         return;
203     }
204 
205     // We need to call wait now, get the current timeout and stage ourselves
206     // as the next completion.
207 
208     // Get the bus' timeout.
209     uint64_t to_usec = 0;
210     sd_bus_get_timeout(get_busp(ctx.get_bus()), &to_usec);
211 
212     if (to_usec == UINT64_MAX)
213     {
214         // sd_bus_get_timeout returns UINT64_MAX to indicate 'wait forever'.
215         // Turn this into -1 for sd-event.
216         timeout = std::chrono::microseconds{-1};
217     }
218     else
219     {
220         timeout = std::chrono::microseconds(to_usec);
221     }
222 
223     // Assign ourselves as the pending completion and release the caller.
224     std::lock_guard lock{ctx.lock};
225     ctx.staged = this;
226     ctx.caller_wait.notify_one();
227 }
228 
229 void details::wait_process_completion::wait_once(context& ctx)
230 {
231     // Scope for lock.
232     {
233         std::unique_lock lock{ctx.lock};
234 
235         // If there isn't a completion waiting already, wait on the condition
236         // variable for one to show up (we can't call `poll` yet because we
237         // don't have the required parameters).
238         ctx.caller_wait.wait(lock, [&] {
239             return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
240                    (ctx.final_stop.stop_requested());
241         });
242 
243         // Save the waiter as pending.
244         if (ctx.pending == nullptr)
245         {
246             ctx.pending = std::exchange(ctx.staged, nullptr);
247         }
248     }
249 
250     // If the context has been requested to be stopped, exit now instead of
251     // running the context event loop.
252     if (ctx.final_stop.stop_requested())
253     {
254         return;
255     }
256 
257     // Run the event loop to process one request.
258     ctx.event_loop.run_one(ctx.pending->timeout);
259 
260     // If there is a stop requested, we need to stop the pending operation.
261     if (ctx.final_stop.stop_requested())
262     {
263         decltype(ctx.pending) pending = nullptr;
264 
265         {
266             std::lock_guard lock{ctx.lock};
267             pending = std::exchange(ctx.pending, nullptr);
268         }
269 
270         // Do the stop outside the lock to prevent potential deadlocks due to
271         // the stop handler running.
272         if (pending != nullptr)
273         {
274             pending->stop();
275         }
276     }
277 }
278 
279 int context::dbus_event_handle(sd_event_source*, int, uint32_t, void* data)
280 {
281     auto self = static_cast<context*>(data);
282 
283     decltype(self->pending) pending = nullptr;
284     {
285         std::lock_guard lock{self->lock};
286         pending = std::exchange(self->pending, nullptr);
287     }
288 
289     // Outside the lock complete the pending operation.
290     //
291     // This can cause the Receiver task (the worker) to start executing (on
292     // this thread!), hence we do not want the lock held in order to avoid
293     // deadlocks.
294     if (pending != nullptr)
295     {
296         if (self->final_stop.stop_requested())
297         {
298             pending->stop();
299         }
300         else
301         {
302             pending->complete();
303         }
304     }
305 
306     return 0;
307 }
308 
309 } // namespace sdbusplus::async
310