// Source: openbmc/sdbusplus, include/sdbusplus/async/stdexec/__detail/__schedule_from.hpp (revision 36137e09614746b13603b5fbae79e6f70819c46b)
/*
 * Copyright (c) 2021-2024 NVIDIA Corporation
 *
 * Licensed under the Apache License Version 2.0 with LLVM Exceptions
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *   https://llvm.org/LICENSE.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 #pragma once
17 
18 #include "__execution_fwd.hpp"
19 
20 // include these after __execution_fwd.hpp
21 #include "__concepts.hpp"
22 #include "__domain.hpp"
23 #include "__env.hpp"
24 #include "__meta.hpp"
25 #include "__operation_states.hpp"
26 #include "__schedulers.hpp"
27 #include "__senders_core.hpp"
28 #include "__transform_completion_signatures.hpp"
29 #include "__tuple.hpp"
30 #include "__variant.hpp"
31 
32 namespace stdexec
33 {
34 /////////////////////////////////////////////////////////////////////////////
35 // [execution.senders.adaptors.schedule_from]
36 namespace __schfr
37 {
38 template <class... _Ts>
39 using __tuple_t = __tuple_for<__decay_t<_Ts>...>;
40 
41 template <class... _Ts>
42 using __variant_t = __variant_for<__monostate, _Ts...>;
43 
44 // Compute a variant type that is capable of storing the results of the
45 // input sender when it completes. The variant has type:
46 //   variant<
47 //     monostate,
48 //     tuple<set_stopped_t>,
49 //     tuple<set_value_t, __decay_t<_Values1>...>,
50 //     tuple<set_value_t, __decay_t<_Values2>...>,
51 //        ...
52 //     tuple<set_error_t, __decay_t<_Error1>>,
53 //     tuple<set_error_t, __decay_t<_Error2>>,
54 //        ...
55 //   >
56 template <class _CvrefSender, class _Env>
57 using __variant_for = //
58     __for_each_completion_signature<
59         __completion_signatures_of_t<_CvrefSender, _Env>, __tuple_t,
60         __munique<__qq<__variant_for>>::__f>;
61 
62 template <class... _Values>
63 using __decay_value_sig = set_value_t (*)(__decay_t<_Values>...);
64 
65 template <class _Error>
66 using __decay_error_sig = set_error_t (*)(__decay_t<_Error>);
67 
68 template <class... _Ts>
69 using __all_nothrow_decay_copyable =
70     __mbool<(__nothrow_decay_copyable<_Ts> && ...)>;
71 
72 template <class _CvrefSender, class... _Env>
73 using __all_nothrow_decay_copyable_results = //
74     __for_each_completion_signature<
75         __completion_signatures_of_t<_CvrefSender, _Env...>,
76         __all_nothrow_decay_copyable, __mand_t>;
77 
78 template <class _Scheduler, class _CvrefSender, class... _Env>
79 using __completions_t = //
80     __mtry_q<__concat_completion_signatures>::__f<
81         __transform_completion_signatures<
82             __completion_signatures_of_t<_CvrefSender, _Env...>,
83             __decay_value_sig, __decay_error_sig, set_stopped_t (*)(),
84             __completion_signature_ptrs>,
85         transform_completion_signatures<
86             __completion_signatures_of_t<schedule_result_t<_Scheduler>,
87                                          _Env...>,
88             __eptr_completion_if_t<
89                 __all_nothrow_decay_copyable_results<_CvrefSender, _Env...>>,
90             __mconst<completion_signatures<>>::__f>>;
91 
92 template <class _SchedulerId>
93 struct __environ
94 {
95     using _Scheduler = stdexec::__t<_SchedulerId>;
96 
97     struct __t
98     {
99         using __id = __environ;
100 
101         _Scheduler __sched_;
102 
103         template <__one_of<set_value_t, set_stopped_t> _Tag>
querystdexec::__schfr::__environ::__t104         auto query(get_completion_scheduler_t<_Tag>) const noexcept
105         {
106             return __sched_;
107         }
108 
querystdexec::__schfr::__environ::__t109         auto query(get_domain_t) const noexcept
110         {
111             return query_or(get_domain, __sched_, default_domain());
112         }
113     };
114 };
115 
116 template <class _Scheduler, class _Sexpr, class _Receiver>
117 struct __state;
118 
119 template <class _State>
120 STDEXEC_ATTRIBUTE((always_inline))
__make_visitor_fn(_State * __state)121 auto __make_visitor_fn(_State* __state) noexcept
122 {
123     return [__state]<class _Tup>(_Tup& __tupl) noexcept -> void {
124         if constexpr (__same_as<_Tup, __monostate>)
125         {
126             std::terminate(); // reaching this indicates a bug in schedule_from
127         }
128         else
129         {
130             __tupl.apply(
131                 [&]<class... _Args>(auto __tag,
132                                     _Args&... __args) noexcept -> void {
133                     __tag(std::move(__state->__receiver()),
134                           static_cast<_Args&&>(__args)...);
135                 },
136                 __tupl);
137         }
138     };
139 }
140 
141 // This receiver is to be completed on the execution context associated with the
142 // scheduler. When the source sender completes, the completion information is
143 // saved off in the operation state so that when this receiver completes, it can
144 // read the completion out of the operation state and forward it to the output
145 // receiver after transitioning to the scheduler's context.
146 template <class _Scheduler, class _Sexpr, class _Receiver>
147 struct __receiver2
148 {
149     using receiver_concept = receiver_t;
150 
set_valuestdexec::__schfr::__receiver2151     void set_value() noexcept
152     {
153         __state_->__data_.visit(__schfr::__make_visitor_fn(__state_),
154                                 __state_->__data_);
155     }
156 
157     template <class _Error>
set_errorstdexec::__schfr::__receiver2158     void set_error(_Error&& __err) noexcept
159     {
160         stdexec::set_error(static_cast<_Receiver&&>(__state_->__receiver()),
161                            static_cast<_Error&&>(__err));
162     }
163 
set_stoppedstdexec::__schfr::__receiver2164     void set_stopped() noexcept
165     {
166         stdexec::set_stopped(static_cast<_Receiver&&>(__state_->__receiver()));
167     }
168 
get_envstdexec::__schfr::__receiver2169     auto get_env() const noexcept -> env_of_t<_Receiver>
170     {
171         return stdexec::get_env(__state_->__receiver());
172     }
173 
174     __state<_Scheduler, _Sexpr, _Receiver>* __state_;
175 };
176 
177 template <class _Scheduler, class _Sexpr, class _Receiver>
178 struct __state :
179     __enable_receiver_from_this<_Sexpr, _Receiver,
180                                 __state<_Scheduler, _Sexpr, _Receiver>>,
181     __immovable
182 {
183     using __variant_t = __variant_for<__child_of<_Sexpr>, env_of_t<_Receiver>>;
184     using __receiver2_t = __receiver2<_Scheduler, _Sexpr, _Receiver>;
185 
186     __variant_t __data_;
187     connect_result_t<schedule_result_t<_Scheduler>, __receiver2_t> __state2_;
188 
__statestdexec::__schfr::__state189     explicit __state(_Scheduler __sched) :
190         __data_(), __state2_(connect(schedule(__sched), __receiver2_t{this}))
191     {}
192 };
193 
194 struct schedule_from_t
195 {
196     template <scheduler _Scheduler, sender _Sender>
operator ()stdexec::__schfr::schedule_from_t197     auto operator()(_Scheduler&& __sched, _Sender&& __sndr) const
198         -> __well_formed_sender auto
199     {
200         using _Env = __t<__environ<__id<__decay_t<_Scheduler>>>>;
201         auto __env = _Env{{static_cast<_Scheduler&&>(__sched)}};
202         auto __domain = query_or(get_domain, __sched, default_domain());
203         return stdexec::transform_sender(
204             __domain, __make_sexpr<schedule_from_t>(
205                           std::move(__env), static_cast<_Sender&&>(__sndr)));
206     }
207 
208     using _Sender = __1;
209     using _Env = __0;
210     using __legacy_customizations_t = __types<tag_invoke_t(
211         schedule_from_t, get_completion_scheduler_t<set_value_t>(_Env&),
212         _Sender)>;
213 };
214 
215 struct __schedule_from_impl : __sexpr_defaults
216 {
217     template <class _Sender>
218     using __scheduler_t =
219         __decay_t<__call_result_t<get_completion_scheduler_t<set_value_t>,
220                                   env_of_t<_Sender>>>;
221 
222     static constexpr auto get_attrs = //
223         []<class _Data, class _Child>(const _Data& __data,
224                                       const _Child& __child) noexcept {
225             return __env::__join(__data, stdexec::get_env(__child));
226         };
227 
228     static constexpr auto get_completion_signatures = //
229         []<class _Sender, class... _Env>(_Sender&&, _Env&&...) noexcept
230         -> __completions_t<__scheduler_t<_Sender>, __child_of<_Sender>,
231                            _Env...> {
232         static_assert(sender_expr_for<_Sender, schedule_from_t>);
233         return {};
234     };
235 
236     static constexpr auto get_state =
237         []<class _Sender, class _Receiver>(_Sender&& __sndr, _Receiver&) {
238             static_assert(sender_expr_for<_Sender, schedule_from_t>);
239             auto __sched =
240                 get_completion_scheduler<set_value_t>(stdexec::get_env(__sndr));
241             using _Scheduler = decltype(__sched);
242             return __state<_Scheduler, _Sender, _Receiver>{__sched};
243         };
244 
245     static constexpr auto complete = //
246         []<class _State, class _Receiver, class _Tag, class... _Args>(
247             __ignore, _State& __state, _Receiver& __rcvr, _Tag __tag,
248             _Args&&... __args) noexcept -> void {
249         // Write the tag and the args into the operation state so that we can
250         // forward the completion from within the scheduler's execution context.
251         if constexpr (__nothrow_callable<__tup::__mktuple_t, _Tag, _Args...>)
252         {
253             __state.__data_.emplace_from(__tup::__mktuple, __tag,
254                                          static_cast<_Args&&>(__args)...);
255         }
256         else
257         {
258             try
259             {
260                 __state.__data_.emplace_from(__tup::__mktuple, __tag,
261                                              static_cast<_Args&&>(__args)...);
262             }
263             catch (...)
264             {
265                 stdexec::set_error(static_cast<_Receiver&&>(__rcvr),
266                                    std::current_exception());
267                 return;
268             }
269         }
270 
271         // Enqueue the schedule operation so the completion happens on the
272         // scheduler's execution context.
273         stdexec::start(__state.__state2_);
274     };
275 };
276 } // namespace __schfr
277 
278 using __schfr::schedule_from_t;
279 inline constexpr schedule_from_t schedule_from{};
280 
281 template <>
282 struct __sexpr_impl<schedule_from_t> : __schfr::__schedule_from_impl
283 {};
284 } // namespace stdexec
285