1 /*
2  * Copyright (c) 2021-2024 NVIDIA Corporation
3  *
4  * Licensed under the Apache License Version 2.0 with LLVM Exceptions
5  * (the "License"); you may not use this file except in compliance with
6  * the License. You may obtain a copy of the License at
7  *
8  *   https://llvm.org/LICENSE.txt
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include "__execution_fwd.hpp"
19 
20 // include these after __execution_fwd.hpp
21 #include "__concepts.hpp"
22 #include "__cpo.hpp"
23 #include "__debug.hpp"
24 #include "__diagnostics.hpp"
25 #include "__domain.hpp"
26 #include "__env.hpp"
27 #include "__into_variant.hpp"
28 #include "__meta.hpp"
29 #include "__receivers.hpp"
30 #include "__run_loop.hpp"
31 #include "__senders.hpp"
32 #include "__transform_completion_signatures.hpp"
33 #include "__transform_sender.hpp"
34 #include "__type_traits.hpp"
35 
36 #include <exception>
37 #include <optional>
38 #include <system_error>
39 #include <tuple>
40 #include <variant>
41 
42 namespace stdexec
43 {
44 /////////////////////////////////////////////////////////////////////////////
45 // [execution.senders.consumers.sync_wait]
46 // [execution.senders.consumers.sync_wait_with_variant]
47 namespace __sync_wait
48 {
49 struct __env
50 {
51     run_loop* __loop_ = nullptr;
52 
querystdexec::__sync_wait::__env53     auto query(get_scheduler_t) const noexcept -> run_loop::__scheduler
54     {
55         return __loop_->get_scheduler();
56     }
57 
58     auto
querystdexec::__sync_wait::__env59         query(get_delegatee_scheduler_t) const noexcept -> run_loop::__scheduler
60     {
61         return __loop_->get_scheduler();
62     }
63 
64     // static constexpr auto query(__debug::__is_debug_env_t) noexcept -> bool {
65     //   return true;
66     // }
67 };
68 
69 // What should sync_wait(just_stopped()) return?
70 template <class _Sender, class _Continuation>
71 using __sync_wait_result_impl = //
72     __value_types_of_t<_Sender, __env,
73                        __mtransform<__q<__decay_t>, _Continuation>,
74                        __q<__msingle>>;
75 
76 template <class _Sender>
77 using __sync_wait_result_t =
78     __mtry_eval<__sync_wait_result_impl, _Sender, __qq<std::tuple>>;
79 
80 template <class _Sender>
81 using __sync_wait_with_variant_result_t =
82     __mtry_eval<__sync_wait_result_impl, __result_of<into_variant, _Sender>,
83                 __q<__midentity>>;
84 
85 struct __state
86 {
87     std::exception_ptr __eptr_;
88     run_loop __loop_;
89 };
90 
91 template <class... _Values>
92 struct __receiver
93 {
94     struct __t
95     {
96         using receiver_concept = receiver_t;
97         using __id = __receiver;
98         __state* __state_;
99         std::optional<std::tuple<_Values...>>* __values_;
100 
101         template <class... _As>
102             requires constructible_from<std::tuple<_Values...>, _As...>
set_valuestdexec::__sync_wait::__receiver::__t103         void set_value(_As&&... __as) noexcept
104         {
105             try
106             {
107                 __values_->emplace(static_cast<_As&&>(__as)...);
108             }
109             catch (...)
110             {
111                 __state_->__eptr_ = std::current_exception();
112             }
113             __state_->__loop_.finish();
114         }
115 
116         template <class _Error>
set_errorstdexec::__sync_wait::__receiver::__t117         void set_error(_Error __err) noexcept
118         {
119             if constexpr (__same_as<_Error, std::exception_ptr>)
120             {
121                 STDEXEC_ASSERT(
122                     __err != nullptr); // std::exception_ptr must not be null.
123                 __state_->__eptr_ = static_cast<_Error&&>(__err);
124             }
125             else if constexpr (__same_as<_Error, std::error_code>)
126             {
127                 __state_->__eptr_ =
128                     std::make_exception_ptr(std::system_error(__err));
129             }
130             else
131             {
132                 __state_->__eptr_ =
133                     std::make_exception_ptr(static_cast<_Error&&>(__err));
134             }
135             __state_->__loop_.finish();
136         }
137 
set_stoppedstdexec::__sync_wait::__receiver::__t138         void set_stopped() noexcept
139         {
140             __state_->__loop_.finish();
141         }
142 
get_envstdexec::__sync_wait::__receiver::__t143         auto get_env() const noexcept -> __env
144         {
145             return __env{&__state_->__loop_};
146         }
147     };
148 };
149 
150 template <class _Sender>
151 using __receiver_t = __t<__sync_wait_result_impl<_Sender, __q<__receiver>>>;
152 
153 // These are for hiding the metaprogramming in diagnostics
154 template <class _Sender>
155 struct __sync_receiver_for
156 {
157     using __t = __receiver_t<_Sender>;
158 };
159 template <class _Sender>
160 using __sync_receiver_for_t = __t<__sync_receiver_for<_Sender>>;
161 
162 template <class _Sender>
163 struct __value_tuple_for
164 {
165     using __t = __sync_wait_result_t<_Sender>;
166 };
167 template <class _Sender>
168 using __value_tuple_for_t = __t<__value_tuple_for<_Sender>>;
169 
170 template <class _Sender>
171 struct __variant_for
172 {
173     using __t = __sync_wait_with_variant_result_t<_Sender>;
174 };
175 template <class _Sender>
176 using __variant_for_t = __t<__variant_for<_Sender>>;
177 
178 inline constexpr __mstring __sync_wait_context_diag = //
179     "In stdexec::sync_wait()..."_mstr;
180 inline constexpr __mstring __too_many_successful_completions_diag =
181     "The argument to stdexec::sync_wait() is a sender that can complete successfully in more "
182     "than one way. Use stdexec::sync_wait_with_variant() instead."_mstr;
183 
184 template <__mstring _Context, __mstring _Diagnostic>
185 struct _INVALID_ARGUMENT_TO_SYNC_WAIT_;
186 
187 template <__mstring _Diagnostic>
188 using __invalid_argument_to_sync_wait =
189     _INVALID_ARGUMENT_TO_SYNC_WAIT_<__sync_wait_context_diag, _Diagnostic>;
190 
191 template <__mstring _Diagnostic, class _Sender, class _Env = __env>
192 using __sync_wait_error =
193     __mexception<__invalid_argument_to_sync_wait<_Diagnostic>,
194                  _WITH_SENDER_<_Sender>, _WITH_ENVIRONMENT_<_Env>>;
195 
196 template <class _Sender, class>
197 using __too_many_successful_completions_error =
198     __sync_wait_error<__too_many_successful_completions_diag, _Sender>;
199 
200 template <class _Sender>
201 concept __valid_sync_wait_argument =
202     __ok<__minvoke<__mtry_catch_q<__single_value_variant_sender_t,
203                                   __q<__too_many_successful_completions_error>>,
204                    _Sender, __env>>;
205 
206 #if STDEXEC_NVHPC()
207 // It requires some hoop-jumping to get the NVHPC compiler to report a
208 // meaningful diagnostic for SFINAE failures.
209 template <class _Sender>
__diagnose_error()210 auto __diagnose_error()
211 {
212     if constexpr (!sender_in<_Sender, __env>)
213     {
214         using _Completions = __completion_signatures_of_t<_Sender, __env>;
215         if constexpr (__merror<_Completions>)
216         {
217             return _Completions();
218         }
219         else
220         {
221             constexpr __mstring __diag =
222                 "The stdexec::sender_in<Sender, Environment> concept check has failed."_mstr;
223             return __sync_wait_error<__diag, _Sender>();
224         }
225     }
226     else if constexpr (!__valid_sync_wait_argument<_Sender>)
227     {
228         return __sync_wait_error<__too_many_successful_completions_diag,
229                                  _Sender>();
230     }
231     else if constexpr (!sender_to<_Sender, __sync_receiver_for_t<_Sender>>)
232     {
233         constexpr __mstring __diag =
234             "Failed to connect the given sender to sync_wait's internal receiver. "
235             "The stdexec::connect(Sender, Receiver) expression is ill-formed."_mstr;
236         return __sync_wait_error<__diag, _Sender>();
237     }
238     else
239     {
240         constexpr __mstring __diag = "Unknown concept check failure."_mstr;
241         return __sync_wait_error<__diag, _Sender>();
242     }
243 }
244 
245 template <class _Sender>
246 using __error_description_t =
247     decltype(__sync_wait::__diagnose_error<_Sender>());
248 #endif
249 
250 ////////////////////////////////////////////////////////////////////////////
251 // [execution.senders.consumers.sync_wait]
252 struct sync_wait_t
253 {
254     template <sender_in<__env> _Sender>
255         requires __valid_sync_wait_argument<_Sender> &&
256                      __has_implementation_for<
257                          sync_wait_t, __early_domain_of_t<_Sender>, _Sender>
operator ()stdexec::__sync_wait::sync_wait_t258     auto operator()(_Sender&& __sndr) const
259         -> std::optional<__value_tuple_for_t<_Sender>>
260     {
261         auto __domain = __get_early_domain(__sndr);
262         return stdexec::apply_sender(__domain, *this,
263                                      static_cast<_Sender&&>(__sndr));
264     }
265 
266 #if STDEXEC_NVHPC()
267     // This is needed to get sensible diagnostics from nvc++
268     template <class _Sender, class _Error = __error_description_t<_Sender>>
269     auto operator()(_Sender&&, [[maybe_unused]] _Error __diagnostic = {}) const
270         -> std::optional<std::tuple<int>> = delete;
271 #endif
272 
273     using _Sender = __0;
274     using __legacy_customizations_t = __types<
275         // For legacy reasons:
276         tag_invoke_t(
277             sync_wait_t,
278             get_completion_scheduler_t<set_value_t>(get_env_t(const _Sender&)),
279             _Sender),
280         tag_invoke_t(sync_wait_t, _Sender)>;
281 
282     // clang-format off
283       /// @brief Synchronously wait for the result of a sender, blocking the
284       ///         current thread.
285       ///
286       /// `sync_wait` connects and starts the given sender, and then drives a
287       ///         `run_loop` instance until the sender completes. Additional work
288       ///         can be delegated to the `run_loop` by scheduling work on the
289       ///         scheduler returned by calling `get_delegatee_scheduler` on the
290       ///         receiver's environment.
291       ///
292       /// @pre The sender must have a exactly one value completion signature. That
293       ///         is, it can only complete successfully in one way, with a single
294       ///         set of values.
295       ///
296       /// @retval success Returns an engaged `std::optional` containing the result
297       ///         values in a `std::tuple`.
298       /// @retval canceled Returns an empty `std::optional`.
299       /// @retval error Throws the error.
300       ///
301       /// @throws std::rethrow_exception(error) if the error has type
302       ///         `std::exception_ptr`.
303       /// @throws std::system_error(error) if the error has type
304       ///         `std::error_code`.
305       /// @throws error otherwise
306     // clang-format on
307     template <sender_in<__env> _Sender>
apply_senderstdexec::__sync_wait::sync_wait_t308     auto apply_sender(_Sender&& __sndr) const
309         -> std::optional<__sync_wait_result_t<_Sender>>
310     {
311         __state __local{};
312         std::optional<__sync_wait_result_t<_Sender>> __result{};
313 
314         // Launch the sender with a continuation that will fill in the __result
315         // optional or set the exception_ptr in __local.
316         auto __op_state = connect(static_cast<_Sender&&>(__sndr),
317                                   __receiver_t<_Sender>{&__local, &__result});
318         stdexec::start(__op_state);
319 
320         // Wait for the variant to be filled in.
321         __local.__loop_.run();
322 
323         if (__local.__eptr_)
324         {
325             std::rethrow_exception(
326                 static_cast<std::exception_ptr&&>(__local.__eptr_));
327         }
328 
329         return __result;
330     }
331 };
332 
333 ////////////////////////////////////////////////////////////////////////////
334 // [execution.senders.consumers.sync_wait_with_variant]
335 struct sync_wait_with_variant_t
336 {
337     struct __impl;
338 
339     template <sender_in<__env> _Sender>
340         requires __callable<apply_sender_t, __early_domain_of_t<_Sender>,
341                             sync_wait_with_variant_t, _Sender>
operator ()stdexec::__sync_wait::sync_wait_with_variant_t342     auto operator()(_Sender&& __sndr) const -> decltype(auto)
343     {
344         using __result_t =
345             __call_result_t<apply_sender_t, __early_domain_of_t<_Sender>,
346                             sync_wait_with_variant_t, _Sender>;
347         static_assert(__is_instance_of<__result_t, std::optional>);
348         using __variant_t = typename __result_t::value_type;
349         static_assert(__is_instance_of<__variant_t, std::variant>);
350 
351         auto __domain = __get_early_domain(__sndr);
352         return stdexec::apply_sender(__domain, *this,
353                                      static_cast<_Sender&&>(__sndr));
354     }
355 
356 #if STDEXEC_NVHPC()
357     template <class _Sender, class _Error = __error_description_t<
358                                  __result_of<into_variant, _Sender>>>
359     auto operator()(_Sender&&, [[maybe_unused]] _Error __diagnostic = {}) const
360         -> std::optional<std::tuple<std::variant<std::tuple<>>>> = delete;
361 #endif
362 
363     using _Sender = __0;
364     using __legacy_customizations_t = __types<
365         // For legacy reasons:
366         tag_invoke_t(
367             sync_wait_with_variant_t,
368             get_completion_scheduler_t<set_value_t>(get_env_t(const _Sender&)),
369             _Sender),
370         tag_invoke_t(sync_wait_with_variant_t, _Sender)>;
371 
372     template <class _Sender>
373         requires __callable<sync_wait_t, __result_of<into_variant, _Sender>>
apply_senderstdexec::__sync_wait::sync_wait_with_variant_t374     auto apply_sender(_Sender&& __sndr) const
375         -> std::optional<__variant_for_t<_Sender>>
376     {
377         if (auto __opt_values =
378                 sync_wait_t()(into_variant(static_cast<_Sender&&>(__sndr))))
379         {
380             return std::move(std::get<0>(*__opt_values));
381         }
382         return std::nullopt;
383     }
384 };
385 } // namespace __sync_wait
386 
387 using __sync_wait::sync_wait_t;
388 inline constexpr sync_wait_t sync_wait{};
389 
390 using __sync_wait::sync_wait_with_variant_t;
391 inline constexpr sync_wait_with_variant_t sync_wait_with_variant{};
392 } // namespace stdexec
393