1 /* 2 * Copyright (c) 2021-2024 NVIDIA Corporation 3 * 4 * Licensed under the Apache License Version 2.0 with LLVM Exceptions 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * https://llvm.org/LICENSE.txt 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #pragma once 17 18 #include "__execution_fwd.hpp" 19 20 // include these after __execution_fwd.hpp 21 #include "__completion_signatures.hpp" 22 #include "__env.hpp" 23 #include "__meta.hpp" 24 #include "__receivers.hpp" 25 #include "__utility.hpp" 26 27 #include <condition_variable> 28 #include <exception> 29 #include <mutex> 30 #include <utility> 31 32 namespace stdexec { 33 ///////////////////////////////////////////////////////////////////////////// 34 // run_loop 35 namespace __loop { 36 class run_loop; 37 38 struct __task : __immovable { 39 __task* __next_ = this; 40 41 union { 42 __task* __tail_ = nullptr; 43 void (*__execute_)(__task*) noexcept; 44 }; 45 __executestdexec::__loop::__task46 void __execute() noexcept { 47 (*__execute_)(this); 48 } 49 }; 50 51 template <class _ReceiverId> 52 struct __operation { 53 using _Receiver = stdexec::__t<_ReceiverId>; 54 55 struct __t : __task { 56 using __id = __operation; 57 58 run_loop* __loop_; 59 STDEXEC_ATTRIBUTE(no_unique_address) _Receiver __rcvr_; 60 __execute_implstdexec::__loop::__operation::__t61 static void __execute_impl(__task* __p) noexcept { 62 auto& __rcvr = static_cast<__t*>(__p)->__rcvr_; 63 STDEXEC_TRY { 64 if (stdexec::get_stop_token(stdexec::get_env(__rcvr)).stop_requested()) { 65 stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr)); 66 } else { 67 stdexec::set_value(static_cast<_Receiver&&>(__rcvr)); 68 } 69 } 70 STDEXEC_CATCH_ALL { 71 stdexec::set_error(static_cast<_Receiver&&>(__rcvr), std::current_exception()); 72 } 73 } 74 __tstdexec::__loop::__operation::__t75 explicit __t(__task* __tail) noexcept 76 : __task{{}, this, __tail} { 77 } 78 __tstdexec::__loop::__operation::__t79 __t(__task* __next, run_loop* __loop, _Receiver __rcvr) 80 : __task{{}, __next, {}} 81 , __loop_{__loop} 82 , __rcvr_{static_cast<_Receiver&&>(__rcvr)} { 83 __execute_ = &__execute_impl; 84 } 85 86 void start() & noexcept; 87 }; 88 }; 89 90 class run_loop { 91 template <class> 92 friend struct __operation; 93 public: 94 struct __scheduler { 95 private: 96 struct __schedule_task { 97 using __t = __schedule_task; 98 using __id = __schedule_task; 99 using sender_concept = sender_t; 100 using completion_signatures = stdexec::completion_signatures< 101 set_value_t(), 102 set_error_t(std::exception_ptr), 103 set_stopped_t() 104 >; 105 106 template <class _Receiver> 107 using __operation = stdexec::__t<__operation<stdexec::__id<_Receiver>>>; 108 109 template <class _Receiver> connectstdexec::__loop::run_loop::__scheduler::__schedule_task110 auto connect(_Receiver __rcvr) const -> __operation<_Receiver> { 111 return {&__loop_->__head_, __loop_, static_cast<_Receiver&&>(__rcvr)}; 112 } 113 114 private: 115 friend __scheduler; 116 117 struct __env { 118 using __t = __env; 119 using __id = __env; 120 121 run_loop* __loop_; 122 123 template <class _CPO> querystdexec::__loop::run_loop::__scheduler::__schedule_task::__env124 auto query(get_completion_scheduler_t<_CPO>) const noexcept -> __scheduler { 125 return __loop_->get_scheduler(); 126 } 127 }; 128 __schedule_taskstdexec::__loop::run_loop::__scheduler::__schedule_task129 explicit __schedule_task(run_loop* __loop) noexcept 130 : __loop_(__loop) { 131 } 132 133 run_loop* const __loop_; 134 135 public: 136 [[nodiscard]] get_envstdexec::__loop::run_loop::__scheduler::__schedule_task137 auto get_env() const noexcept -> __env { 138 return __env{__loop_}; 139 } 140 }; 141 142 friend run_loop; 143 __schedulerstdexec::__loop::run_loop::__scheduler144 explicit __scheduler(run_loop* __loop) noexcept 145 : __loop_(__loop) { 146 } 147 148 run_loop* __loop_; 149 150 public: 151 using __t = __scheduler; 152 using __id = __scheduler; 153 auto operator==(const __scheduler&) const noexcept -> bool = default; 154 155 [[nodiscard]] schedulestdexec::__loop::run_loop::__scheduler156 auto schedule() const noexcept -> __schedule_task { 157 return __schedule_task{__loop_}; 158 } 159 160 [[nodiscard]] querystdexec::__loop::run_loop::__scheduler161 auto query(get_forward_progress_guarantee_t) const noexcept 162 -> stdexec::forward_progress_guarantee { 163 return stdexec::forward_progress_guarantee::parallel; 164 } 165 166 // BUGBUG NOT TO SPEC 167 [[nodiscard]] querystdexec::__loop::run_loop::__scheduler168 auto query(execute_may_block_caller_t) const noexcept -> bool { 169 return false; 170 } 171 }; 172 get_scheduler()173 auto get_scheduler() noexcept -> __scheduler { 174 return __scheduler{this}; 175 } 176 177 void run(); 178 179 void finish(); 180 181 private: 182 void __push_back_(__task* __task); 183 auto __pop_front_() -> __task*; 184 185 std::mutex __mutex_; 186 std::condition_variable __cv_; 187 __task __head_{{}, &__head_, {&__head_}}; 188 bool __stop_ = false; 189 }; 190 191 template <class _ReceiverId> start()192 inline void __operation<_ReceiverId>::__t::start() & noexcept { 193 STDEXEC_TRY { 194 __loop_->__push_back_(this); 195 } 196 STDEXEC_CATCH_ALL { 197 stdexec::set_error(static_cast<_Receiver&&>(__rcvr_), std::current_exception()); 198 } 199 } 200 run()201 inline void run_loop::run() { 202 for (__task* __task; (__task = __pop_front_()) != &__head_;) { 203 __task->__execute(); 204 } 205 } 206 finish()207 inline void run_loop::finish() { 208 std::unique_lock __lock{__mutex_}; 209 __stop_ = true; 210 __cv_.notify_all(); 211 } 212 __push_back_(__task * __task)213 inline void run_loop::__push_back_(__task* __task) { 214 std::unique_lock __lock{__mutex_}; 215 __task->__next_ = &__head_; 216 __head_.__tail_ = __head_.__tail_->__next_ = __task; 217 __cv_.notify_one(); 218 } 219 __pop_front_()220 inline auto run_loop::__pop_front_() -> __task* { 221 std::unique_lock __lock{__mutex_}; 222 __cv_.wait(__lock, [this] { return __head_.__next_ != &__head_ || __stop_; }); 223 if (__head_.__tail_ == __head_.__next_) 224 __head_.__tail_ = &__head_; 225 return std::exchange(__head_.__next_, __head_.__next_->__next_); 226 } 227 } // namespace __loop 228 229 // NOT TO SPEC 230 using run_loop = __loop::run_loop; 231 } // namespace stdexec 232