1 /* 2 * Copyright (c) 2021-2024 NVIDIA Corporation 3 * 4 * Licensed under the Apache License Version 2.0 with LLVM Exceptions 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * https://llvm.org/LICENSE.txt 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #pragma once 17 18 #include "__execution_fwd.hpp" 19 20 // include these after __execution_fwd.hpp 21 #include "__basic_sender.hpp" 22 #include "__concepts.hpp" 23 #include "__domain.hpp" 24 #include "__env.hpp" 25 #include "__meta.hpp" 26 #include "__operation_states.hpp" 27 #include "__senders.hpp" 28 #include "__schedulers.hpp" 29 #include "__transform_completion_signatures.hpp" 30 #include "__tuple.hpp" 31 #include "__variant.hpp" 32 33 namespace stdexec { 34 ///////////////////////////////////////////////////////////////////////////// 35 // [execution.senders.adaptors.schedule_from] 36 namespace __schfr { 37 // Compute a variant type that is capable of storing the results of the 38 // input sender when it completes. The variant has type: 39 // variant< 40 // monostate, 41 // tuple<set_stopped_t>, 42 // tuple<set_value_t, __decay_t<_Values1>...>, 43 // tuple<set_value_t, __decay_t<_Values2>...>, 44 // ... 45 // tuple<set_error_t, __decay_t<_Error1>>, 46 // tuple<set_error_t, __decay_t<_Error2>>, 47 // ... 48 // > 49 template <class _CvrefSender, class _Env> 50 using __results_of = __for_each_completion_signature< 51 __completion_signatures_of_t<_CvrefSender, _Env>, 52 __decayed_tuple, 53 __munique<__qq<stdexec::__variant_for>>::__f 54 >; 55 56 template <class... _Values> 57 using __decay_value_sig = set_value_t (*)(__decay_t<_Values>...); 58 59 template <class _Error> 60 using __decay_error_sig = set_error_t (*)(__decay_t<_Error>); 61 62 template <class _Scheduler, class _Completions, class... _Env> 63 using __completions_impl_t = __mtry_q<__concat_completion_signatures>::__f< 64 __transform_completion_signatures< 65 _Completions, 66 __decay_value_sig, 67 __decay_error_sig, 68 set_stopped_t (*)(), 69 __completion_signature_ptrs 70 >, 71 transform_completion_signatures< 72 __completion_signatures_of_t<schedule_result_t<_Scheduler>, _Env...>, 73 __eptr_completion_if_t<__nothrow_decay_copyable_results_t<_Completions>>, 74 __mconst<completion_signatures<>>::__f 75 > 76 >; 77 78 template <class _Scheduler, class _CvrefSender, class... _Env> 79 using __completions_t = 80 __completions_impl_t<_Scheduler, __completion_signatures_of_t<_CvrefSender, _Env...>, _Env...>; 81 82 template <class _Scheduler, class _Sexpr, class _Receiver> 83 struct __state; 84 85 template <class _State> STDEXEC_ATTRIBUTE(always_inline)86 STDEXEC_ATTRIBUTE(always_inline) 87 auto __make_visitor_fn(_State* __state) noexcept { 88 return [__state]<class _Tup>(_Tup& __tupl) noexcept -> void { 89 __tupl.apply( 90 [&]<class... _Args>(auto __tag, _Args&... __args) noexcept -> void { 91 __tag(std::move(__state->__receiver()), static_cast<_Args&&>(__args)...); 92 }, 93 __tupl); 94 }; 95 } 96 97 // This receiver is to be completed on the execution context associated with the scheduler. When 98 // the source sender completes, the completion information is saved off in the operation state 99 // so that when this receiver completes, it can read the completion out of the operation state 100 // and forward it to the output receiver after transitioning to the scheduler's context. 101 template <class _SchedulerId, class _SexprId, class _ReceiverId> 102 struct __rcvr2 { 103 using _Scheduler = stdexec::__t<_SchedulerId>; 104 using _Sexpr = stdexec::__t<_SexprId>; 105 using _Receiver = stdexec::__t<_ReceiverId>; 106 107 struct __t { 108 using receiver_concept = receiver_t; 109 using __id = __rcvr2; 110 set_valuestdexec::__schfr::__rcvr2::__t111 void set_value() noexcept { 112 __state_->__data_.visit(__schfr::__make_visitor_fn(__state_), __state_->__data_); 113 } 114 115 template <class _Error> set_errorstdexec::__schfr::__rcvr2::__t116 void set_error(_Error&& __err) noexcept { 117 stdexec::set_error( 118 static_cast<_Receiver&&>(__state_->__receiver()), static_cast<_Error&&>(__err)); 119 } 120 set_stoppedstdexec::__schfr::__rcvr2::__t121 void set_stopped() noexcept { 122 stdexec::set_stopped(static_cast<_Receiver&&>(__state_->__receiver())); 123 } 124 get_envstdexec::__schfr::__rcvr2::__t125 auto get_env() const noexcept -> env_of_t<_Receiver> { 126 return stdexec::get_env(__state_->__receiver()); 127 } 128 129 __state<_Scheduler, _Sexpr, _Receiver>* __state_; 130 }; 131 }; 132 133 template <class _Scheduler, class _Sexpr, class _Receiver> 134 using __receiver2 = __t<__rcvr2<__id<_Scheduler>, __id<_Sexpr>, __id<_Receiver>>>; 135 136 template <class _Scheduler, class _Sexpr, class _Receiver> 137 struct __state 138 : __enable_receiver_from_this<_Sexpr, _Receiver, __state<_Scheduler, _Sexpr, _Receiver>> 139 , __immovable { 140 using __variant_t = __results_of<__child_of<_Sexpr>, env_of_t<_Receiver>>; 141 using __receiver2_t = __receiver2<_Scheduler, _Sexpr, _Receiver>; 142 143 __variant_t __data_; 144 connect_result_t<schedule_result_t<_Scheduler>, __receiver2_t> __state2_; 145 __statestdexec::__schfr::__state146 explicit __state(_Scheduler __sched) 147 : __data_() 148 , __state2_(connect(schedule(__sched), __receiver2_t{this})) { 149 } 150 }; 151 152 struct schedule_from_t { 153 template <scheduler _Scheduler, sender _Sender> operator ()stdexec::__schfr::schedule_from_t154 auto operator()(_Scheduler __sched, _Sender&& __sndr) const -> __well_formed_sender auto { 155 auto __domain = query_or(get_domain, __sched, default_domain()); 156 return stdexec::transform_sender( 157 __domain, 158 __make_sexpr<schedule_from_t>( 159 static_cast<_Scheduler&&>(__sched), static_cast<_Sender&&>(__sndr))); 160 } 161 }; 162 163 struct __schedule_from_impl : __sexpr_defaults { 164 template <class _Sender> 165 using __scheduler_t = 166 __decay_t<__call_result_t<get_completion_scheduler_t<set_value_t>, env_of_t<_Sender>>>; 167 168 static constexpr auto get_attrs = []<class _Data, class _Child>( 169 const _Data& __data, 170 const _Child& __child) noexcept { 171 auto __domain = query_or(get_domain, __data, default_domain{}); 172 return __env::__join(__sched_attrs{std::cref(__data), __domain}, stdexec::get_env(__child)); 173 }; 174 175 static constexpr auto get_completion_signatures = 176 []<class _Sender, class... _Env>(_Sender&&, _Env&&...) noexcept 177 -> __completions_t<__scheduler_t<_Sender>, __child_of<_Sender>, _Env...> { 178 static_assert(sender_expr_for<_Sender, schedule_from_t>); 179 return {}; 180 }; 181 182 static constexpr auto get_state = 183 []<class _Sender, class _Receiver>(_Sender&& __sndr, _Receiver&) { 184 static_assert(sender_expr_for<_Sender, schedule_from_t>); 185 auto __sched = get_completion_scheduler<set_value_t>(stdexec::get_env(__sndr)); 186 using _Scheduler = decltype(__sched); 187 return __state<_Scheduler, _Sender, _Receiver>{__sched}; 188 }; 189 190 static constexpr auto complete = 191 []<class _State, class _Receiver, class _Tag, class... _Args>( 192 __ignore, 193 _State& __state, 194 _Receiver& __rcvr, 195 _Tag __tag, 196 _Args&&... __args) noexcept -> void { 197 // Write the tag and the args into the operation state so that we can forward the completion 198 // from within the scheduler's execution context. 199 if constexpr (__nothrow_callable<__tup::__mktuple_t, _Tag, _Args...>) { 200 __state.__data_.emplace_from(__tup::__mktuple, __tag, static_cast<_Args&&>(__args)...); 201 } else { 202 STDEXEC_TRY { 203 __state.__data_.emplace_from(__tup::__mktuple, __tag, static_cast<_Args&&>(__args)...); 204 } 205 STDEXEC_CATCH_ALL { 206 stdexec::set_error(static_cast<_Receiver&&>(__rcvr), std::current_exception()); 207 return; 208 } 209 } 210 211 // Enqueue the schedule operation so the completion happens on the scheduler's execution 212 // context. 213 stdexec::start(__state.__state2_); 214 }; 215 }; 216 } // namespace __schfr 217 218 using __schfr::schedule_from_t; 219 inline constexpr schedule_from_t schedule_from{}; 220 221 template <> 222 struct __sexpr_impl<schedule_from_t> : __schfr::__schedule_from_impl { }; 223 } // namespace stdexec 224