1 /* 2 * Copyright (c) 2021-2024 NVIDIA Corporation 3 * 4 * Licensed under the Apache License Version 2.0 with LLVM Exceptions 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * https://llvm.org/LICENSE.txt 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #pragma once 17 18 #include "__execution_fwd.hpp" 19 20 // include these after __execution_fwd.hpp 21 #include "__concepts.hpp" 22 #include "__debug.hpp" // IWYU pragma: keep 23 #include "__diagnostics.hpp" 24 #include "__domain.hpp" 25 #include "__env.hpp" 26 #include "__into_variant.hpp" 27 #include "__meta.hpp" 28 #include "__senders.hpp" 29 #include "__receivers.hpp" 30 #include "__transform_completion_signatures.hpp" 31 #include "__transform_sender.hpp" 32 #include "__run_loop.hpp" 33 #include "__type_traits.hpp" 34 35 #include <exception> 36 #include <system_error> 37 #include <optional> 38 #include <tuple> 39 #include <variant> 40 41 namespace stdexec { 42 ///////////////////////////////////////////////////////////////////////////// 43 // [execution.senders.consumers.sync_wait] 44 // [execution.senders.consumers.sync_wait_with_variant] 45 namespace __sync_wait { 46 struct __env { 47 using __t = __env; 48 using __id = __env; 49 50 run_loop* __loop_ = nullptr; 51 52 [[nodiscard]] querystdexec::__sync_wait::__env53 auto query(get_scheduler_t) const noexcept -> run_loop::__scheduler { 54 return __loop_->get_scheduler(); 55 } 56 57 [[nodiscard]] querystdexec::__sync_wait::__env58 auto query(get_delegation_scheduler_t) const noexcept -> run_loop::__scheduler { 59 return __loop_->get_scheduler(); 60 } 61 62 [[nodiscard]] querystdexec::__sync_wait::__env63 constexpr auto query(__root_t) const noexcept -> bool { 64 return true; 65 } 66 67 // static constexpr auto query(__debug::__is_debug_env_t) noexcept -> bool 68 // { 69 // return true; 70 // } 71 }; 72 73 // What should sync_wait(just_stopped()) return? 74 template <class _Sender, class _Continuation> 75 using __sync_wait_result_impl = __value_types_of_t< 76 _Sender, 77 __env, 78 __mtransform<__q<__decay_t>, _Continuation>, 79 __q<__msingle> 80 >; 81 82 template <class _Sender> 83 using __sync_wait_result_t = __mtry_eval<__sync_wait_result_impl, _Sender, __qq<std::tuple>>; 84 85 template <class _Sender> 86 using __sync_wait_with_variant_result_t = 87 __mtry_eval<__sync_wait_result_impl, __result_of<into_variant, _Sender>, __q<__midentity>>; 88 89 struct __state { 90 std::exception_ptr __eptr_; 91 run_loop __loop_; 92 }; 93 94 template <class... _Values> 95 struct __receiver { 96 struct __t { 97 using receiver_concept = receiver_t; 98 using __id = __receiver; 99 __state* __state_; 100 std::optional<std::tuple<_Values...>>* __values_; 101 102 template <class... _As> 103 requires constructible_from<std::tuple<_Values...>, _As...> set_valuestdexec::__sync_wait::__receiver::__t104 void set_value(_As&&... __as) noexcept { 105 STDEXEC_TRY { 106 __values_->emplace(static_cast<_As&&>(__as)...); 107 } 108 STDEXEC_CATCH_ALL { 109 __state_->__eptr_ = std::current_exception(); 110 } 111 __state_->__loop_.finish(); 112 } 113 114 template <class _Error> set_errorstdexec::__sync_wait::__receiver::__t115 void set_error(_Error __err) noexcept { 116 if constexpr (__same_as<_Error, std::exception_ptr>) { 117 STDEXEC_ASSERT(__err != nullptr); // std::exception_ptr must not be null. 118 __state_->__eptr_ = static_cast<_Error&&>(__err); 119 } else if constexpr (__same_as<_Error, std::error_code>) { 120 __state_->__eptr_ = std::make_exception_ptr(std::system_error(__err)); 121 } else { 122 __state_->__eptr_ = std::make_exception_ptr(static_cast<_Error&&>(__err)); 123 } 124 __state_->__loop_.finish(); 125 } 126 set_stoppedstdexec::__sync_wait::__receiver::__t127 void set_stopped() noexcept { 128 __state_->__loop_.finish(); 129 } 130 131 [[nodiscard]] get_envstdexec::__sync_wait::__receiver::__t132 auto get_env() const noexcept -> __env { 133 return __env{&__state_->__loop_}; 134 } 135 }; 136 }; 137 138 template <class _Sender> 139 using __receiver_t = __t<__sync_wait_result_impl<_Sender, __q<__receiver>>>; 140 141 // These are for hiding the metaprogramming in diagnostics 142 template <class _Sender> 143 struct __sync_receiver_for { 144 using __t = __receiver_t<_Sender>; 145 }; 146 template <class _Sender> 147 using __sync_receiver_for_t = __t<__sync_receiver_for<_Sender>>; 148 149 template <class _Sender> 150 struct __value_tuple_for { 151 using __t = __sync_wait_result_t<_Sender>; 152 }; 153 template <class _Sender> 154 using __value_tuple_for_t = __t<__value_tuple_for<_Sender>>; 155 156 template <class _Sender> 157 struct __variant_for { 158 using __t = __sync_wait_with_variant_result_t<_Sender>; 159 }; 160 template <class _Sender> 161 using __variant_for_t = __t<__variant_for<_Sender>>; 162 163 inline constexpr __mstring __sync_wait_context_diag = "In stdexec::sync_wait()..."_mstr; 164 inline constexpr __mstring __too_many_successful_completions_diag = 165 "The argument to stdexec::sync_wait() is a sender that can complete successfully in more " 166 "than one way. Use stdexec::sync_wait_with_variant() instead."_mstr; 167 168 template <__mstring _Context, __mstring _Diagnostic> 169 struct _INVALID_ARGUMENT_TO_SYNC_WAIT_; 170 171 template <__mstring _Diagnostic> 172 using __invalid_argument_to_sync_wait = 173 _INVALID_ARGUMENT_TO_SYNC_WAIT_<__sync_wait_context_diag, _Diagnostic>; 174 175 template <__mstring _Diagnostic, class _Sender, class _Env = __env> 176 using __sync_wait_error = __mexception< 177 __invalid_argument_to_sync_wait<_Diagnostic>, 178 _WITH_SENDER_<_Sender>, 179 _WITH_ENVIRONMENT_<_Env> 180 >; 181 182 template <class _Sender, class> 183 using __too_many_successful_completions_error = 184 __sync_wait_error<__too_many_successful_completions_diag, _Sender>; 185 186 template <class _Sender> 187 concept __valid_sync_wait_argument = __ok<__minvoke< 188 __mtry_catch_q<__single_value_variant_sender_t, __q<__too_many_successful_completions_error>>, 189 _Sender, 190 __env 191 >>; 192 193 //////////////////////////////////////////////////////////////////////////// 194 // [execution.senders.consumers.sync_wait] 195 struct sync_wait_t { 196 template <class _Sender> operator ()stdexec::__sync_wait::sync_wait_t197 auto operator()(_Sender&& __sndr) const { 198 if constexpr (!sender_in<_Sender, __env>) { 199 stdexec::__diagnose_sender_concept_failure<_Sender, __env>(); 200 } else { 201 using __early_domain_t = __early_domain_of_t<_Sender>; 202 using __domain_t = __late_domain_of_t<_Sender, __env, __early_domain_t>; 203 constexpr auto __success_completion_count = 204 __v<value_types_of_t<_Sender, __env, __types, __msize::__f>>; 205 static_assert( 206 __success_completion_count != 0, 207 "The argument to stdexec::sync_wait() is a sender that cannot complete successfully. " 208 "stdexec::sync_wait() requires a sender that can complete successfully in exactly one " 209 "way. In other words, the sender's completion signatures must include exactly one " 210 "signature of the form `set_value_t(value-types...)`."); 211 static_assert( 212 __success_completion_count <= 1, 213 "The sender passed to stdexec::sync_wait() can complete successfully in " 214 "more than one way. Use stdexec::sync_wait_with_variant() instead."); 215 if constexpr (1 == __success_completion_count) { 216 using __sync_wait_receiver = __receiver_t<_Sender>; 217 constexpr bool __no_custom_sync_wait = __same_as<__domain_t, default_domain>; 218 if constexpr (__no_custom_sync_wait && sender_to<_Sender, __sync_wait_receiver>) { 219 // using __connect_result = connect_result_t<_Sender, __sync_wait_receiver>; 220 // if constexpr (!operation_state<__connect_result>) { 221 // static_assert( 222 // operation_state<__connect_result>, 223 // "The `connect` member function of the sender passed to stdexec::sync_wait() does " 224 // "not return an operation state. An operation state is required to have a " 225 // "no-throw .start() member function."); 226 // } else 227 { 228 // success path, dispatch to the default domain's sync_wait 229 return default_domain().apply_sender(*this, static_cast<_Sender&&>(__sndr)); 230 } 231 } else if constexpr (__no_custom_sync_wait) { 232 static_assert( 233 sender_to<_Sender, __sync_wait_receiver>, 234 STDEXEC_ERROR_SYNC_WAIT_CANNOT_CONNECT_SENDER_TO_RECEIVER); 235 } else if constexpr (!__has_implementation_for<sync_wait_t, __domain_t, _Sender>) { 236 static_assert( 237 __has_implementation_for<sync_wait_t, __domain_t, _Sender>, 238 "The sender passed to stdexec::sync_wait() has a domain that does not provide a " 239 "usable implementation for sync_wait()."); 240 } else { 241 // success path, dispatch to the custom domain's sync_wait 242 return stdexec::apply_sender(__domain_t(), *this, static_cast<_Sender&&>(__sndr)); 243 } 244 } 245 } 246 } 247 248 // clang-format off 249 /// @brief Synchronously wait for the result of a sender, blocking the 250 /// current thread. 251 /// 252 /// `sync_wait` connects and starts the given sender, and then drives a 253 /// `run_loop` instance until the sender completes. Additional work 254 /// can be delegated to the `run_loop` by scheduling work on the 255 /// scheduler returned by calling `get_delegation_scheduler` on the 256 /// receiver's environment. 257 /// 258 /// @pre The sender must have a exactly one value completion signature. That 259 /// is, it can only complete successfully in one way, with a single 260 /// set of values. 261 /// 262 /// @retval success Returns an engaged `std::optional` containing the result 263 /// values in a `std::tuple`. 264 /// @retval canceled Returns an empty `std::optional`. 265 /// @retval error Throws the error. 266 /// 267 /// @throws std::rethrow_exception(error) if the error has type 268 /// `std::exception_ptr`. 269 /// @throws std::system_error(error) if the error has type 270 /// `std::error_code`. 271 /// @throws error otherwise 272 // clang-format on 273 274 template <sender_in<__env> _Sender> apply_senderstdexec::__sync_wait::sync_wait_t275 auto apply_sender(_Sender&& __sndr) const -> std::optional<__sync_wait_result_t<_Sender>> { 276 __state __local_state{}; 277 std::optional<__sync_wait_result_t<_Sender>> __result{}; 278 279 // Launch the sender with a continuation that will fill in the __result optional or set the 280 // exception_ptr in __local_state. 281 [[maybe_unused]] 282 auto __op = stdexec::connect( 283 static_cast<_Sender&&>(__sndr), __receiver_t<_Sender>{&__local_state, &__result}); 284 stdexec::start(__op); 285 286 // Wait for the variant to be filled in. 287 __local_state.__loop_.run(); 288 289 if (__local_state.__eptr_) { 290 std::rethrow_exception(static_cast<std::exception_ptr&&>(__local_state.__eptr_)); 291 } 292 293 return __result; 294 } 295 }; 296 297 //////////////////////////////////////////////////////////////////////////// 298 // [execution.senders.consumers.sync_wait_with_variant] 299 struct sync_wait_with_variant_t { 300 struct __impl; 301 302 template <sender_in<__env> _Sender> 303 requires __callable< 304 apply_sender_t, 305 __early_domain_of_t<_Sender>, 306 sync_wait_with_variant_t, 307 _Sender 308 > operator ()stdexec::__sync_wait::sync_wait_with_variant_t309 auto operator()(_Sender&& __sndr) const -> decltype(auto) { 310 using __result_t = __call_result_t< 311 apply_sender_t, 312 __early_domain_of_t<_Sender>, 313 sync_wait_with_variant_t, 314 _Sender 315 >; 316 static_assert(__is_instance_of<__result_t, std::optional>); 317 using __variant_t = __result_t::value_type; 318 static_assert(__is_instance_of<__variant_t, std::variant>); 319 320 using _Domain = __late_domain_of_t<_Sender, __env>; 321 return stdexec::apply_sender(_Domain(), *this, static_cast<_Sender&&>(__sndr)); 322 } 323 324 template <class _Sender> 325 requires __callable<sync_wait_t, __result_of<into_variant, _Sender>> apply_senderstdexec::__sync_wait::sync_wait_with_variant_t326 auto apply_sender(_Sender&& __sndr) const -> std::optional<__variant_for_t<_Sender>> { 327 if (auto __opt_values = sync_wait_t()(into_variant(static_cast<_Sender&&>(__sndr)))) { 328 return std::move(std::get<0>(*__opt_values)); 329 } 330 return std::nullopt; 331 } 332 }; 333 } // namespace __sync_wait 334 335 using __sync_wait::sync_wait_t; 336 inline constexpr sync_wait_t sync_wait{}; 337 338 using __sync_wait::sync_wait_with_variant_t; 339 inline constexpr sync_wait_with_variant_t sync_wait_with_variant{}; 340 } // namespace stdexec 341