1 #include <systemd/sd-bus.h> 2 3 #include <sdbusplus/async/context.hpp> 4 #include <sdbusplus/async/task.hpp> 5 #include <sdbusplus/async/timer.hpp> 6 7 #include <chrono> 8 9 namespace sdbusplus::async 10 { 11 12 context::context(bus_t&& b) : bus(std::move(b)) 13 { 14 dbus_source = 15 event_loop.add_io(bus.get_fd(), EPOLLIN, dbus_event_handle, this); 16 } 17 18 namespace details 19 { 20 21 /* The sd_bus_wait/process completion event. 22 * 23 * The wait/process handshake is modelled as a Sender with the the worker 24 * task `co_await`ing Senders over and over. This class is the completion 25 * handling for the Sender (to get it back to the Receiver, ie. the worker). 26 */ 27 struct wait_process_completion : context_ref, bus::details::bus_friend 28 { 29 explicit wait_process_completion(context& ctx) : context_ref(ctx) {} 30 virtual ~wait_process_completion() = default; 31 32 // Called by the `caller` to indicate the Sender is completed. 33 virtual void complete() noexcept = 0; 34 // Called by the `caller` to indicate the Sender should be stopped. 35 virtual void stop() noexcept = 0; 36 37 void start() noexcept; 38 39 // Data to share with the worker. 40 event_t::time_resolution timeout{}; 41 42 static auto loop(context& ctx) -> task<>; 43 static void wait_once(context& ctx); 44 }; 45 46 /* The completion template based on receiver type. 47 * 48 * The type of the receivers (typically the co_awaiter) is only known by 49 * a template, so we need a sub-class of completion to hold the receiver. 50 */ 51 template <execution::receiver R> 52 struct wait_process_operation : public wait_process_completion 53 { 54 wait_process_operation(context& ctx, R r) : 55 wait_process_completion(ctx), receiver(std::move(r)) 56 {} 57 58 wait_process_operation(wait_process_operation&&) = delete; 59 60 void complete() noexcept override final 61 { 62 execution::set_value(std::move(this->receiver)); 63 } 64 65 void stop() noexcept override final 66 { 67 // Stop can be called when the context is shutting down, 68 // so treat it as if the receiver completed. 69 execution::set_value(std::move(this->receiver)); 70 } 71 72 R receiver; 73 }; 74 75 /* The sender for the wait/process event. */ 76 struct wait_process_sender : public context_ref 77 { 78 using sender_concept = execution::sender_t; 79 80 explicit wait_process_sender(context& ctx) : context_ref(ctx) {} 81 82 template <typename Self, class... Env> 83 static constexpr auto get_completion_signatures(Self&&, Env&&...) 84 -> execution::completion_signatures<execution::set_value_t()>; 85 86 template <execution::receiver R> 87 auto connect(R r) -> wait_process_operation<R> 88 { 89 // Create the completion for the wait. 90 return {ctx, std::move(r)}; 91 } 92 }; 93 94 auto wait_process_completion::loop(context& ctx) -> task<> 95 { 96 while (!ctx.final_stop.stop_requested()) 97 { 98 // Handle the next sdbus event. Completion likely happened on a 99 // different thread so we need to transfer back to the worker thread. 100 co_await execution::continues_on(wait_process_sender(ctx), 101 ctx.loop.get_scheduler()); 102 } 103 104 { 105 std::lock_guard lock{ctx.lock}; 106 ctx.wait_process_stopped = true; 107 } 108 } 109 110 } // namespace details 111 112 context::~context() noexcept(false) 113 { 114 if (worker_thread.joinable()) 115 { 116 throw std::logic_error( 117 "sdbusplus::async::context destructed without completion."); 118 } 119 } 120 121 void context::run() 122 { 123 // Run the primary portion of the run-loop. 124 caller_run(); 125 126 // This should be final_stop... 127 128 // We need to wait for the pending wait process and stop it. 129 wait_for_wait_process_stopped(); 130 131 // Wait for all the internal tasks to complete. 132 stdexec::sync_wait(internal_tasks.on_empty()); 133 134 // Finish up the loop and join the thread. 135 // (There shouldn't be anything going on by this point anyhow.) 136 loop.finish(); 137 if (worker_thread.joinable()) 138 { 139 worker_thread.join(); 140 } 141 } 142 143 static auto watchdog_loop(sdbusplus::async::context& ctx) -> task<> 144 { 145 auto watchdog_time = 146 std::chrono::microseconds(ctx.get_bus().watchdog_enabled()); 147 if (watchdog_time.count() == 0) 148 { 149 co_return; 150 } 151 152 // Recommended interval is half of WATCHDOG_USEC 153 watchdog_time /= 2; 154 155 while (!ctx.stop_requested()) 156 { 157 ctx.get_bus().watchdog_pet(); 158 co_await sleep_for(ctx, watchdog_time); 159 } 160 } 161 162 void context::worker_run() 163 { 164 internal_tasks.spawn(watchdog_loop(*this)); 165 166 // Start the sdbus 'wait/process' loop; treat it as an internal task. 167 internal_tasks.spawn(details::wait_process_completion::loop(*this)); 168 169 // Run the execution::run_loop to handle all the tasks. 170 loop.run(); 171 } 172 173 void context::spawn_complete() 174 { 175 { 176 std::lock_guard l{lock}; 177 spawn_watcher_running = false; 178 } 179 180 if (stop_requested()) 181 { 182 final_stop.request_stop(); 183 } 184 185 caller_wait.notify_one(); 186 event_loop.break_run(); 187 } 188 189 void context::check_stop_requested() 190 { 191 if (stop_requested()) 192 { 193 throw std::logic_error( 194 "sdbusplus::async::context spawn called while already stopped."); 195 } 196 } 197 198 void context::spawn_watcher() 199 { 200 { 201 std::lock_guard l{lock}; 202 if (spawn_watcher_running) 203 { 204 return; 205 } 206 207 spawn_watcher_running = true; 208 } 209 210 // Spawn the watch for completion / exceptions. 211 internal_tasks.spawn(pending_tasks.on_empty() | 212 execution::then([this]() { spawn_complete(); })); 213 } 214 215 void context::caller_run() 216 { 217 // We are able to run the loop until the context is requested to stop or 218 // we get an exception. 219 auto keep_running = [this]() { 220 std::lock_guard l{lock}; 221 return !final_stop.stop_requested(); 222 }; 223 224 // If we are suppose to keep running, start the run loop. 225 if (keep_running()) 226 { 227 // Start up the worker thread. 228 if (!worker_thread.joinable()) 229 { 230 worker_thread = std::thread{[this]() { worker_run(); }}; 231 } 232 else 233 { 234 // We've already been running and there might a completion pending. 235 // Spawn a new watcher that checks for these. 236 spawn_watcher(); 237 } 238 239 while (keep_running()) 240 { 241 // Handle waiting on all the sd-events. 242 details::wait_process_completion::wait_once(*this); 243 } 244 } 245 else 246 { 247 // There might be pending completions still, so spawn a watcher for 248 // them. 249 spawn_watcher(); 250 } 251 } 252 253 void context::wait_for_wait_process_stopped() 254 { 255 auto worker = std::exchange(pending, nullptr); 256 while (worker == nullptr) 257 { 258 std::lock_guard l{lock}; 259 if (wait_process_stopped) 260 { 261 break; 262 } 263 264 worker = std::exchange(staged, nullptr); 265 if (!worker) 266 { 267 std::this_thread::yield(); 268 } 269 } 270 if (worker) 271 { 272 worker->stop(); 273 wait_process_stopped = true; 274 } 275 } 276 277 void details::wait_process_completion::start() noexcept 278 { 279 // Call process. True indicates something was handled and we do not 280 // need to `wait`, because there might be yet another pending operation 281 // to process, so immediately signal the operation as complete. 282 if (ctx.get_bus().process_discard()) 283 { 284 this->complete(); 285 return; 286 } 287 288 // We need to call wait now, get the current timeout and stage ourselves 289 // as the next completion. 290 291 // Get the bus' timeout. 292 uint64_t to_usec = 0; 293 sd_bus_get_timeout(get_busp(ctx), &to_usec); 294 295 if (to_usec == UINT64_MAX) 296 { 297 // sd_bus_get_timeout returns UINT64_MAX to indicate 'wait forever'. 298 // Turn this into -1 for sd-event. 299 timeout = std::chrono::microseconds{-1}; 300 } 301 else 302 { 303 timeout = std::chrono::microseconds(to_usec); 304 } 305 306 // Assign ourselves as the pending completion and release the caller. 307 std::lock_guard lock{ctx.lock}; 308 ctx.staged = this; 309 ctx.caller_wait.notify_one(); 310 } 311 312 void details::wait_process_completion::wait_once(context& ctx) 313 { 314 // Scope for lock. 315 { 316 std::unique_lock lock{ctx.lock}; 317 318 // If there isn't a completion waiting already, wait on the condition 319 // variable for one to show up (we can't call `poll` yet because we 320 // don't have the required parameters). 321 ctx.caller_wait.wait(lock, [&] { 322 return (ctx.pending != nullptr) || (ctx.staged != nullptr) || 323 ctx.final_stop.stop_requested(); 324 }); 325 326 // Save the waiter as pending. 327 if (ctx.pending == nullptr) 328 { 329 ctx.pending = std::exchange(ctx.staged, nullptr); 330 } 331 } 332 333 // Run the event loop to process one request. 334 // If the context has been requested to be stopped, skip the event loop. 335 if (!ctx.final_stop.stop_requested() && ctx.pending) 336 { 337 ctx.event_loop.run_one(ctx.pending->timeout); 338 } 339 } 340 341 int context::dbus_event_handle(sd_event_source*, int, uint32_t, void* data) 342 { 343 auto self = static_cast<context*>(data); 344 345 auto pending = std::exchange(self->pending, nullptr); 346 if (pending != nullptr) 347 { 348 pending->complete(); 349 } 350 351 return 0; 352 } 353 354 } // namespace sdbusplus::async 355