1 /*
2 * Copyright (c) 2021-2024 NVIDIA Corporation
3 *
4 * Licensed under the Apache License Version 2.0 with LLVM Exceptions
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 *
8 * https://llvm.org/LICENSE.txt
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #pragma once
17
18 #include "__execution_fwd.hpp"
19
20 // include these after __execution_fwd.hpp
21 #include "__concepts.hpp"
22 #include "__cpo.hpp"
23 #include "__debug.hpp"
24 #include "__diagnostics.hpp"
25 #include "__domain.hpp"
26 #include "__env.hpp"
27 #include "__into_variant.hpp"
28 #include "__meta.hpp"
29 #include "__receivers.hpp"
30 #include "__run_loop.hpp"
31 #include "__senders.hpp"
32 #include "__transform_completion_signatures.hpp"
33 #include "__transform_sender.hpp"
34 #include "__type_traits.hpp"
35
36 #include <exception>
37 #include <optional>
38 #include <system_error>
39 #include <tuple>
40 #include <variant>
41
42 namespace stdexec
43 {
44 /////////////////////////////////////////////////////////////////////////////
45 // [execution.senders.consumers.sync_wait]
46 // [execution.senders.consumers.sync_wait_with_variant]
47 namespace __sync_wait
48 {
49 struct __env
50 {
51 run_loop* __loop_ = nullptr;
52
querystdexec::__sync_wait::__env53 auto query(get_scheduler_t) const noexcept -> run_loop::__scheduler
54 {
55 return __loop_->get_scheduler();
56 }
57
58 auto
querystdexec::__sync_wait::__env59 query(get_delegatee_scheduler_t) const noexcept -> run_loop::__scheduler
60 {
61 return __loop_->get_scheduler();
62 }
63
64 // static constexpr auto query(__debug::__is_debug_env_t) noexcept -> bool {
65 // return true;
66 // }
67 };
68
69 // What should sync_wait(just_stopped()) return?
70 template <class _Sender, class _Continuation>
71 using __sync_wait_result_impl = //
72 __value_types_of_t<_Sender, __env,
73 __mtransform<__q<__decay_t>, _Continuation>,
74 __q<__msingle>>;
75
76 template <class _Sender>
77 using __sync_wait_result_t =
78 __mtry_eval<__sync_wait_result_impl, _Sender, __qq<std::tuple>>;
79
80 template <class _Sender>
81 using __sync_wait_with_variant_result_t =
82 __mtry_eval<__sync_wait_result_impl, __result_of<into_variant, _Sender>,
83 __q<__midentity>>;
84
85 struct __state
86 {
87 std::exception_ptr __eptr_;
88 run_loop __loop_;
89 };
90
91 template <class... _Values>
92 struct __receiver
93 {
94 struct __t
95 {
96 using receiver_concept = receiver_t;
97 using __id = __receiver;
98 __state* __state_;
99 std::optional<std::tuple<_Values...>>* __values_;
100
101 template <class... _As>
102 requires constructible_from<std::tuple<_Values...>, _As...>
set_valuestdexec::__sync_wait::__receiver::__t103 void set_value(_As&&... __as) noexcept
104 {
105 try
106 {
107 __values_->emplace(static_cast<_As&&>(__as)...);
108 }
109 catch (...)
110 {
111 __state_->__eptr_ = std::current_exception();
112 }
113 __state_->__loop_.finish();
114 }
115
116 template <class _Error>
set_errorstdexec::__sync_wait::__receiver::__t117 void set_error(_Error __err) noexcept
118 {
119 if constexpr (__same_as<_Error, std::exception_ptr>)
120 {
121 STDEXEC_ASSERT(
122 __err != nullptr); // std::exception_ptr must not be null.
123 __state_->__eptr_ = static_cast<_Error&&>(__err);
124 }
125 else if constexpr (__same_as<_Error, std::error_code>)
126 {
127 __state_->__eptr_ =
128 std::make_exception_ptr(std::system_error(__err));
129 }
130 else
131 {
132 __state_->__eptr_ =
133 std::make_exception_ptr(static_cast<_Error&&>(__err));
134 }
135 __state_->__loop_.finish();
136 }
137
set_stoppedstdexec::__sync_wait::__receiver::__t138 void set_stopped() noexcept
139 {
140 __state_->__loop_.finish();
141 }
142
get_envstdexec::__sync_wait::__receiver::__t143 auto get_env() const noexcept -> __env
144 {
145 return __env{&__state_->__loop_};
146 }
147 };
148 };
149
150 template <class _Sender>
151 using __receiver_t = __t<__sync_wait_result_impl<_Sender, __q<__receiver>>>;
152
153 // These are for hiding the metaprogramming in diagnostics
154 template <class _Sender>
155 struct __sync_receiver_for
156 {
157 using __t = __receiver_t<_Sender>;
158 };
159 template <class _Sender>
160 using __sync_receiver_for_t = __t<__sync_receiver_for<_Sender>>;
161
162 template <class _Sender>
163 struct __value_tuple_for
164 {
165 using __t = __sync_wait_result_t<_Sender>;
166 };
167 template <class _Sender>
168 using __value_tuple_for_t = __t<__value_tuple_for<_Sender>>;
169
170 template <class _Sender>
171 struct __variant_for
172 {
173 using __t = __sync_wait_with_variant_result_t<_Sender>;
174 };
175 template <class _Sender>
176 using __variant_for_t = __t<__variant_for<_Sender>>;
177
178 inline constexpr __mstring __sync_wait_context_diag = //
179 "In stdexec::sync_wait()..."_mstr;
180 inline constexpr __mstring __too_many_successful_completions_diag =
181 "The argument to stdexec::sync_wait() is a sender that can complete successfully in more "
182 "than one way. Use stdexec::sync_wait_with_variant() instead."_mstr;
183
184 template <__mstring _Context, __mstring _Diagnostic>
185 struct _INVALID_ARGUMENT_TO_SYNC_WAIT_;
186
187 template <__mstring _Diagnostic>
188 using __invalid_argument_to_sync_wait =
189 _INVALID_ARGUMENT_TO_SYNC_WAIT_<__sync_wait_context_diag, _Diagnostic>;
190
191 template <__mstring _Diagnostic, class _Sender, class _Env = __env>
192 using __sync_wait_error =
193 __mexception<__invalid_argument_to_sync_wait<_Diagnostic>,
194 _WITH_SENDER_<_Sender>, _WITH_ENVIRONMENT_<_Env>>;
195
196 template <class _Sender, class>
197 using __too_many_successful_completions_error =
198 __sync_wait_error<__too_many_successful_completions_diag, _Sender>;
199
200 template <class _Sender>
201 concept __valid_sync_wait_argument =
202 __ok<__minvoke<__mtry_catch_q<__single_value_variant_sender_t,
203 __q<__too_many_successful_completions_error>>,
204 _Sender, __env>>;
205
206 #if STDEXEC_NVHPC()
207 // It requires some hoop-jumping to get the NVHPC compiler to report a
208 // meaningful diagnostic for SFINAE failures.
209 template <class _Sender>
__diagnose_error()210 auto __diagnose_error()
211 {
212 if constexpr (!sender_in<_Sender, __env>)
213 {
214 using _Completions = __completion_signatures_of_t<_Sender, __env>;
215 if constexpr (__merror<_Completions>)
216 {
217 return _Completions();
218 }
219 else
220 {
221 constexpr __mstring __diag =
222 "The stdexec::sender_in<Sender, Environment> concept check has failed."_mstr;
223 return __sync_wait_error<__diag, _Sender>();
224 }
225 }
226 else if constexpr (!__valid_sync_wait_argument<_Sender>)
227 {
228 return __sync_wait_error<__too_many_successful_completions_diag,
229 _Sender>();
230 }
231 else if constexpr (!sender_to<_Sender, __sync_receiver_for_t<_Sender>>)
232 {
233 constexpr __mstring __diag =
234 "Failed to connect the given sender to sync_wait's internal receiver. "
235 "The stdexec::connect(Sender, Receiver) expression is ill-formed."_mstr;
236 return __sync_wait_error<__diag, _Sender>();
237 }
238 else
239 {
240 constexpr __mstring __diag = "Unknown concept check failure."_mstr;
241 return __sync_wait_error<__diag, _Sender>();
242 }
243 }
244
245 template <class _Sender>
246 using __error_description_t =
247 decltype(__sync_wait::__diagnose_error<_Sender>());
248 #endif
249
250 ////////////////////////////////////////////////////////////////////////////
251 // [execution.senders.consumers.sync_wait]
252 struct sync_wait_t
253 {
254 template <sender_in<__env> _Sender>
255 requires __valid_sync_wait_argument<_Sender> &&
256 __has_implementation_for<
257 sync_wait_t, __early_domain_of_t<_Sender>, _Sender>
operator ()stdexec::__sync_wait::sync_wait_t258 auto operator()(_Sender&& __sndr) const
259 -> std::optional<__value_tuple_for_t<_Sender>>
260 {
261 auto __domain = __get_early_domain(__sndr);
262 return stdexec::apply_sender(__domain, *this,
263 static_cast<_Sender&&>(__sndr));
264 }
265
266 #if STDEXEC_NVHPC()
267 // This is needed to get sensible diagnostics from nvc++
268 template <class _Sender, class _Error = __error_description_t<_Sender>>
269 auto operator()(_Sender&&, [[maybe_unused]] _Error __diagnostic = {}) const
270 -> std::optional<std::tuple<int>> = delete;
271 #endif
272
273 using _Sender = __0;
274 using __legacy_customizations_t = __types<
275 // For legacy reasons:
276 tag_invoke_t(
277 sync_wait_t,
278 get_completion_scheduler_t<set_value_t>(get_env_t(const _Sender&)),
279 _Sender),
280 tag_invoke_t(sync_wait_t, _Sender)>;
281
282 // clang-format off
283 /// @brief Synchronously wait for the result of a sender, blocking the
284 /// current thread.
285 ///
286 /// `sync_wait` connects and starts the given sender, and then drives a
287 /// `run_loop` instance until the sender completes. Additional work
288 /// can be delegated to the `run_loop` by scheduling work on the
289 /// scheduler returned by calling `get_delegatee_scheduler` on the
290 /// receiver's environment.
291 ///
292 /// @pre The sender must have a exactly one value completion signature. That
293 /// is, it can only complete successfully in one way, with a single
294 /// set of values.
295 ///
296 /// @retval success Returns an engaged `std::optional` containing the result
297 /// values in a `std::tuple`.
298 /// @retval canceled Returns an empty `std::optional`.
299 /// @retval error Throws the error.
300 ///
301 /// @throws std::rethrow_exception(error) if the error has type
302 /// `std::exception_ptr`.
303 /// @throws std::system_error(error) if the error has type
304 /// `std::error_code`.
305 /// @throws error otherwise
306 // clang-format on
307 template <sender_in<__env> _Sender>
apply_senderstdexec::__sync_wait::sync_wait_t308 auto apply_sender(_Sender&& __sndr) const
309 -> std::optional<__sync_wait_result_t<_Sender>>
310 {
311 __state __local{};
312 std::optional<__sync_wait_result_t<_Sender>> __result{};
313
314 // Launch the sender with a continuation that will fill in the __result
315 // optional or set the exception_ptr in __local.
316 auto __op_state = connect(static_cast<_Sender&&>(__sndr),
317 __receiver_t<_Sender>{&__local, &__result});
318 stdexec::start(__op_state);
319
320 // Wait for the variant to be filled in.
321 __local.__loop_.run();
322
323 if (__local.__eptr_)
324 {
325 std::rethrow_exception(
326 static_cast<std::exception_ptr&&>(__local.__eptr_));
327 }
328
329 return __result;
330 }
331 };
332
333 ////////////////////////////////////////////////////////////////////////////
334 // [execution.senders.consumers.sync_wait_with_variant]
335 struct sync_wait_with_variant_t
336 {
337 struct __impl;
338
339 template <sender_in<__env> _Sender>
340 requires __callable<apply_sender_t, __early_domain_of_t<_Sender>,
341 sync_wait_with_variant_t, _Sender>
operator ()stdexec::__sync_wait::sync_wait_with_variant_t342 auto operator()(_Sender&& __sndr) const -> decltype(auto)
343 {
344 using __result_t =
345 __call_result_t<apply_sender_t, __early_domain_of_t<_Sender>,
346 sync_wait_with_variant_t, _Sender>;
347 static_assert(__is_instance_of<__result_t, std::optional>);
348 using __variant_t = typename __result_t::value_type;
349 static_assert(__is_instance_of<__variant_t, std::variant>);
350
351 auto __domain = __get_early_domain(__sndr);
352 return stdexec::apply_sender(__domain, *this,
353 static_cast<_Sender&&>(__sndr));
354 }
355
356 #if STDEXEC_NVHPC()
357 template <class _Sender, class _Error = __error_description_t<
358 __result_of<into_variant, _Sender>>>
359 auto operator()(_Sender&&, [[maybe_unused]] _Error __diagnostic = {}) const
360 -> std::optional<std::tuple<std::variant<std::tuple<>>>> = delete;
361 #endif
362
363 using _Sender = __0;
364 using __legacy_customizations_t = __types<
365 // For legacy reasons:
366 tag_invoke_t(
367 sync_wait_with_variant_t,
368 get_completion_scheduler_t<set_value_t>(get_env_t(const _Sender&)),
369 _Sender),
370 tag_invoke_t(sync_wait_with_variant_t, _Sender)>;
371
372 template <class _Sender>
373 requires __callable<sync_wait_t, __result_of<into_variant, _Sender>>
apply_senderstdexec::__sync_wait::sync_wait_with_variant_t374 auto apply_sender(_Sender&& __sndr) const
375 -> std::optional<__variant_for_t<_Sender>>
376 {
377 if (auto __opt_values =
378 sync_wait_t()(into_variant(static_cast<_Sender&&>(__sndr))))
379 {
380 return std::move(std::get<0>(*__opt_values));
381 }
382 return std::nullopt;
383 }
384 };
385 } // namespace __sync_wait
386
387 using __sync_wait::sync_wait_t;
388 inline constexpr sync_wait_t sync_wait{};
389
390 using __sync_wait::sync_wait_with_variant_t;
391 inline constexpr sync_wait_with_variant_t sync_wait_with_variant{};
392 } // namespace stdexec
393