174187667SPatrick Williams #include <systemd/sd-bus.h>
274187667SPatrick Williams
374187667SPatrick Williams #include <sdbusplus/async/context.hpp>
474187667SPatrick Williams
573e278b5SPatrick Williams #include <chrono>
673e278b5SPatrick Williams
774187667SPatrick Williams namespace sdbusplus::async
874187667SPatrick Williams {
974187667SPatrick Williams
context(bus_t && b)1073e278b5SPatrick Williams context::context(bus_t&& b) : bus(std::move(b))
1173e278b5SPatrick Williams {
1206f265f6SPatrick Williams dbus_source =
1306f265f6SPatrick Williams event_loop.add_io(bus.get_fd(), EPOLLIN, dbus_event_handle, this);
1473e278b5SPatrick Williams }
1573e278b5SPatrick Williams
1674187667SPatrick Williams namespace details
1774187667SPatrick Williams {
1874187667SPatrick Williams
1974187667SPatrick Williams /* The sd_bus_wait/process completion event.
2074187667SPatrick Williams *
2174187667SPatrick Williams * The wait/process handshake is modelled as a Sender with the the worker
2274187667SPatrick Williams * task `co_await`ing Senders over and over. This class is the completion
2374187667SPatrick Williams * handling for the Sender (to get it back to the Receiver, ie. the worker).
2474187667SPatrick Williams */
254a9e4221SPatrick Williams struct wait_process_completion : context_ref, bus::details::bus_friend
2674187667SPatrick Williams {
wait_process_completionsdbusplus::async::details::wait_process_completion274a9e4221SPatrick Williams explicit wait_process_completion(context& ctx) : context_ref(ctx) {}
2874187667SPatrick Williams virtual ~wait_process_completion() = default;
2974187667SPatrick Williams
3074187667SPatrick Williams // Called by the `caller` to indicate the Sender is completed.
3174187667SPatrick Williams virtual void complete() noexcept = 0;
3273e278b5SPatrick Williams // Called by the `caller` to indicate the Sender should be stopped.
3373e278b5SPatrick Williams virtual void stop() noexcept = 0;
3474187667SPatrick Williams
3574187667SPatrick Williams // Arm the completion event.
3674187667SPatrick Williams void arm() noexcept;
3774187667SPatrick Williams
3874187667SPatrick Williams // Data to share with the worker.
39435eb1bdSPatrick Williams event_t::time_resolution timeout{};
4074187667SPatrick Williams
41bbc181e3SPatrick Williams static auto loop(context& ctx) -> task<>;
4274187667SPatrick Williams static void wait_once(context& ctx);
4374187667SPatrick Williams };
4474187667SPatrick Williams
4574187667SPatrick Williams /* The completion template based on receiver type.
4674187667SPatrick Williams *
4774187667SPatrick Williams * The type of the receivers (typically the co_awaiter) is only known by
4874187667SPatrick Williams * a template, so we need a sub-class of completion to hold the receiver.
4974187667SPatrick Williams */
5074187667SPatrick Williams template <execution::receiver R>
5174187667SPatrick Williams struct wait_process_operation : public wait_process_completion
5274187667SPatrick Williams {
wait_process_operationsdbusplus::async::details::wait_process_operation5374187667SPatrick Williams wait_process_operation(context& ctx, R r) :
5474187667SPatrick Williams wait_process_completion(ctx), receiver(std::move(r))
5574187667SPatrick Williams {}
5674187667SPatrick Williams
5774187667SPatrick Williams wait_process_operation(wait_process_operation&&) = delete;
5874187667SPatrick Williams
completesdbusplus::async::details::wait_process_operation5974187667SPatrick Williams void complete() noexcept override final
6074187667SPatrick Williams {
6174187667SPatrick Williams execution::set_value(std::move(this->receiver));
6274187667SPatrick Williams }
6374187667SPatrick Williams
stopsdbusplus::async::details::wait_process_operation6473e278b5SPatrick Williams void stop() noexcept override final
6573e278b5SPatrick Williams {
6673e278b5SPatrick Williams // Stop can be called when the context is shutting down,
6773e278b5SPatrick Williams // so treat it as if the receiver completed.
6873e278b5SPatrick Williams execution::set_value(std::move(this->receiver));
6973e278b5SPatrick Williams }
7073e278b5SPatrick Williams
tag_invoke(execution::start_t,wait_process_operation & self)7174187667SPatrick Williams friend void tag_invoke(execution::start_t,
7274187667SPatrick Williams wait_process_operation& self) noexcept
7374187667SPatrick Williams {
7474187667SPatrick Williams self.arm();
7574187667SPatrick Williams }
7674187667SPatrick Williams
7774187667SPatrick Williams R receiver;
7874187667SPatrick Williams };
7974187667SPatrick Williams
8074187667SPatrick Williams /* The sender for the wait/process event. */
814a9e4221SPatrick Williams struct wait_process_sender : public context_ref
8274187667SPatrick Williams {
839c6ec9b3SPatrick Williams using is_sender = void;
849c6ec9b3SPatrick Williams
wait_process_sendersdbusplus::async::details::wait_process_sender854a9e4221SPatrick Williams explicit wait_process_sender(context& ctx) : context_ref(ctx) {}
8674187667SPatrick Williams
87*36137e09SPatrick Williams friend auto tag_invoke(execution::get_completion_signatures_t,
88*36137e09SPatrick Williams const wait_process_sender&, auto)
89*36137e09SPatrick Williams -> execution::completion_signatures<execution::set_value_t()>;
9074187667SPatrick Williams
9174187667SPatrick Williams template <execution::receiver R>
tag_invoke(execution::connect_t,wait_process_sender && self,R r)9274187667SPatrick Williams friend auto tag_invoke(execution::connect_t, wait_process_sender&& self,
9374187667SPatrick Williams R r) -> wait_process_operation<R>
9474187667SPatrick Williams {
9574187667SPatrick Williams // Create the completion for the wait.
9674187667SPatrick Williams return {self.ctx, std::move(r)};
9774187667SPatrick Williams }
9874187667SPatrick Williams };
9974187667SPatrick Williams
loop(context & ctx)100bbc181e3SPatrick Williams auto wait_process_completion::loop(context& ctx) -> task<>
10174187667SPatrick Williams {
10278e436feSPatrick Williams while (!ctx.final_stop.stop_requested())
10374187667SPatrick Williams {
104e8e6631bSPatrick Williams // Handle the next sdbus event. Completion likely happened on a
105e8e6631bSPatrick Williams // different thread so we need to transfer back to the worker thread.
106f083bc1aSPatrick Williams co_await execution::continues_on(wait_process_sender(ctx),
107e8e6631bSPatrick Williams ctx.loop.get_scheduler());
10874187667SPatrick Williams }
10910483c94SPatrick Williams
11010483c94SPatrick Williams {
11110483c94SPatrick Williams std::lock_guard lock{ctx.lock};
11210483c94SPatrick Williams ctx.wait_process_stopped = true;
11310483c94SPatrick Williams }
11474187667SPatrick Williams }
11574187667SPatrick Williams
11674187667SPatrick Williams } // namespace details
11774187667SPatrick Williams
~context()11874187667SPatrick Williams context::~context() noexcept(false)
11974187667SPatrick Williams {
12074187667SPatrick Williams if (worker_thread.joinable())
12174187667SPatrick Williams {
12274187667SPatrick Williams throw std::logic_error(
12374187667SPatrick Williams "sdbusplus::async::context destructed without completion.");
12474187667SPatrick Williams }
12574187667SPatrick Williams }
12674187667SPatrick Williams
run()1273c242ba4SPatrick Williams void context::run()
12874187667SPatrick Williams {
12910483c94SPatrick Williams // Run the primary portion of the run-loop.
13010483c94SPatrick Williams caller_run();
13174187667SPatrick Williams
1325d16a8edSPatrick Williams // This should be final_stop...
13310483c94SPatrick Williams
13410483c94SPatrick Williams // We need to wait for the pending wait process and stop it.
13510483c94SPatrick Williams wait_for_wait_process_stopped();
1364cfc284dSPatrick Williams
13778e436feSPatrick Williams // Wait for all the internal tasks to complete.
13897c31c82SPatrick Williams stdexec::sync_wait(internal_tasks.on_empty());
139c5b5ff57SPatrick Williams
14010483c94SPatrick Williams // Finish up the loop and join the thread.
14110483c94SPatrick Williams // (There shouldn't be anything going on by this point anyhow.)
14274187667SPatrick Williams loop.finish();
14374187667SPatrick Williams if (worker_thread.joinable())
14474187667SPatrick Williams {
14574187667SPatrick Williams worker_thread.join();
14674187667SPatrick Williams }
14774187667SPatrick Williams }
14874187667SPatrick Williams
worker_run()1493c242ba4SPatrick Williams void context::worker_run()
15074187667SPatrick Williams {
1513c242ba4SPatrick Williams // Start the sdbus 'wait/process' loop; treat it as an internal task.
15297c31c82SPatrick Williams internal_tasks.spawn(details::wait_process_completion::loop(*this));
15374187667SPatrick Williams
15474187667SPatrick Williams // Run the execution::run_loop to handle all the tasks.
15574187667SPatrick Williams loop.run();
15674187667SPatrick Williams }
15774187667SPatrick Williams
spawn_complete()1585d16a8edSPatrick Williams void context::spawn_complete()
15910483c94SPatrick Williams {
16010483c94SPatrick Williams {
16110483c94SPatrick Williams std::lock_guard l{lock};
16210483c94SPatrick Williams spawn_watcher_running = false;
16310483c94SPatrick Williams }
16410483c94SPatrick Williams
16510483c94SPatrick Williams if (stop_requested())
16610483c94SPatrick Williams {
16710483c94SPatrick Williams final_stop.request_stop();
16810483c94SPatrick Williams }
16910483c94SPatrick Williams
17010483c94SPatrick Williams caller_wait.notify_one();
17110483c94SPatrick Williams event_loop.break_run();
17210483c94SPatrick Williams }
17310483c94SPatrick Williams
check_stop_requested()17410483c94SPatrick Williams void context::check_stop_requested()
17510483c94SPatrick Williams {
17610483c94SPatrick Williams if (stop_requested())
17710483c94SPatrick Williams {
17810483c94SPatrick Williams throw std::logic_error(
17910483c94SPatrick Williams "sdbusplus::async::context spawn called while already stopped.");
18010483c94SPatrick Williams }
18110483c94SPatrick Williams }
18210483c94SPatrick Williams
spawn_watcher()18310483c94SPatrick Williams void context::spawn_watcher()
18410483c94SPatrick Williams {
18510483c94SPatrick Williams {
18610483c94SPatrick Williams std::lock_guard l{lock};
18710483c94SPatrick Williams if (spawn_watcher_running)
18810483c94SPatrick Williams {
18910483c94SPatrick Williams return;
19010483c94SPatrick Williams }
19110483c94SPatrick Williams
19210483c94SPatrick Williams spawn_watcher_running = true;
19310483c94SPatrick Williams }
19410483c94SPatrick Williams
19510483c94SPatrick Williams // Spawn the watch for completion / exceptions.
19697c31c82SPatrick Williams internal_tasks.spawn(pending_tasks.on_empty() |
19797c31c82SPatrick Williams execution::then([this]() { spawn_complete(); }));
19810483c94SPatrick Williams }
19910483c94SPatrick Williams
caller_run()20010483c94SPatrick Williams void context::caller_run()
20110483c94SPatrick Williams {
20210483c94SPatrick Williams // We are able to run the loop until the context is requested to stop or
20310483c94SPatrick Williams // we get an exception.
20410483c94SPatrick Williams auto keep_running = [this]() {
20510483c94SPatrick Williams std::lock_guard l{lock};
2065d16a8edSPatrick Williams return !final_stop.stop_requested();
20710483c94SPatrick Williams };
20810483c94SPatrick Williams
20910483c94SPatrick Williams // If we are suppose to keep running, start the run loop.
21010483c94SPatrick Williams if (keep_running())
21110483c94SPatrick Williams {
21210483c94SPatrick Williams // Start up the worker thread.
21310483c94SPatrick Williams if (!worker_thread.joinable())
21410483c94SPatrick Williams {
21510483c94SPatrick Williams worker_thread = std::thread{[this]() { worker_run(); }};
21610483c94SPatrick Williams }
21710483c94SPatrick Williams else
21810483c94SPatrick Williams {
2195d16a8edSPatrick Williams // We've already been running and there might a completion pending.
2205d16a8edSPatrick Williams // Spawn a new watcher that checks for these.
22110483c94SPatrick Williams spawn_watcher();
22210483c94SPatrick Williams }
22310483c94SPatrick Williams
22410483c94SPatrick Williams while (keep_running())
22510483c94SPatrick Williams {
22610483c94SPatrick Williams // Handle waiting on all the sd-events.
22710483c94SPatrick Williams details::wait_process_completion::wait_once(*this);
22810483c94SPatrick Williams }
22910483c94SPatrick Williams }
23010483c94SPatrick Williams else
23110483c94SPatrick Williams {
2325d16a8edSPatrick Williams // There might be pending completions still, so spawn a watcher for
2335d16a8edSPatrick Williams // them.
23410483c94SPatrick Williams spawn_watcher();
23510483c94SPatrick Williams }
23610483c94SPatrick Williams }
23710483c94SPatrick Williams
wait_for_wait_process_stopped()23810483c94SPatrick Williams void context::wait_for_wait_process_stopped()
23910483c94SPatrick Williams {
24010483c94SPatrick Williams auto worker = std::exchange(pending, nullptr);
24110483c94SPatrick Williams while (worker == nullptr)
24210483c94SPatrick Williams {
24310483c94SPatrick Williams std::lock_guard l{lock};
24410483c94SPatrick Williams if (wait_process_stopped)
24510483c94SPatrick Williams {
24610483c94SPatrick Williams break;
24710483c94SPatrick Williams }
24810483c94SPatrick Williams
24910483c94SPatrick Williams worker = std::exchange(staged, nullptr);
25010483c94SPatrick Williams if (!worker)
25110483c94SPatrick Williams {
25210483c94SPatrick Williams std::this_thread::yield();
25310483c94SPatrick Williams }
25410483c94SPatrick Williams }
25510483c94SPatrick Williams if (worker)
25610483c94SPatrick Williams {
25710483c94SPatrick Williams worker->stop();
25810483c94SPatrick Williams wait_process_stopped = true;
25910483c94SPatrick Williams }
26010483c94SPatrick Williams }
26110483c94SPatrick Williams
arm()26274187667SPatrick Williams void details::wait_process_completion::arm() noexcept
26374187667SPatrick Williams {
26474187667SPatrick Williams // Call process. True indicates something was handled and we do not
26574187667SPatrick Williams // need to `wait`, because there might be yet another pending operation
26674187667SPatrick Williams // to process, so immediately signal the operation as complete.
26774187667SPatrick Williams if (ctx.get_bus().process_discard())
26874187667SPatrick Williams {
26974187667SPatrick Williams this->complete();
27074187667SPatrick Williams return;
27174187667SPatrick Williams }
27274187667SPatrick Williams
27373e278b5SPatrick Williams // We need to call wait now, get the current timeout and stage ourselves
27473e278b5SPatrick Williams // as the next completion.
27574187667SPatrick Williams
27674187667SPatrick Williams // Get the bus' timeout.
27773e278b5SPatrick Williams uint64_t to_usec = 0;
2781ee60d6dSPatrick Williams sd_bus_get_timeout(get_busp(ctx), &to_usec);
27974187667SPatrick Williams
28073e278b5SPatrick Williams if (to_usec == UINT64_MAX)
28174187667SPatrick Williams {
28274187667SPatrick Williams // sd_bus_get_timeout returns UINT64_MAX to indicate 'wait forever'.
28373e278b5SPatrick Williams // Turn this into -1 for sd-event.
28473e278b5SPatrick Williams timeout = std::chrono::microseconds{-1};
28574187667SPatrick Williams }
28674187667SPatrick Williams else
28774187667SPatrick Williams {
28873e278b5SPatrick Williams timeout = std::chrono::microseconds(to_usec);
28974187667SPatrick Williams }
29074187667SPatrick Williams
29174187667SPatrick Williams // Assign ourselves as the pending completion and release the caller.
29274187667SPatrick Williams std::lock_guard lock{ctx.lock};
29373e278b5SPatrick Williams ctx.staged = this;
29474187667SPatrick Williams ctx.caller_wait.notify_one();
29574187667SPatrick Williams }
29674187667SPatrick Williams
wait_once(context & ctx)29774187667SPatrick Williams void details::wait_process_completion::wait_once(context& ctx)
29874187667SPatrick Williams {
29974187667SPatrick Williams // Scope for lock.
30074187667SPatrick Williams {
30174187667SPatrick Williams std::unique_lock lock{ctx.lock};
30274187667SPatrick Williams
30374187667SPatrick Williams // If there isn't a completion waiting already, wait on the condition
30474187667SPatrick Williams // variable for one to show up (we can't call `poll` yet because we
30574187667SPatrick Williams // don't have the required parameters).
30673e278b5SPatrick Williams ctx.caller_wait.wait(lock, [&] {
30773e278b5SPatrick Williams return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
3085d16a8edSPatrick Williams ctx.final_stop.stop_requested();
30973e278b5SPatrick Williams });
31074187667SPatrick Williams
31173e278b5SPatrick Williams // Save the waiter as pending.
31273e278b5SPatrick Williams if (ctx.pending == nullptr)
31373e278b5SPatrick Williams {
31473e278b5SPatrick Williams ctx.pending = std::exchange(ctx.staged, nullptr);
31573e278b5SPatrick Williams }
31674187667SPatrick Williams }
31774187667SPatrick Williams
31873e278b5SPatrick Williams // Run the event loop to process one request.
31910483c94SPatrick Williams // If the context has been requested to be stopped, skip the event loop.
32010483c94SPatrick Williams if (!ctx.final_stop.stop_requested() && ctx.pending)
32110483c94SPatrick Williams {
32273e278b5SPatrick Williams ctx.event_loop.run_one(ctx.pending->timeout);
32373e278b5SPatrick Williams }
32473e278b5SPatrick Williams }
32573e278b5SPatrick Williams
dbus_event_handle(sd_event_source *,int,uint32_t,void * data)32673e278b5SPatrick Williams int context::dbus_event_handle(sd_event_source*, int, uint32_t, void* data)
32773e278b5SPatrick Williams {
32873e278b5SPatrick Williams auto self = static_cast<context*>(data);
32973e278b5SPatrick Williams
33010483c94SPatrick Williams auto pending = std::exchange(self->pending, nullptr);
33173e278b5SPatrick Williams if (pending != nullptr)
33273e278b5SPatrick Williams {
33373e278b5SPatrick Williams pending->complete();
33473e278b5SPatrick Williams }
33573e278b5SPatrick Williams
33673e278b5SPatrick Williams return 0;
33774187667SPatrick Williams }
33874187667SPatrick Williams
33974187667SPatrick Williams } // namespace sdbusplus::async
340