1 /*
2  * Copyright (c) 2021-2024 NVIDIA Corporation
3  *
4  * Licensed under the Apache License Version 2.0 with LLVM Exceptions
5  * (the "License"); you may not use this file except in compliance with
6  * the License. You may obtain a copy of the License at
7  *
8  *   https://llvm.org/LICENSE.txt
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include "__execution_fwd.hpp"
19 
20 // include these after __execution_fwd.hpp
21 #include "__completion_signatures.hpp"
22 #include "__cpo.hpp"
23 #include "__env.hpp"
24 #include "__meta.hpp"
25 #include "__receivers.hpp"
26 #include "__utility.hpp"
27 
28 #include <condition_variable>
29 #include <exception>
30 #include <mutex>
31 #include <utility>
32 
33 namespace stdexec
34 {
35 /////////////////////////////////////////////////////////////////////////////
36 // run_loop
37 namespace __loop
38 {
39 class run_loop;
40 
41 struct __task : __immovable
42 {
43     __task* __next_ = this;
44 
45     union
46     {
47         __task* __tail_ = nullptr;
48         void (*__execute_)(__task*) noexcept;
49     };
50 
__executestdexec::__loop::__task51     void __execute() noexcept
52     {
53         (*__execute_)(this);
54     }
55 };
56 
57 template <class _ReceiverId>
58 struct __operation
59 {
60     using _Receiver = stdexec::__t<_ReceiverId>;
61 
62     struct __t : __task
63     {
64         using __id = __operation;
65 
66         run_loop* __loop_;
67         STDEXEC_ATTRIBUTE((no_unique_address))
68         _Receiver __rcvr_;
69 
__execute_implstdexec::__loop::__operation::__t70         static void __execute_impl(__task* __p) noexcept
71         {
72             auto& __rcvr = static_cast<__t*>(__p)->__rcvr_;
73             try
74             {
75                 if (stdexec::get_stop_token(stdexec::get_env(__rcvr))
76                         .stop_requested())
77                 {
78                     stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr));
79                 }
80                 else
81                 {
82                     stdexec::set_value(static_cast<_Receiver&&>(__rcvr));
83                 }
84             }
85             catch (...)
86             {
87                 stdexec::set_error(static_cast<_Receiver&&>(__rcvr),
88                                    std::current_exception());
89             }
90         }
91 
__tstdexec::__loop::__operation::__t92         explicit __t(__task* __tail) noexcept : __task{{}, this, __tail} {}
93 
__tstdexec::__loop::__operation::__t94         __t(__task* __next, run_loop* __loop, _Receiver __rcvr) :
95             __task{{}, __next, {}}, __loop_{__loop},
96             __rcvr_{static_cast<_Receiver&&>(__rcvr)}
97         {
98             __execute_ = &__execute_impl;
99         }
100 
101         void start() & noexcept;
102     };
103 };
104 
105 class run_loop
106 {
107     template <class>
108     friend struct __operation;
109 
110   public:
111     struct __scheduler
112     {
113       private:
114         struct __schedule_task
115         {
116             using __t = __schedule_task;
117             using __id = __schedule_task;
118             using sender_concept = sender_t;
119             using completion_signatures = stdexec::completion_signatures<
120                 set_value_t(), set_error_t(std::exception_ptr),
121                 set_stopped_t()>;
122 
123             template <class _Receiver>
124             using __operation =
125                 stdexec::__t<__operation<stdexec::__id<_Receiver>>>;
126 
127             template <class _Receiver>
connectstdexec::__loop::run_loop::__scheduler::__schedule_task128             auto connect(_Receiver __rcvr) const -> __operation<_Receiver>
129             {
130                 return {&__loop_->__head_, __loop_,
131                         static_cast<_Receiver&&>(__rcvr)};
132             }
133 
134           private:
135             friend __scheduler;
136 
137             struct __env
138             {
139                 using __t = __env;
140                 using __id = __env;
141 
142                 run_loop* __loop_;
143 
144                 template <class _CPO>
querystdexec::__loop::run_loop::__scheduler::__schedule_task::__env145                 auto query(get_completion_scheduler_t<_CPO>) const noexcept
146                     -> __scheduler
147                 {
148                     return __loop_->get_scheduler();
149                 }
150             };
151 
__schedule_taskstdexec::__loop::run_loop::__scheduler::__schedule_task152             explicit __schedule_task(run_loop* __loop) noexcept :
153                 __loop_(__loop)
154             {}
155 
156             run_loop* const __loop_;
157 
158           public:
get_envstdexec::__loop::run_loop::__scheduler::__schedule_task159             auto get_env() const noexcept -> __env
160             {
161                 return __env{__loop_};
162             }
163         };
164 
165         friend run_loop;
166 
__schedulerstdexec::__loop::run_loop::__scheduler167         explicit __scheduler(run_loop* __loop) noexcept : __loop_(__loop) {}
168 
169         run_loop* __loop_;
170 
171       public:
172         using __t = __scheduler;
173         using __id = __scheduler;
174         auto operator==(const __scheduler&) const noexcept -> bool = default;
175 
schedulestdexec::__loop::run_loop::__scheduler176         [[nodiscard]] auto schedule() const noexcept -> __schedule_task
177         {
178             return __schedule_task{__loop_};
179         }
180 
querystdexec::__loop::run_loop::__scheduler181         auto query(get_forward_progress_guarantee_t) const noexcept
182             -> stdexec::forward_progress_guarantee
183         {
184             return stdexec::forward_progress_guarantee::parallel;
185         }
186 
187         // BUGBUG NOT TO SPEC
querystdexec::__loop::run_loop::__scheduler188         auto query(execute_may_block_caller_t) const noexcept -> bool
189         {
190             return false;
191         }
192     };
193 
get_scheduler()194     auto get_scheduler() noexcept -> __scheduler
195     {
196         return __scheduler{this};
197     }
198 
199     void run();
200 
201     void finish();
202 
203   private:
204     void __push_back_(__task* __task);
205     auto __pop_front_() -> __task*;
206 
207     std::mutex __mutex_;
208     std::condition_variable __cv_;
209     __task __head_{{}, &__head_, {&__head_}};
210     bool __stop_ = false;
211 };
212 
213 template <class _ReceiverId>
start()214 inline void __operation<_ReceiverId>::__t::start() & noexcept
215 {
216     try
217     {
218         __loop_->__push_back_(this);
219     }
220     catch (...)
221     {
222         stdexec::set_error(static_cast<_Receiver&&>(__rcvr_),
223                            std::current_exception());
224     }
225 }
226 
run()227 inline void run_loop::run()
228 {
229     for (__task* __task; (__task = __pop_front_()) != &__head_;)
230     {
231         __task->__execute();
232     }
233 }
234 
finish()235 inline void run_loop::finish()
236 {
237     std::unique_lock __lock{__mutex_};
238     __stop_ = true;
239     __cv_.notify_all();
240 }
241 
__push_back_(__task * __task)242 inline void run_loop::__push_back_(__task* __task)
243 {
244     std::unique_lock __lock{__mutex_};
245     __task->__next_ = &__head_;
246     __head_.__tail_ = __head_.__tail_->__next_ = __task;
247     __cv_.notify_one();
248 }
249 
__pop_front_()250 inline auto run_loop::__pop_front_() -> __task*
251 {
252     std::unique_lock __lock{__mutex_};
253     __cv_.wait(__lock,
254                [this] { return __head_.__next_ != &__head_ || __stop_; });
255     if (__head_.__tail_ == __head_.__next_)
256         __head_.__tail_ = &__head_;
257     return std::exchange(__head_.__next_, __head_.__next_->__next_);
258 }
259 } // namespace __loop
260 
// NOT TO SPEC
// Makes the loop type defined in the __loop namespace available as
// stdexec::run_loop.
using run_loop = __loop::run_loop;
263 } // namespace stdexec
264