xref: /openbmc/sdbusplus/include/sdbusplus/async/stdexec/__detail/__sync_wait.hpp (revision 10d0b4b7d1498cfd5c3d37edea271a54d1984e41)
1 /*
2  * Copyright (c) 2021-2024 NVIDIA Corporation
3  *
4  * Licensed under the Apache License Version 2.0 with LLVM Exceptions
5  * (the "License"); you may not use this file except in compliance with
6  * the License. You may obtain a copy of the License at
7  *
8  *   https://llvm.org/LICENSE.txt
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include "__execution_fwd.hpp"
19 
20 // include these after __execution_fwd.hpp
21 #include "__concepts.hpp"
22 #include "__debug.hpp" // IWYU pragma: keep
23 #include "__diagnostics.hpp"
24 #include "__domain.hpp"
25 #include "__env.hpp"
26 #include "__into_variant.hpp"
27 #include "__meta.hpp"
28 #include "__senders.hpp"
29 #include "__receivers.hpp"
30 #include "__transform_completion_signatures.hpp"
31 #include "__transform_sender.hpp"
32 #include "__run_loop.hpp"
33 #include "__type_traits.hpp"
34 
35 #include <exception>
36 #include <system_error>
37 #include <optional>
38 #include <tuple>
39 #include <variant>
40 
41 namespace stdexec {
42   /////////////////////////////////////////////////////////////////////////////
43   // [execution.senders.consumers.sync_wait]
44   // [execution.senders.consumers.sync_wait_with_variant]
45   namespace __sync_wait {
46     struct __env {
47       using __t = __env;
48       using __id = __env;
49 
50       run_loop* __loop_ = nullptr;
51 
52       [[nodiscard]]
querystdexec::__sync_wait::__env53       auto query(get_scheduler_t) const noexcept -> run_loop::__scheduler {
54         return __loop_->get_scheduler();
55       }
56 
57       [[nodiscard]]
querystdexec::__sync_wait::__env58       auto query(get_delegation_scheduler_t) const noexcept -> run_loop::__scheduler {
59         return __loop_->get_scheduler();
60       }
61 
62       [[nodiscard]]
querystdexec::__sync_wait::__env63       constexpr auto query(__root_t) const noexcept -> bool {
64         return true;
65       }
66 
67       // static constexpr auto query(__debug::__is_debug_env_t) noexcept -> bool
68       // {
69       //   return true;
70       // }
71     };
72 
73     // What should sync_wait(just_stopped()) return?
74     template <class _Sender, class _Continuation>
75     using __sync_wait_result_impl = __value_types_of_t<
76       _Sender,
77       __env,
78       __mtransform<__q<__decay_t>, _Continuation>,
79       __q<__msingle>
80     >;
81 
82     template <class _Sender>
83     using __sync_wait_result_t = __mtry_eval<__sync_wait_result_impl, _Sender, __qq<std::tuple>>;
84 
85     template <class _Sender>
86     using __sync_wait_with_variant_result_t =
87       __mtry_eval<__sync_wait_result_impl, __result_of<into_variant, _Sender>, __q<__midentity>>;
88 
89     struct __state {
90       std::exception_ptr __eptr_;
91       run_loop __loop_;
92     };
93 
94     template <class... _Values>
95     struct __receiver {
96       struct __t {
97         using receiver_concept = receiver_t;
98         using __id = __receiver;
99         __state* __state_;
100         std::optional<std::tuple<_Values...>>* __values_;
101 
102         template <class... _As>
103           requires constructible_from<std::tuple<_Values...>, _As...>
set_valuestdexec::__sync_wait::__receiver::__t104         void set_value(_As&&... __as) noexcept {
105           STDEXEC_TRY {
106             __values_->emplace(static_cast<_As&&>(__as)...);
107           }
108           STDEXEC_CATCH_ALL {
109             __state_->__eptr_ = std::current_exception();
110           }
111           __state_->__loop_.finish();
112         }
113 
114         template <class _Error>
set_errorstdexec::__sync_wait::__receiver::__t115         void set_error(_Error __err) noexcept {
116           if constexpr (__same_as<_Error, std::exception_ptr>) {
117             STDEXEC_ASSERT(__err != nullptr); // std::exception_ptr must not be null.
118             __state_->__eptr_ = static_cast<_Error&&>(__err);
119           } else if constexpr (__same_as<_Error, std::error_code>) {
120             __state_->__eptr_ = std::make_exception_ptr(std::system_error(__err));
121           } else {
122             __state_->__eptr_ = std::make_exception_ptr(static_cast<_Error&&>(__err));
123           }
124           __state_->__loop_.finish();
125         }
126 
set_stoppedstdexec::__sync_wait::__receiver::__t127         void set_stopped() noexcept {
128           __state_->__loop_.finish();
129         }
130 
131         [[nodiscard]]
get_envstdexec::__sync_wait::__receiver::__t132         auto get_env() const noexcept -> __env {
133           return __env{&__state_->__loop_};
134         }
135       };
136     };
137 
138     template <class _Sender>
139     using __receiver_t = __t<__sync_wait_result_impl<_Sender, __q<__receiver>>>;
140 
141     // These are for hiding the metaprogramming in diagnostics
142     template <class _Sender>
143     struct __sync_receiver_for {
144       using __t = __receiver_t<_Sender>;
145     };
146     template <class _Sender>
147     using __sync_receiver_for_t = __t<__sync_receiver_for<_Sender>>;
148 
149     template <class _Sender>
150     struct __value_tuple_for {
151       using __t = __sync_wait_result_t<_Sender>;
152     };
153     template <class _Sender>
154     using __value_tuple_for_t = __t<__value_tuple_for<_Sender>>;
155 
156     template <class _Sender>
157     struct __variant_for {
158       using __t = __sync_wait_with_variant_result_t<_Sender>;
159     };
160     template <class _Sender>
161     using __variant_for_t = __t<__variant_for<_Sender>>;
162 
163     inline constexpr __mstring __sync_wait_context_diag = "In stdexec::sync_wait()..."_mstr;
164     inline constexpr __mstring __too_many_successful_completions_diag =
165       "The argument to stdexec::sync_wait() is a sender that can complete successfully in more "
166       "than one way. Use stdexec::sync_wait_with_variant() instead."_mstr;
167 
168     template <__mstring _Context, __mstring _Diagnostic>
169     struct _INVALID_ARGUMENT_TO_SYNC_WAIT_;
170 
171     template <__mstring _Diagnostic>
172     using __invalid_argument_to_sync_wait =
173       _INVALID_ARGUMENT_TO_SYNC_WAIT_<__sync_wait_context_diag, _Diagnostic>;
174 
175     template <__mstring _Diagnostic, class _Sender, class _Env = __env>
176     using __sync_wait_error = __mexception<
177       __invalid_argument_to_sync_wait<_Diagnostic>,
178       _WITH_SENDER_<_Sender>,
179       _WITH_ENVIRONMENT_<_Env>
180     >;
181 
182     template <class _Sender, class>
183     using __too_many_successful_completions_error =
184       __sync_wait_error<__too_many_successful_completions_diag, _Sender>;
185 
186     template <class _Sender>
187     concept __valid_sync_wait_argument = __ok<__minvoke<
188       __mtry_catch_q<__single_value_variant_sender_t, __q<__too_many_successful_completions_error>>,
189       _Sender,
190       __env
191     >>;
192 
193     ////////////////////////////////////////////////////////////////////////////
194     // [execution.senders.consumers.sync_wait]
195     struct sync_wait_t {
196       template <class _Sender>
operator ()stdexec::__sync_wait::sync_wait_t197       auto operator()(_Sender&& __sndr) const {
198         if constexpr (!sender_in<_Sender, __env>) {
199           stdexec::__diagnose_sender_concept_failure<_Sender, __env>();
200         } else {
201           using __early_domain_t = __early_domain_of_t<_Sender>;
202           using __domain_t = __late_domain_of_t<_Sender, __env, __early_domain_t>;
203           constexpr auto __success_completion_count =
204             __v<value_types_of_t<_Sender, __env, __types, __msize::__f>>;
205           static_assert(
206             __success_completion_count != 0,
207             "The argument to stdexec::sync_wait() is a sender that cannot complete successfully. "
208             "stdexec::sync_wait() requires a sender that can complete successfully in exactly one "
209             "way. In other words, the sender's completion signatures must include exactly one "
210             "signature of the form `set_value_t(value-types...)`.");
211           static_assert(
212             __success_completion_count <= 1,
213             "The sender passed to stdexec::sync_wait() can complete successfully in "
214             "more than one way. Use stdexec::sync_wait_with_variant() instead.");
215           if constexpr (1 == __success_completion_count) {
216             using __sync_wait_receiver = __receiver_t<_Sender>;
217             constexpr bool __no_custom_sync_wait = __same_as<__domain_t, default_domain>;
218             if constexpr (__no_custom_sync_wait && sender_to<_Sender, __sync_wait_receiver>) {
219               // using __connect_result = connect_result_t<_Sender, __sync_wait_receiver>;
220               // if constexpr (!operation_state<__connect_result>) {
221               //   static_assert(
222               //     operation_state<__connect_result>,
223               //     "The `connect` member function of the sender passed to stdexec::sync_wait() does "
224               //     "not return an operation state. An operation state is required to have a "
225               //     "no-throw .start() member function.");
226               // } else
227               {
228                 // success path, dispatch to the default domain's sync_wait
229                 return default_domain().apply_sender(*this, static_cast<_Sender&&>(__sndr));
230               }
231             } else if constexpr (__no_custom_sync_wait) {
232               static_assert(
233                 sender_to<_Sender, __sync_wait_receiver>,
234                 STDEXEC_ERROR_SYNC_WAIT_CANNOT_CONNECT_SENDER_TO_RECEIVER);
235             } else if constexpr (!__has_implementation_for<sync_wait_t, __domain_t, _Sender>) {
236               static_assert(
237                 __has_implementation_for<sync_wait_t, __domain_t, _Sender>,
238                 "The sender passed to stdexec::sync_wait() has a domain that does not provide a "
239                 "usable implementation for sync_wait().");
240             } else {
241               // success path, dispatch to the custom domain's sync_wait
242               return stdexec::apply_sender(__domain_t(), *this, static_cast<_Sender&&>(__sndr));
243             }
244           }
245         }
246       }
247 
248       // clang-format off
249       /// @brief Synchronously wait for the result of a sender, blocking the
250       ///         current thread.
251       ///
252       /// `sync_wait` connects and starts the given sender, and then drives a
253       ///         `run_loop` instance until the sender completes. Additional work
254       ///         can be delegated to the `run_loop` by scheduling work on the
255       ///         scheduler returned by calling `get_delegation_scheduler` on the
256       ///         receiver's environment.
257       ///
258       /// @pre The sender must have a exactly one value completion signature. That
259       ///         is, it can only complete successfully in one way, with a single
260       ///         set of values.
261       ///
262       /// @retval success Returns an engaged `std::optional` containing the result
263       ///         values in a `std::tuple`.
264       /// @retval canceled Returns an empty `std::optional`.
265       /// @retval error Throws the error.
266       ///
267       /// @throws std::rethrow_exception(error) if the error has type
268       ///         `std::exception_ptr`.
269       /// @throws std::system_error(error) if the error has type
270       ///         `std::error_code`.
271       /// @throws error otherwise
272       // clang-format on
273 
274       template <sender_in<__env> _Sender>
apply_senderstdexec::__sync_wait::sync_wait_t275       auto apply_sender(_Sender&& __sndr) const -> std::optional<__sync_wait_result_t<_Sender>> {
276         __state __local_state{};
277         std::optional<__sync_wait_result_t<_Sender>> __result{};
278 
279         // Launch the sender with a continuation that will fill in the __result optional or set the
280         // exception_ptr in __local_state.
281         [[maybe_unused]]
282         auto __op = stdexec::connect(
283           static_cast<_Sender&&>(__sndr), __receiver_t<_Sender>{&__local_state, &__result});
284         stdexec::start(__op);
285 
286         // Wait for the variant to be filled in.
287         __local_state.__loop_.run();
288 
289         if (__local_state.__eptr_) {
290           std::rethrow_exception(static_cast<std::exception_ptr&&>(__local_state.__eptr_));
291         }
292 
293         return __result;
294       }
295     };
296 
297     ////////////////////////////////////////////////////////////////////////////
298     // [execution.senders.consumers.sync_wait_with_variant]
299     struct sync_wait_with_variant_t {
300       struct __impl;
301 
302       template <sender_in<__env> _Sender>
303         requires __callable<
304           apply_sender_t,
305           __early_domain_of_t<_Sender>,
306           sync_wait_with_variant_t,
307           _Sender
308         >
operator ()stdexec::__sync_wait::sync_wait_with_variant_t309       auto operator()(_Sender&& __sndr) const -> decltype(auto) {
310         using __result_t = __call_result_t<
311           apply_sender_t,
312           __early_domain_of_t<_Sender>,
313           sync_wait_with_variant_t,
314           _Sender
315         >;
316         static_assert(__is_instance_of<__result_t, std::optional>);
317         using __variant_t = __result_t::value_type;
318         static_assert(__is_instance_of<__variant_t, std::variant>);
319 
320         using _Domain = __late_domain_of_t<_Sender, __env>;
321         return stdexec::apply_sender(_Domain(), *this, static_cast<_Sender&&>(__sndr));
322       }
323 
324       template <class _Sender>
325         requires __callable<sync_wait_t, __result_of<into_variant, _Sender>>
apply_senderstdexec::__sync_wait::sync_wait_with_variant_t326       auto apply_sender(_Sender&& __sndr) const -> std::optional<__variant_for_t<_Sender>> {
327         if (auto __opt_values = sync_wait_t()(into_variant(static_cast<_Sender&&>(__sndr)))) {
328           return std::move(std::get<0>(*__opt_values));
329         }
330         return std::nullopt;
331       }
332     };
333   } // namespace __sync_wait
334 
335   using __sync_wait::sync_wait_t;
336   inline constexpr sync_wait_t sync_wait{};
337 
338   using __sync_wait::sync_wait_with_variant_t;
339   inline constexpr sync_wait_with_variant_t sync_wait_with_variant{};
340 } // namespace stdexec
341