1 /*
2 * Copyright (c) 2021-2024 NVIDIA Corporation
3 *
4 * Licensed under the Apache License Version 2.0 with LLVM Exceptions
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 *
8 * https://llvm.org/LICENSE.txt
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #pragma once
17
18 #include "__execution_fwd.hpp"
19
20 // include these after __execution_fwd.hpp
21 #include "__completion_signatures.hpp"
22 #include "__cpo.hpp"
23 #include "__env.hpp"
24 #include "__meta.hpp"
25 #include "__receivers.hpp"
26 #include "__utility.hpp"
27
28 #include <condition_variable>
29 #include <exception>
30 #include <mutex>
31 #include <utility>
32
33 namespace stdexec
34 {
35 /////////////////////////////////////////////////////////////////////////////
36 // run_loop
37 namespace __loop
38 {
39 class run_loop;
40
41 struct __task : __immovable
42 {
43 __task* __next_ = this;
44
45 union
46 {
47 __task* __tail_ = nullptr;
48 void (*__execute_)(__task*) noexcept;
49 };
50
__executestdexec::__loop::__task51 void __execute() noexcept
52 {
53 (*__execute_)(this);
54 }
55 };
56
57 template <class _ReceiverId>
58 struct __operation
59 {
60 using _Receiver = stdexec::__t<_ReceiverId>;
61
62 struct __t : __task
63 {
64 using __id = __operation;
65
66 run_loop* __loop_;
67 STDEXEC_ATTRIBUTE((no_unique_address))
68 _Receiver __rcvr_;
69
__execute_implstdexec::__loop::__operation::__t70 static void __execute_impl(__task* __p) noexcept
71 {
72 auto& __rcvr = static_cast<__t*>(__p)->__rcvr_;
73 try
74 {
75 if (stdexec::get_stop_token(stdexec::get_env(__rcvr))
76 .stop_requested())
77 {
78 stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr));
79 }
80 else
81 {
82 stdexec::set_value(static_cast<_Receiver&&>(__rcvr));
83 }
84 }
85 catch (...)
86 {
87 stdexec::set_error(static_cast<_Receiver&&>(__rcvr),
88 std::current_exception());
89 }
90 }
91
__tstdexec::__loop::__operation::__t92 explicit __t(__task* __tail) noexcept : __task{{}, this, __tail} {}
93
__tstdexec::__loop::__operation::__t94 __t(__task* __next, run_loop* __loop, _Receiver __rcvr) :
95 __task{{}, __next, {}}, __loop_{__loop},
96 __rcvr_{static_cast<_Receiver&&>(__rcvr)}
97 {
98 __execute_ = &__execute_impl;
99 }
100
101 void start() & noexcept;
102 };
103 };
104
105 class run_loop
106 {
107 template <class>
108 friend struct __operation;
109
110 public:
111 struct __scheduler
112 {
113 private:
114 struct __schedule_task
115 {
116 using __t = __schedule_task;
117 using __id = __schedule_task;
118 using sender_concept = sender_t;
119 using completion_signatures = stdexec::completion_signatures<
120 set_value_t(), set_error_t(std::exception_ptr),
121 set_stopped_t()>;
122
123 template <class _Receiver>
124 using __operation =
125 stdexec::__t<__operation<stdexec::__id<_Receiver>>>;
126
127 template <class _Receiver>
connectstdexec::__loop::run_loop::__scheduler::__schedule_task128 auto connect(_Receiver __rcvr) const -> __operation<_Receiver>
129 {
130 return {&__loop_->__head_, __loop_,
131 static_cast<_Receiver&&>(__rcvr)};
132 }
133
134 private:
135 friend __scheduler;
136
137 struct __env
138 {
139 using __t = __env;
140 using __id = __env;
141
142 run_loop* __loop_;
143
144 template <class _CPO>
querystdexec::__loop::run_loop::__scheduler::__schedule_task::__env145 auto query(get_completion_scheduler_t<_CPO>) const noexcept
146 -> __scheduler
147 {
148 return __loop_->get_scheduler();
149 }
150 };
151
__schedule_taskstdexec::__loop::run_loop::__scheduler::__schedule_task152 explicit __schedule_task(run_loop* __loop) noexcept :
153 __loop_(__loop)
154 {}
155
156 run_loop* const __loop_;
157
158 public:
get_envstdexec::__loop::run_loop::__scheduler::__schedule_task159 auto get_env() const noexcept -> __env
160 {
161 return __env{__loop_};
162 }
163 };
164
165 friend run_loop;
166
__schedulerstdexec::__loop::run_loop::__scheduler167 explicit __scheduler(run_loop* __loop) noexcept : __loop_(__loop) {}
168
169 run_loop* __loop_;
170
171 public:
172 using __t = __scheduler;
173 using __id = __scheduler;
174 auto operator==(const __scheduler&) const noexcept -> bool = default;
175
schedulestdexec::__loop::run_loop::__scheduler176 [[nodiscard]] auto schedule() const noexcept -> __schedule_task
177 {
178 return __schedule_task{__loop_};
179 }
180
querystdexec::__loop::run_loop::__scheduler181 auto query(get_forward_progress_guarantee_t) const noexcept
182 -> stdexec::forward_progress_guarantee
183 {
184 return stdexec::forward_progress_guarantee::parallel;
185 }
186
187 // BUGBUG NOT TO SPEC
querystdexec::__loop::run_loop::__scheduler188 auto query(execute_may_block_caller_t) const noexcept -> bool
189 {
190 return false;
191 }
192 };
193
get_scheduler()194 auto get_scheduler() noexcept -> __scheduler
195 {
196 return __scheduler{this};
197 }
198
199 void run();
200
201 void finish();
202
203 private:
204 void __push_back_(__task* __task);
205 auto __pop_front_() -> __task*;
206
207 std::mutex __mutex_;
208 std::condition_variable __cv_;
209 __task __head_{{}, &__head_, {&__head_}};
210 bool __stop_ = false;
211 };
212
213 template <class _ReceiverId>
start()214 inline void __operation<_ReceiverId>::__t::start() & noexcept
215 {
216 try
217 {
218 __loop_->__push_back_(this);
219 }
220 catch (...)
221 {
222 stdexec::set_error(static_cast<_Receiver&&>(__rcvr_),
223 std::current_exception());
224 }
225 }
226
run()227 inline void run_loop::run()
228 {
229 for (__task* __task; (__task = __pop_front_()) != &__head_;)
230 {
231 __task->__execute();
232 }
233 }
234
finish()235 inline void run_loop::finish()
236 {
237 std::unique_lock __lock{__mutex_};
238 __stop_ = true;
239 __cv_.notify_all();
240 }
241
__push_back_(__task * __task)242 inline void run_loop::__push_back_(__task* __task)
243 {
244 std::unique_lock __lock{__mutex_};
245 __task->__next_ = &__head_;
246 __head_.__tail_ = __head_.__tail_->__next_ = __task;
247 __cv_.notify_one();
248 }
249
__pop_front_()250 inline auto run_loop::__pop_front_() -> __task*
251 {
252 std::unique_lock __lock{__mutex_};
253 __cv_.wait(__lock,
254 [this] { return __head_.__next_ != &__head_ || __stop_; });
255 if (__head_.__tail_ == __head_.__next_)
256 __head_.__tail_ = &__head_;
257 return std::exchange(__head_.__next_, __head_.__next_->__next_);
258 }
259 } // namespace __loop
260
261 // NOT TO SPEC
262 using run_loop = __loop::run_loop;
263 } // namespace stdexec
264