xref: /openbmc/sdbusplus/include/sdbusplus/async/stdexec/__detail/__run_loop.hpp (revision 10d0b4b7d1498cfd5c3d37edea271a54d1984e41)
1 /*
2  * Copyright (c) 2021-2024 NVIDIA Corporation
3  *
4  * Licensed under the Apache License Version 2.0 with LLVM Exceptions
5  * (the "License"); you may not use this file except in compliance with
6  * the License. You may obtain a copy of the License at
7  *
8  *   https://llvm.org/LICENSE.txt
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include "__execution_fwd.hpp"
19 
20 // include these after __execution_fwd.hpp
21 #include "__completion_signatures.hpp"
22 #include "__env.hpp"
23 #include "__meta.hpp"
24 #include "__receivers.hpp"
25 #include "__utility.hpp"
26 
27 #include <condition_variable>
28 #include <exception>
29 #include <mutex>
30 #include <utility>
31 
32 namespace stdexec {
33   /////////////////////////////////////////////////////////////////////////////
34   // run_loop
35   namespace __loop {
36     class run_loop;
37 
38     struct __task : __immovable {
39       __task* __next_ = this;
40 
41       union {
42         __task* __tail_ = nullptr;
43         void (*__execute_)(__task*) noexcept;
44       };
45 
__executestdexec::__loop::__task46       void __execute() noexcept {
47         (*__execute_)(this);
48       }
49     };
50 
51     template <class _ReceiverId>
52     struct __operation {
53       using _Receiver = stdexec::__t<_ReceiverId>;
54 
55       struct __t : __task {
56         using __id = __operation;
57 
58         run_loop* __loop_;
59         STDEXEC_ATTRIBUTE(no_unique_address) _Receiver __rcvr_;
60 
__execute_implstdexec::__loop::__operation::__t61         static void __execute_impl(__task* __p) noexcept {
62           auto& __rcvr = static_cast<__t*>(__p)->__rcvr_;
63           STDEXEC_TRY {
64             if (stdexec::get_stop_token(stdexec::get_env(__rcvr)).stop_requested()) {
65               stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr));
66             } else {
67               stdexec::set_value(static_cast<_Receiver&&>(__rcvr));
68             }
69           }
70           STDEXEC_CATCH_ALL {
71             stdexec::set_error(static_cast<_Receiver&&>(__rcvr), std::current_exception());
72           }
73         }
74 
__tstdexec::__loop::__operation::__t75         explicit __t(__task* __tail) noexcept
76           : __task{{}, this, __tail} {
77         }
78 
__tstdexec::__loop::__operation::__t79         __t(__task* __next, run_loop* __loop, _Receiver __rcvr)
80           : __task{{}, __next, {}}
81           , __loop_{__loop}
82           , __rcvr_{static_cast<_Receiver&&>(__rcvr)} {
83           __execute_ = &__execute_impl;
84         }
85 
86         void start() & noexcept;
87       };
88     };
89 
90     class run_loop {
91       template <class>
92       friend struct __operation;
93      public:
94       struct __scheduler {
95        private:
96         struct __schedule_task {
97           using __t = __schedule_task;
98           using __id = __schedule_task;
99           using sender_concept = sender_t;
100           using completion_signatures = stdexec::completion_signatures<
101             set_value_t(),
102             set_error_t(std::exception_ptr),
103             set_stopped_t()
104           >;
105 
106           template <class _Receiver>
107           using __operation = stdexec::__t<__operation<stdexec::__id<_Receiver>>>;
108 
109           template <class _Receiver>
connectstdexec::__loop::run_loop::__scheduler::__schedule_task110           auto connect(_Receiver __rcvr) const -> __operation<_Receiver> {
111             return {&__loop_->__head_, __loop_, static_cast<_Receiver&&>(__rcvr)};
112           }
113 
114          private:
115           friend __scheduler;
116 
117           struct __env {
118             using __t = __env;
119             using __id = __env;
120 
121             run_loop* __loop_;
122 
123             template <class _CPO>
querystdexec::__loop::run_loop::__scheduler::__schedule_task::__env124             auto query(get_completion_scheduler_t<_CPO>) const noexcept -> __scheduler {
125               return __loop_->get_scheduler();
126             }
127           };
128 
__schedule_taskstdexec::__loop::run_loop::__scheduler::__schedule_task129           explicit __schedule_task(run_loop* __loop) noexcept
130             : __loop_(__loop) {
131           }
132 
133           run_loop* const __loop_;
134 
135          public:
136           [[nodiscard]]
get_envstdexec::__loop::run_loop::__scheduler::__schedule_task137           auto get_env() const noexcept -> __env {
138             return __env{__loop_};
139           }
140         };
141 
142         friend run_loop;
143 
__schedulerstdexec::__loop::run_loop::__scheduler144         explicit __scheduler(run_loop* __loop) noexcept
145           : __loop_(__loop) {
146         }
147 
148         run_loop* __loop_;
149 
150        public:
151         using __t = __scheduler;
152         using __id = __scheduler;
153         auto operator==(const __scheduler&) const noexcept -> bool = default;
154 
155         [[nodiscard]]
schedulestdexec::__loop::run_loop::__scheduler156         auto schedule() const noexcept -> __schedule_task {
157           return __schedule_task{__loop_};
158         }
159 
160         [[nodiscard]]
querystdexec::__loop::run_loop::__scheduler161         auto query(get_forward_progress_guarantee_t) const noexcept
162           -> stdexec::forward_progress_guarantee {
163           return stdexec::forward_progress_guarantee::parallel;
164         }
165 
166         // BUGBUG NOT TO SPEC
167         [[nodiscard]]
querystdexec::__loop::run_loop::__scheduler168         auto query(execute_may_block_caller_t) const noexcept -> bool {
169           return false;
170         }
171       };
172 
get_scheduler()173       auto get_scheduler() noexcept -> __scheduler {
174         return __scheduler{this};
175       }
176 
177       void run();
178 
179       void finish();
180 
181      private:
182       void __push_back_(__task* __task);
183       auto __pop_front_() -> __task*;
184 
185       std::mutex __mutex_;
186       std::condition_variable __cv_;
187       __task __head_{{}, &__head_, {&__head_}};
188       bool __stop_ = false;
189     };
190 
191     template <class _ReceiverId>
start()192     inline void __operation<_ReceiverId>::__t::start() & noexcept {
193       STDEXEC_TRY {
194         __loop_->__push_back_(this);
195       }
196       STDEXEC_CATCH_ALL {
197         stdexec::set_error(static_cast<_Receiver&&>(__rcvr_), std::current_exception());
198       }
199     }
200 
run()201     inline void run_loop::run() {
202       for (__task* __task; (__task = __pop_front_()) != &__head_;) {
203         __task->__execute();
204       }
205     }
206 
finish()207     inline void run_loop::finish() {
208       std::unique_lock __lock{__mutex_};
209       __stop_ = true;
210       __cv_.notify_all();
211     }
212 
__push_back_(__task * __task)213     inline void run_loop::__push_back_(__task* __task) {
214       std::unique_lock __lock{__mutex_};
215       __task->__next_ = &__head_;
216       __head_.__tail_ = __head_.__tail_->__next_ = __task;
217       __cv_.notify_one();
218     }
219 
__pop_front_()220     inline auto run_loop::__pop_front_() -> __task* {
221       std::unique_lock __lock{__mutex_};
222       __cv_.wait(__lock, [this] { return __head_.__next_ != &__head_ || __stop_; });
223       if (__head_.__tail_ == __head_.__next_)
224         __head_.__tail_ = &__head_;
225       return std::exchange(__head_.__next_, __head_.__next_->__next_);
226     }
227   } // namespace __loop
228 
229   // NOT TO SPEC (upstream note): makes __loop::run_loop nameable as stdexec::run_loop.
230   using run_loop = __loop::run_loop;
231 } // namespace stdexec
232