xref: /openbmc/sdbusplus/include/sdbusplus/async/stdexec/__detail/__schedule_from.hpp (revision 10d0b4b7d1498cfd5c3d37edea271a54d1984e41)
1 /*
2  * Copyright (c) 2021-2024 NVIDIA Corporation
3  *
4  * Licensed under the Apache License Version 2.0 with LLVM Exceptions
5  * (the "License"); you may not use this file except in compliance with
6  * the License. You may obtain a copy of the License at
7  *
8  *   https://llvm.org/LICENSE.txt
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include "__execution_fwd.hpp"
19 
20 // include these after __execution_fwd.hpp
21 #include "__basic_sender.hpp"
22 #include "__concepts.hpp"
23 #include "__domain.hpp"
24 #include "__env.hpp"
25 #include "__meta.hpp"
26 #include "__operation_states.hpp"
27 #include "__senders.hpp"
28 #include "__schedulers.hpp"
29 #include "__transform_completion_signatures.hpp"
30 #include "__tuple.hpp"
31 #include "__variant.hpp"
32 
33 namespace stdexec {
34   /////////////////////////////////////////////////////////////////////////////
35   // [execution.senders.adaptors.schedule_from]
36   namespace __schfr {
37     // Compute a variant type that is capable of storing the results of the
38     // input sender when it completes. The variant has type:
39     //   variant<
40     //     monostate,
41     //     tuple<set_stopped_t>,
42     //     tuple<set_value_t, __decay_t<_Values1>...>,
43     //     tuple<set_value_t, __decay_t<_Values2>...>,
44     //        ...
45     //     tuple<set_error_t, __decay_t<_Error1>>,
46     //     tuple<set_error_t, __decay_t<_Error2>>,
47     //        ...
48     //   >
49     template <class _CvrefSender, class _Env>
50     using __results_of = __for_each_completion_signature<
51       __completion_signatures_of_t<_CvrefSender, _Env>,
52       __decayed_tuple,
53       __munique<__qq<stdexec::__variant_for>>::__f
54     >;
55 
56     template <class... _Values>
57     using __decay_value_sig = set_value_t (*)(__decay_t<_Values>...);
58 
59     template <class _Error>
60     using __decay_error_sig = set_error_t (*)(__decay_t<_Error>);
61 
62     template <class _Scheduler, class _Completions, class... _Env>
63     using __completions_impl_t = __mtry_q<__concat_completion_signatures>::__f<
64       __transform_completion_signatures<
65         _Completions,
66         __decay_value_sig,
67         __decay_error_sig,
68         set_stopped_t (*)(),
69         __completion_signature_ptrs
70       >,
71       transform_completion_signatures<
72         __completion_signatures_of_t<schedule_result_t<_Scheduler>, _Env...>,
73         __eptr_completion_if_t<__nothrow_decay_copyable_results_t<_Completions>>,
74         __mconst<completion_signatures<>>::__f
75       >
76     >;
77 
78     template <class _Scheduler, class _CvrefSender, class... _Env>
79     using __completions_t =
80       __completions_impl_t<_Scheduler, __completion_signatures_of_t<_CvrefSender, _Env...>, _Env...>;
81 
82     template <class _Scheduler, class _Sexpr, class _Receiver>
83     struct __state;
84 
85     template <class _State>
STDEXEC_ATTRIBUTE(always_inline)86     STDEXEC_ATTRIBUTE(always_inline)
87     auto __make_visitor_fn(_State* __state) noexcept {
88       return [__state]<class _Tup>(_Tup& __tupl) noexcept -> void {
89         __tupl.apply(
90           [&]<class... _Args>(auto __tag, _Args&... __args) noexcept -> void {
91             __tag(std::move(__state->__receiver()), static_cast<_Args&&>(__args)...);
92           },
93           __tupl);
94       };
95     }
96 
97     // This receiver is to be completed on the execution context associated with the scheduler. When
98     // the source sender completes, the completion information is saved off in the operation state
99     // so that when this receiver completes, it can read the completion out of the operation state
100     // and forward it to the output receiver after transitioning to the scheduler's context.
101     template <class _SchedulerId, class _SexprId, class _ReceiverId>
102     struct __rcvr2 {
103       using _Scheduler = stdexec::__t<_SchedulerId>;
104       using _Sexpr = stdexec::__t<_SexprId>;
105       using _Receiver = stdexec::__t<_ReceiverId>;
106 
107       struct __t {
108         using receiver_concept = receiver_t;
109         using __id = __rcvr2;
110 
set_valuestdexec::__schfr::__rcvr2::__t111         void set_value() noexcept {
112           __state_->__data_.visit(__schfr::__make_visitor_fn(__state_), __state_->__data_);
113         }
114 
115         template <class _Error>
set_errorstdexec::__schfr::__rcvr2::__t116         void set_error(_Error&& __err) noexcept {
117           stdexec::set_error(
118             static_cast<_Receiver&&>(__state_->__receiver()), static_cast<_Error&&>(__err));
119         }
120 
set_stoppedstdexec::__schfr::__rcvr2::__t121         void set_stopped() noexcept {
122           stdexec::set_stopped(static_cast<_Receiver&&>(__state_->__receiver()));
123         }
124 
get_envstdexec::__schfr::__rcvr2::__t125         auto get_env() const noexcept -> env_of_t<_Receiver> {
126           return stdexec::get_env(__state_->__receiver());
127         }
128 
129         __state<_Scheduler, _Sexpr, _Receiver>* __state_;
130       };
131     };
132 
133     template <class _Scheduler, class _Sexpr, class _Receiver>
134     using __receiver2 = __t<__rcvr2<__id<_Scheduler>, __id<_Sexpr>, __id<_Receiver>>>;
135 
136     template <class _Scheduler, class _Sexpr, class _Receiver>
137     struct __state
138       : __enable_receiver_from_this<_Sexpr, _Receiver, __state<_Scheduler, _Sexpr, _Receiver>>
139       , __immovable {
140       using __variant_t = __results_of<__child_of<_Sexpr>, env_of_t<_Receiver>>;
141       using __receiver2_t = __receiver2<_Scheduler, _Sexpr, _Receiver>;
142 
143       __variant_t __data_;
144       connect_result_t<schedule_result_t<_Scheduler>, __receiver2_t> __state2_;
145 
__statestdexec::__schfr::__state146       explicit __state(_Scheduler __sched)
147         : __data_()
148         , __state2_(connect(schedule(__sched), __receiver2_t{this})) {
149       }
150     };
151 
152     struct schedule_from_t {
153       template <scheduler _Scheduler, sender _Sender>
operator ()stdexec::__schfr::schedule_from_t154       auto operator()(_Scheduler __sched, _Sender&& __sndr) const -> __well_formed_sender auto {
155         auto __domain = query_or(get_domain, __sched, default_domain());
156         return stdexec::transform_sender(
157           __domain,
158           __make_sexpr<schedule_from_t>(
159             static_cast<_Scheduler&&>(__sched), static_cast<_Sender&&>(__sndr)));
160       }
161     };
162 
163     struct __schedule_from_impl : __sexpr_defaults {
164       template <class _Sender>
165       using __scheduler_t =
166         __decay_t<__call_result_t<get_completion_scheduler_t<set_value_t>, env_of_t<_Sender>>>;
167 
168       static constexpr auto get_attrs = []<class _Data, class _Child>(
169                                           const _Data& __data,
170                                           const _Child& __child) noexcept {
171         auto __domain = query_or(get_domain, __data, default_domain{});
172         return __env::__join(__sched_attrs{std::cref(__data), __domain}, stdexec::get_env(__child));
173       };
174 
175       static constexpr auto get_completion_signatures =
176         []<class _Sender, class... _Env>(_Sender&&, _Env&&...) noexcept
177         -> __completions_t<__scheduler_t<_Sender>, __child_of<_Sender>, _Env...> {
178         static_assert(sender_expr_for<_Sender, schedule_from_t>);
179         return {};
180       };
181 
182       static constexpr auto get_state =
183         []<class _Sender, class _Receiver>(_Sender&& __sndr, _Receiver&) {
184           static_assert(sender_expr_for<_Sender, schedule_from_t>);
185           auto __sched = get_completion_scheduler<set_value_t>(stdexec::get_env(__sndr));
186           using _Scheduler = decltype(__sched);
187           return __state<_Scheduler, _Sender, _Receiver>{__sched};
188         };
189 
190       static constexpr auto complete =
191         []<class _State, class _Receiver, class _Tag, class... _Args>(
192           __ignore,
193           _State& __state,
194           _Receiver& __rcvr,
195           _Tag __tag,
196           _Args&&... __args) noexcept -> void {
197         // Write the tag and the args into the operation state so that we can forward the completion
198         // from within the scheduler's execution context.
199         if constexpr (__nothrow_callable<__tup::__mktuple_t, _Tag, _Args...>) {
200           __state.__data_.emplace_from(__tup::__mktuple, __tag, static_cast<_Args&&>(__args)...);
201         } else {
202           STDEXEC_TRY {
203             __state.__data_.emplace_from(__tup::__mktuple, __tag, static_cast<_Args&&>(__args)...);
204           }
205           STDEXEC_CATCH_ALL {
206             stdexec::set_error(static_cast<_Receiver&&>(__rcvr), std::current_exception());
207             return;
208           }
209         }
210 
211         // Enqueue the schedule operation so the completion happens on the scheduler's execution
212         // context.
213         stdexec::start(__state.__state2_);
214       };
215     };
216   } // namespace __schfr
217 
218   using __schfr::schedule_from_t;
219   inline constexpr schedule_from_t schedule_from{};
220 
221   template <>
222   struct __sexpr_impl<schedule_from_t> : __schfr::__schedule_from_impl { };
223 } // namespace stdexec
224