1 /* 2 * Copyright (c) 2021-2024 NVIDIA Corporation 3 * 4 * Licensed under the Apache License Version 2.0 with LLVM Exceptions 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * https://llvm.org/LICENSE.txt 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #pragma once 17 18 #include "../stdexec/execution.hpp" 19 #include "../stdexec/stop_token.hpp" 20 #include "../stdexec/__detail/__intrusive_queue.hpp" 21 #include "../stdexec/__detail/__optional.hpp" 22 #include "env.hpp" 23 24 #include <atomic> 25 #include <mutex> 26 27 namespace exec { 28 ///////////////////////////////////////////////////////////////////////////// 29 // async_scope 30 namespace __scope { 31 using namespace stdexec; 32 33 struct __impl; 34 struct async_scope; 35 36 template <class _A> 37 concept __async_scope = requires(_A& __a) { 38 { __a.nest(stdexec::just()) } -> sender_of<stdexec::set_value_t()>; 39 }; 40 41 struct __task : __immovable { 42 const __impl* __scope_; 43 void (*__notify_waiter)(__task*) noexcept; 44 __task* __next_ = nullptr; 45 }; 46 47 template <class _BaseEnv> 48 using __env_t = make_env_t<_BaseEnv, prop<get_stop_token_t, inplace_stop_token>>; 49 50 struct __impl { 51 inplace_stop_source __stop_source_{}; 52 mutable std::mutex __lock_{}; 53 mutable std::atomic_ptrdiff_t __active_ = 0; 54 mutable __intrusive_queue<&__task::__next_> __waiters_{}; 55 ~__implexec::__scope::__impl56 ~__impl() { 57 std::unique_lock __guard{__lock_}; 58 STDEXEC_ASSERT(__active_ == 0); 59 STDEXEC_ASSERT(__waiters_.empty()); 60 } 61 }; 62 63 //////////////////////////////////////////////////////////////////////////// 64 // async_scope::when_empty implementation 65 template <class _ConstrainedId, class _ReceiverId> 66 struct __when_empty_op { 67 using _Constrained = __cvref_t<_ConstrainedId>; 68 using _Receiver = stdexec::__t<_ReceiverId>; 69 70 struct __t : __task { 71 using __id = __when_empty_op; 72 __texec::__scope::__when_empty_op::__t73 explicit __t(const __impl* __scope, _Constrained&& __sndr, _Receiver __rcvr) 74 : __task{{}, __scope, __notify_waiter} 75 , __op_( 76 stdexec::connect( 77 static_cast<_Constrained&&>(__sndr), 78 static_cast<_Receiver&&>(__rcvr))) { 79 } 80 startexec::__scope::__when_empty_op::__t81 void start() & noexcept { 82 // must get lock before checking __active, or if the __active is drained before 83 // the waiter is queued but after __active is checked, the waiter will never be notified 84 std::unique_lock __guard{this->__scope_->__lock_}; 85 auto& __active = this->__scope_->__active_; 86 auto& __waiters = this->__scope_->__waiters_; 87 if (__active.load(std::memory_order_acquire) != 0) { 88 __waiters.push_back(this); 89 return; 90 } 91 __guard.unlock(); 92 stdexec::start(this->__op_); 93 } 94 95 private: __notify_waiterexec::__scope::__when_empty_op::__t96 static void __notify_waiter(__task* __self) noexcept { 97 stdexec::start(static_cast<__t*>(__self)->__op_); 98 } 99 100 STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS 101 connect_result_t<_Constrained, _Receiver> __op_; 102 }; 103 }; 104 105 template <class _ConstrainedId> 106 struct __when_empty_sender { 107 using _Constrained = stdexec::__t<_ConstrainedId>; 108 109 struct __t { 110 using __id = __when_empty_sender; 111 using sender_concept = stdexec::sender_t; 112 113 template <class _Self, class _Receiver> 114 using __when_empty_op_t = 115 stdexec::__t<__when_empty_op<__cvref_id<_Self, _Constrained>, stdexec::__id<_Receiver>>>; 116 117 template <__decays_to<__t> _Self, receiver _Receiver> 118 requires sender_to<__copy_cvref_t<_Self, _Constrained>, _Receiver> 119 [[nodiscard]] 120 static auto connectexec::__scope::__when_empty_sender::__t121 connect(_Self&& __self, _Receiver __rcvr) -> __when_empty_op_t<_Self, _Receiver> { 122 return __when_empty_op_t<_Self, _Receiver>{ 123 __self.__scope_, static_cast<_Self&&>(__self).__c_, static_cast<_Receiver&&>(__rcvr)}; 124 } 125 126 template <__decays_to<__t> _Self, class... _Env> get_completion_signaturesexec::__scope::__when_empty_sender::__t127 static auto get_completion_signatures(_Self&&, _Env&&...) 128 -> __completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>, __env_t<_Env>...> { 129 return {}; 130 } 131 132 const __impl* __scope_; 133 STDEXEC_ATTRIBUTE(no_unique_address) _Constrained __c_; 134 }; 135 }; 136 137 template <class _Constrained> 138 using __when_empty_sender_t = stdexec::__t<__when_empty_sender<__id<__decay_t<_Constrained>>>>; 139 140 //////////////////////////////////////////////////////////////////////////// 141 // async_scope::nest implementation 142 template <class _ReceiverId> 143 struct __nest_op_base : __immovable { 144 using _Receiver = stdexec::__t<_ReceiverId>; 145 const __impl* __scope_; 146 STDEXEC_ATTRIBUTE(no_unique_address) _Receiver __rcvr_; 147 }; 148 149 template <class _ReceiverId> 150 struct __nest_rcvr { 151 using _Receiver = stdexec::__t<_ReceiverId>; 152 153 struct __t { 154 using __id = __nest_rcvr; 155 using receiver_concept = stdexec::receiver_t; 156 __nest_op_base<_ReceiverId>* __op_; 157 __completeexec::__scope::__nest_rcvr::__t158 static void __complete(const __impl* __scope) noexcept { 159 auto& __active = __scope->__active_; 160 if (__active.fetch_sub(1, std::memory_order_acq_rel) == 1) { 161 std::unique_lock __guard{__scope->__lock_}; 162 auto __local_waiters = std::move(__scope->__waiters_); 163 __guard.unlock(); 164 __scope = nullptr; 165 // do not access __scope 166 while (!__local_waiters.empty()) { 167 auto* __next = __local_waiters.pop_front(); 168 __next->__notify_waiter(__next); 169 // __scope must be considered deleted 170 } 171 } 172 } 173 174 template <class... _As> 175 requires __callable<set_value_t, _Receiver, _As...> set_valueexec::__scope::__nest_rcvr::__t176 void set_value(_As&&... __as) noexcept { 177 auto __scope = __op_->__scope_; 178 stdexec::set_value(std::move(__op_->__rcvr_), static_cast<_As&&>(__as)...); 179 // do not access __op_ 180 // do not access this 181 __complete(__scope); 182 } 183 184 template <class _Error> 185 requires __callable<set_error_t, _Receiver, _Error> set_errorexec::__scope::__nest_rcvr::__t186 void set_error(_Error&& __err) noexcept { 187 auto __scope = __op_->__scope_; 188 stdexec::set_error(std::move(__op_->__rcvr_), static_cast<_Error&&>(__err)); 189 // do not access __op_ 190 // do not access this 191 __complete(__scope); 192 } 193 set_stoppedexec::__scope::__nest_rcvr::__t194 void set_stopped() noexcept 195 requires __callable<set_stopped_t, _Receiver> 196 { 197 auto __scope = __op_->__scope_; 198 stdexec::set_stopped(std::move(__op_->__rcvr_)); 199 // do not access __op_ 200 // do not access this 201 __complete(__scope); 202 } 203 get_envexec::__scope::__nest_rcvr::__t204 auto get_env() const noexcept -> __env_t<env_of_t<_Receiver>> { 205 return make_env( 206 stdexec::get_env(__op_->__rcvr_), 207 stdexec::prop{get_stop_token, __op_->__scope_->__stop_source_.get_token()}); 208 } 209 }; 210 }; 211 212 template <class _ConstrainedId, class _ReceiverId> 213 struct __nest_op { 214 using _Constrained = stdexec::__t<_ConstrainedId>; 215 using _Receiver = stdexec::__t<_ReceiverId>; 216 217 struct __t : __nest_op_base<_ReceiverId> { 218 using __id = __nest_op; 219 using __nest_rcvr_t = stdexec::__t<__nest_rcvr<_ReceiverId>>; 220 STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS 221 connect_result_t<_Constrained, __nest_rcvr_t> __op_; 222 223 template <__decays_to<_Constrained> _Sender, __decays_to<_Receiver> _Rcvr> __texec::__scope::__nest_op::__t224 explicit __t(const __impl* __scope, _Sender&& __c, _Rcvr&& __rcvr) 225 : __nest_op_base<_ReceiverId>{{}, __scope, static_cast<_Rcvr&&>(__rcvr)} 226 , __op_(stdexec::connect(static_cast<_Sender&&>(__c), __nest_rcvr_t{this})) { 227 } 228 startexec::__scope::__nest_op::__t229 void start() & noexcept { 230 STDEXEC_ASSERT(this->__scope_); 231 auto& __active = this->__scope_->__active_; 232 __active.fetch_add(1, std::memory_order_relaxed); 233 stdexec::start(__op_); 234 } 235 }; 236 }; 237 238 template <class _ConstrainedId> 239 struct __nest_sender { 240 using _Constrained = stdexec::__t<_ConstrainedId>; 241 242 struct __t { 243 using __id = __nest_sender; 244 using sender_concept = stdexec::sender_t; 245 246 const __impl* __scope_; 247 STDEXEC_ATTRIBUTE(no_unique_address) _Constrained __c_; 248 249 template <class _Receiver> 250 using __nest_operation_t = 251 stdexec::__t<__nest_op<_ConstrainedId, stdexec::__id<_Receiver>>>; 252 253 template <class _Receiver> 254 using __nest_receiver_t = stdexec::__t<__nest_rcvr<stdexec::__id<_Receiver>>>; 255 256 template <__decays_to<__t> _Self, receiver _Receiver> 257 requires sender_to<__copy_cvref_t<_Self, _Constrained>, __nest_receiver_t<_Receiver>> 258 [[nodiscard]] connectexec::__scope::__nest_sender::__t259 static auto connect(_Self&& __self, _Receiver __rcvr) -> __nest_operation_t<_Receiver> { 260 return __nest_operation_t<_Receiver>{ 261 __self.__scope_, static_cast<_Self&&>(__self).__c_, static_cast<_Receiver&&>(__rcvr)}; 262 } 263 264 template <__decays_to<__t> _Self, class... _Env> get_completion_signaturesexec::__scope::__nest_sender::__t265 static auto get_completion_signatures(_Self&&, _Env&&...) 266 -> __completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>, __env_t<_Env>...> { 267 return {}; 268 } 269 }; 270 }; 271 272 template <class _Constrained> 273 using __nest_sender_t = stdexec::__t<__nest_sender<__id<__decay_t<_Constrained>>>>; 274 275 //////////////////////////////////////////////////////////////////////////// 276 // async_scope::spawn_future implementation 277 enum class __future_step { 278 __invalid = 0, 279 __created, 280 __future, 281 __no_future, 282 __deleted 283 }; 284 285 template <class _Sender, class _Env> 286 struct __future_state; 287 288 struct __forward_stopped { 289 inplace_stop_source* __stop_source_; 290 operator ()exec::__scope::__forward_stopped291 void operator()() noexcept { 292 __stop_source_->request_stop(); 293 } 294 }; 295 296 struct __subscription : __immovable { 297 void (*__complete_)(__subscription*) noexcept = nullptr; 298 __completeexec::__scope::__subscription299 void __complete() noexcept { 300 __complete_(this); 301 } 302 303 __subscription* __next_ = nullptr; 304 }; 305 306 template <class _SenderId, class _EnvId, class _ReceiverId> 307 struct __future_op { 308 using _Sender = stdexec::__t<_SenderId>; 309 using _Env = stdexec::__t<_EnvId>; 310 using _Receiver = stdexec::__t<_ReceiverId>; 311 312 class __t : __subscription { 313 using __forward_consumer = 314 stop_token_of_t<env_of_t<_Receiver>>::template callback_type<__forward_stopped>; 315 __complete_()316 void __complete_() noexcept { 317 STDEXEC_TRY { 318 __forward_consumer_.reset(); 319 auto __state = std::move(__state_); 320 STDEXEC_ASSERT(__state != nullptr); 321 std::unique_lock __guard{__state->__mutex_}; 322 // either the future is still in use or it has passed ownership to __state->__no_future_ 323 if ( 324 __state->__no_future_.get() != nullptr 325 || __state->__step_ != __future_step::__future) { 326 // invalid state - there is a code bug in the state machine 327 std::terminate(); 328 } else if (get_stop_token(get_env(__rcvr_)).stop_requested()) { 329 330 __guard.unlock(); 331 stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr_)); 332 __guard.lock(); 333 } else { 334 std::visit( 335 [this, &__guard]<class _Tup>(_Tup& __tup) { 336 if constexpr (same_as<_Tup, std::monostate>) { 337 std::terminate(); 338 } else { 339 std::apply( 340 [this, &__guard]<class... _As>(auto tag, _As&... __as) { 341 __guard.unlock(); 342 tag(static_cast<_Receiver&&>(__rcvr_), static_cast<_As&&>(__as)...); 343 __guard.lock(); 344 }, 345 __tup); 346 } 347 }, 348 __state->__data_); 349 } 350 } 351 STDEXEC_CATCH_ALL { 352 353 stdexec::set_error(static_cast<_Receiver&&>(__rcvr_), std::current_exception()); 354 } 355 } 356 357 STDEXEC_ATTRIBUTE(no_unique_address) _Receiver __rcvr_; 358 std::unique_ptr<__future_state<_Sender, _Env>> __state_; 359 STDEXEC_ATTRIBUTE(no_unique_address) 360 stdexec::__optional<__forward_consumer> __forward_consumer_; 361 362 public: 363 using __id = __future_op; 364 ~__t()365 ~__t() noexcept { 366 if (__state_ != nullptr) { 367 auto __raw_state = __state_.get(); 368 std::unique_lock __guard{__raw_state->__mutex_}; 369 if (__raw_state->__data_.index() > 0) { 370 // completed given sender 371 // state is no longer needed 372 return; 373 } 374 __raw_state->__no_future_ = std::move(__state_); 375 __raw_state 376 ->__step_from_to_(__guard, __future_step::__future, __future_step::__no_future); 377 } 378 } 379 380 template <class _Receiver2> __t(_Receiver2 && __rcvr,std::unique_ptr<__future_state<_Sender,_Env>> __state)381 explicit __t( 382 _Receiver2&& __rcvr, std::unique_ptr<__future_state<_Sender, _Env>> __state) 383 : __subscription{{}, 384 [](__subscription* __self) noexcept -> void { 385 static_cast<__t*>(__self)->__complete_(); 386 }} 387 , __rcvr_(static_cast<_Receiver2&&>(__rcvr)) 388 , __state_(std::move(__state)) 389 , __forward_consumer_(std::in_place, get_stop_token(get_env(__rcvr_)), 390 __forward_stopped{&__state_->__stop_source_}) { 391 } 392 start()393 void start() & noexcept { 394 STDEXEC_TRY { 395 if (!!__state_) { 396 std::unique_lock __guard{__state_->__mutex_}; 397 if (__state_->__data_.index() != 0) { 398 __guard.unlock(); 399 __complete_(); 400 } else { 401 __state_->__subscribers_.push_back(this); 402 } 403 } 404 } 405 STDEXEC_CATCH_ALL { 406 stdexec::set_error(static_cast<_Receiver&&>(__rcvr_), std::current_exception()); 407 } 408 } 409 }; 410 }; 411 412 #if STDEXEC_EDG() 413 template <class _Fn> 414 struct __completion_as_tuple2_; 415 416 template <class _Tag, class... _Ts> 417 struct __completion_as_tuple2_<_Tag(_Ts...)> { 418 using __t = std::tuple<_Tag, _Ts...>; 419 }; 420 template <class _Fn> 421 using __completion_as_tuple_t = stdexec::__t<__completion_as_tuple2_<_Fn>>; 422 423 #else 424 425 template <class _Tag, class... _Ts> 426 auto __completion_as_tuple_(_Tag (*)(_Ts...)) -> std::tuple<_Tag, _Ts...>; 427 428 template <class _Fn> 429 using __completion_as_tuple_t = decltype(__scope::__completion_as_tuple_( 430 static_cast<_Fn*>(nullptr))); 431 #endif 432 433 template <class... _Ts> 434 using __decay_values_t = completion_signatures<set_value_t(__decay_t<_Ts>...)>; 435 436 template <class _Ty> 437 using __decay_error_t = completion_signatures<set_error_t(__decay_t<_Ty>)>; 438 439 template <class _Sender, class _Env> 440 using __future_completions_t = transform_completion_signatures_of< 441 _Sender, 442 __env_t<_Env>, 443 completion_signatures<set_stopped_t(), set_error_t(std::exception_ptr)>, 444 __decay_values_t, 445 __decay_error_t 446 >; 447 448 template <class _Completions> 449 using __completions_as_variant = __mapply< 450 __mtransform<__q<__completion_as_tuple_t>, __mbind_front_q<std::variant, std::monostate>>, 451 _Completions 452 >; 453 454 template <class _Ty> 455 struct __dynamic_delete { __dynamic_deleteexec::__scope::__dynamic_delete456 __dynamic_delete() 457 : __delete_([](_Ty* __p) { delete __p; }) { 458 } 459 460 template <class _Uy> 461 requires convertible_to<_Uy*, _Ty*> __dynamic_deleteexec::__scope::__dynamic_delete462 __dynamic_delete(std::default_delete<_Uy>) 463 : __delete_([](_Ty* __p) { delete static_cast<_Uy*>(__p); }) { 464 } 465 466 template <class _Uy> 467 requires convertible_to<_Uy*, _Ty*> operator =exec::__scope::__dynamic_delete468 auto operator=(std::default_delete<_Uy> __d) -> __dynamic_delete& { 469 __delete_ = __dynamic_delete{__d}.__delete_; 470 return *this; 471 } 472 operator ()exec::__scope::__dynamic_delete473 void operator()(_Ty* __p) { 474 __delete_(__p); 475 } 476 477 void (*__delete_)(_Ty*); 478 }; 479 480 template <class _Completions, class _Env> 481 struct __future_state_base { __future_state_baseexec::__scope::__future_state_base482 __future_state_base(_Env __env, const __impl* __scope) 483 : __forward_scope_{std::in_place, __scope->__stop_source_.get_token(), __forward_stopped{&__stop_source_}} 484 , __env_(make_env( 485 static_cast<_Env&&>(__env), 486 stdexec::prop{get_stop_token, __scope->__stop_source_.get_token()})) { 487 } 488 ~__future_state_baseexec::__scope::__future_state_base489 ~__future_state_base() { 490 std::unique_lock __guard{__mutex_}; 491 if (__step_ == __future_step::__created) { 492 // exception during connect() will end up here 493 __step_from_to_(__guard, __future_step::__created, __future_step::__deleted); 494 } else if (__step_ != __future_step::__deleted) { 495 // completing the given sender before the future is dropped will end here 496 __step_from_to_(__guard, __future_step::__future, __future_step::__deleted); 497 } 498 } 499 __step_from_to_exec::__scope::__future_state_base500 void __step_from_to_( 501 std::unique_lock<std::mutex>& __guard, 502 __future_step __from, 503 __future_step __to) { 504 STDEXEC_ASSERT(__guard.owns_lock()); 505 auto actual = std::exchange(__step_, __to); 506 STDEXEC_ASSERT(actual == __from); 507 } 508 509 inplace_stop_source __stop_source_; 510 stdexec::__optional<inplace_stop_callback<__forward_stopped>> __forward_scope_; 511 std::mutex __mutex_; 512 __future_step __step_ = __future_step::__created; 513 std::unique_ptr<__future_state_base, __dynamic_delete<__future_state_base>> __no_future_; 514 __completions_as_variant<_Completions> __data_; 515 __intrusive_queue<&__subscription::__next_> __subscribers_; 516 __env_t<_Env> __env_; 517 }; 518 519 template <class _Completions, class _EnvId> 520 struct __future_rcvr { 521 using _Env = stdexec::__t<_EnvId>; 522 523 struct __t { 524 using __id = __future_rcvr; 525 using receiver_concept = stdexec::receiver_t; 526 __future_state_base<_Completions, _Env>* __state_; 527 const __impl* __scope_; 528 __dispatch_result_exec::__scope::__future_rcvr::__t529 void __dispatch_result_(std::unique_lock<std::mutex>& __guard) noexcept { 530 auto& __state = *__state_; 531 auto __local_subscribers = std::move(__state.__subscribers_); 532 __state.__forward_scope_.reset(); 533 if (__state.__no_future_.get() != nullptr) { 534 // nobody is waiting for the results 535 // delete this and return 536 __state.__step_from_to_(__guard, __future_step::__no_future, __future_step::__deleted); 537 __guard.unlock(); 538 __state.__no_future_.reset(); 539 return; 540 } 541 __guard.unlock(); 542 while (!__local_subscribers.empty()) { 543 auto* __sub = __local_subscribers.pop_front(); 544 __sub->__complete(); 545 } 546 } 547 548 template <class _Tag, class... _As> __save_completionexec::__scope::__future_rcvr::__t549 void __save_completion(_Tag, _As&&... __as) noexcept { 550 auto& __state = *__state_; 551 STDEXEC_TRY { 552 using _Tuple = __decayed_std_tuple<_Tag, _As...>; 553 __state.__data_.template emplace<_Tuple>(_Tag(), static_cast<_As&&>(__as)...); 554 } 555 STDEXEC_CATCH_ALL { 556 using _Tuple = std::tuple<set_error_t, std::exception_ptr>; 557 __state.__data_.template emplace<_Tuple>(set_error_t(), std::current_exception()); 558 } 559 } 560 561 template <__movable_value... _As> set_valueexec::__scope::__future_rcvr::__t562 void set_value(_As&&... __as) noexcept { 563 auto& __state = *__state_; 564 std::unique_lock __guard{__state.__mutex_}; 565 __save_completion(set_value_t(), static_cast<_As&&>(__as)...); 566 __dispatch_result_(__guard); 567 } 568 569 template <__movable_value _Error> set_errorexec::__scope::__future_rcvr::__t570 void set_error(_Error&& __err) noexcept { 571 auto& __state = *__state_; 572 std::unique_lock __guard{__state.__mutex_}; 573 __save_completion(set_error_t(), static_cast<_Error&&>(__err)); 574 __dispatch_result_(__guard); 575 } 576 set_stoppedexec::__scope::__future_rcvr::__t577 void set_stopped() noexcept { 578 auto& __state = *__state_; 579 std::unique_lock __guard{__state.__mutex_}; 580 __save_completion(set_stopped_t()); 581 __dispatch_result_(__guard); 582 } 583 get_envexec::__scope::__future_rcvr::__t584 auto get_env() const noexcept -> const __env_t<_Env>& { 585 return __state_->__env_; 586 } 587 }; 588 }; 589 590 template <class _Sender, class _Env> 591 using __future_receiver_t = 592 __t<__future_rcvr<__future_completions_t<_Sender, _Env>, __id<_Env>>>; 593 594 template <class _Sender, class _Env> 595 struct __future_state : __future_state_base<__future_completions_t<_Sender, _Env>, _Env> { 596 using _Completions = __future_completions_t<_Sender, _Env>; 597 __future_stateexec::__scope::__future_state598 __future_state(connect_t, _Sender&& __sndr, _Env __env, const __impl* __scope) 599 : __future_state_base<_Completions, _Env>(static_cast<_Env&&>(__env), __scope) 600 , __op_(static_cast<_Sender&&>(__sndr), __future_receiver_t<_Sender, _Env>{this, __scope}) { 601 } 602 __future_stateexec::__scope::__future_state603 __future_state(_Sender __sndr, _Env __env, const __impl* __scope) 604 : __future_state( 605 stdexec::connect, 606 static_cast<_Sender&&>(__sndr), 607 static_cast<_Env&&>(__env), 608 __scope) { 609 // If the operation completes synchronously, then the following line will cause 610 // the destruction of *this, which is not a problem because we used a delegating 611 // constructor, so *this is considered fully constructed. 612 __op_.submit( 613 static_cast<_Sender&&>(__sndr), __future_receiver_t<_Sender, _Env>{this, __scope}); 614 } 615 STDEXEC_ATTRIBUTEexec::__scope::__future_state616 STDEXEC_ATTRIBUTE(no_unique_address) 617 submit_result<_Sender, __future_receiver_t<_Sender, _Env>> __op_{}; 618 }; 619 620 template <class _SenderId, class _EnvId> 621 struct __future { 622 using _Sender = stdexec::__t<_SenderId>; 623 using _Env = stdexec::__t<_EnvId>; 624 625 class __t { 626 template <class _Self> 627 using __completions_t = __future_completions_t<__mfront<_Sender, _Self>, _Env>; 628 629 template <class _Receiver> 630 using __future_op_t = 631 stdexec::__t<__future_op<_SenderId, _EnvId, stdexec::__id<_Receiver>>>; 632 633 public: 634 using __id = __future; 635 using sender_concept = stdexec::sender_t; 636 637 __t(__t&&) = default; 638 auto operator=(__t&&) -> __t& = default; 639 ~__t()640 ~__t() noexcept { 641 if (__state_ != nullptr) { 642 auto __raw_state = __state_.get(); 643 std::unique_lock __guard{__raw_state->__mutex_}; 644 if (__raw_state->__data_.index() != 0) { 645 // completed given sender 646 // state is no longer needed 647 return; 648 } 649 __raw_state->__no_future_ = std::move(__state_); 650 __raw_state 651 ->__step_from_to_(__guard, __future_step::__future, __future_step::__no_future); 652 } 653 } 654 655 template <__decays_to<__t> _Self, receiver _Receiver> 656 requires receiver_of<_Receiver, __completions_t<_Self>> connect(_Self && __self,_Receiver __rcvr)657 static auto connect(_Self&& __self, _Receiver __rcvr) -> __future_op_t<_Receiver> { 658 return __future_op_t<_Receiver>{ 659 static_cast<_Receiver&&>(__rcvr), static_cast<_Self&&>(__self).__state_}; 660 } 661 662 template <__decays_to<__t> _Self, class... _OtherEnv> get_completion_signatures(_Self &&,_OtherEnv &&...)663 static auto get_completion_signatures(_Self&&, _OtherEnv&&...) -> __completions_t<_Self> { 664 return {}; 665 } 666 667 private: 668 friend struct async_scope; 669 __t(std::unique_ptr<__future_state<_Sender,_Env>> __state)670 explicit __t(std::unique_ptr<__future_state<_Sender, _Env>> __state) noexcept 671 : __state_(std::move(__state)) { 672 std::unique_lock __guard{__state_->__mutex_}; 673 __state_->__step_from_to_(__guard, __future_step::__created, __future_step::__future); 674 } 675 676 std::unique_ptr<__future_state<_Sender, _Env>> __state_; 677 }; 678 }; 679 680 template <class _Sender, class _Env> 681 using __future_t = 682 stdexec::__t<__future<__id<__nest_sender_t<_Sender>>, __id<__decay_t<_Env>>>>; 683 684 //////////////////////////////////////////////////////////////////////////// 685 // async_scope::spawn implementation 686 struct __spawn_env_ { 687 inplace_stop_token __token_; 688 689 [[nodiscard]] queryexec::__scope::__spawn_env_690 auto query(get_stop_token_t) const noexcept -> inplace_stop_token { 691 return __token_; 692 } 693 694 [[nodiscard]] queryexec::__scope::__spawn_env_695 auto query(get_scheduler_t) const noexcept -> stdexec::inline_scheduler { 696 return {}; 697 } 698 }; 699 700 template <class _Env> 701 using __spawn_env_t = __join_env_t<_Env, __spawn_env_>; 702 703 template <class _EnvId> 704 struct __spawn_op_base { 705 using _Env = stdexec::__t<_EnvId>; 706 __spawn_env_t<_Env> __env_; 707 void (*__delete_)(__spawn_op_base*); 708 }; 709 710 template <class _EnvId> 711 struct __spawn_rcvr { 712 using _Env = stdexec::__t<_EnvId>; 713 714 struct __t { 715 using __id = __spawn_rcvr; 716 using receiver_concept = stdexec::receiver_t; 717 __spawn_op_base<_EnvId>* __op_; 718 set_valueexec::__scope::__spawn_rcvr::__t719 void set_value() noexcept { 720 __op_->__delete_(__op_); 721 } 722 723 // BUGBUG NOT TO SPEC spawn shouldn't accept senders that can fail. 724 [[noreturn]] set_errorexec::__scope::__spawn_rcvr::__t725 void set_error(std::exception_ptr __eptr) noexcept { 726 std::rethrow_exception(std::move(__eptr)); 727 } 728 set_stoppedexec::__scope::__spawn_rcvr::__t729 void set_stopped() noexcept { 730 __op_->__delete_(__op_); 731 } 732 get_envexec::__scope::__spawn_rcvr::__t733 auto get_env() const noexcept -> const __spawn_env_t<_Env>& { 734 return __op_->__env_; 735 } 736 }; 737 }; 738 739 template <class _Env> 740 using __spawn_receiver_t = stdexec::__t<__spawn_rcvr<__id<_Env>>>; 741 742 template <class _SenderId, class _EnvId> 743 struct __spawn_op { 744 using _Env = stdexec::__t<_EnvId>; 745 using _Sender = stdexec::__t<_SenderId>; 746 747 struct __t : __spawn_op_base<_EnvId> { __texec::__scope::__spawn_op::__t748 __t(connect_t, _Sender&& __sndr, _Env __env, const __impl* __scope) 749 : __spawn_op_base< 750 _EnvId 751 >{__env::__join( 752 static_cast<_Env&&>(__env), 753 __spawn_env_{__scope->__stop_source_.get_token()}), 754 [](__spawn_op_base<_EnvId>* __op) { delete static_cast<__t*>(__op); }} 755 , __data_(static_cast<_Sender&&>(__sndr), __spawn_receiver_t<_Env>{this}) { 756 } 757 __texec::__scope::__spawn_op::__t758 __t(_Sender __sndr, _Env __env, const __impl* __scope) 759 : __t( 760 stdexec::connect, 761 static_cast<_Sender&&>(__sndr), 762 static_cast<_Env&&>(__env), 763 __scope) { 764 // If the operation completes synchronously, then the following line will cause 765 // the destruction of *this, which is not a problem because we used a delegating 766 // constructor, so *this is considered fully constructed. 767 __data_.submit(static_cast<_Sender&&>(__sndr), __spawn_receiver_t<_Env>{this}); 768 } 769 770 STDEXEC_ATTRIBUTE(no_unique_address) 771 submit_result<_Sender, __spawn_receiver_t<_Env>> __data_; 772 }; 773 }; 774 775 template <class _Sender, class _Env> 776 using __spawn_operation_t = stdexec::__t<__spawn_op<__id<_Sender>, __id<_Env>>>; 777 778 //////////////////////////////////////////////////////////////////////////// 779 // async_scope 780 struct async_scope : __immovable { 781 async_scope() = default; 782 783 template <sender _Constrained> 784 [[nodiscard]] when_emptyexec::__scope::async_scope785 auto when_empty(_Constrained&& __c) const -> __when_empty_sender_t<_Constrained> { 786 return __when_empty_sender_t<_Constrained>{&__impl_, static_cast<_Constrained&&>(__c)}; 787 } 788 789 [[nodiscard]] on_emptyexec::__scope::async_scope790 auto on_empty() const { 791 return when_empty(just()); 792 } 793 794 template <sender _Constrained> 795 using nest_result_t = __nest_sender_t<_Constrained>; 796 797 template <sender _Constrained> 798 [[nodiscard]] nestexec::__scope::async_scope799 auto nest(_Constrained&& __c) -> nest_result_t<_Constrained> { 800 return nest_result_t<_Constrained>{&__impl_, static_cast<_Constrained&&>(__c)}; 801 } 802 803 template <__movable_value _Env = env<>, sender_in<__spawn_env_t<_Env>> _Sender> 804 requires sender_to<nest_result_t<_Sender>, __spawn_receiver_t<_Env>> spawnexec::__scope::async_scope805 void spawn(_Sender&& __sndr, _Env __env = {}) { 806 using __op_t = __spawn_operation_t<nest_result_t<_Sender>, _Env>; 807 // this will connect and start the operation, after which the operation state is 808 // responsible for deleting itself after it completes. 809 [[maybe_unused]] 810 auto* __op = 811 new __op_t{nest(static_cast<_Sender&&>(__sndr)), static_cast<_Env&&>(__env), &__impl_}; 812 } 813 814 template <__movable_value _Env = env<>, sender_in<__env_t<_Env>> _Sender> spawn_futureexec::__scope::async_scope815 auto spawn_future(_Sender&& __sndr, _Env __env = {}) -> __future_t<_Sender, _Env> { 816 using __state_t = __future_state<nest_result_t<_Sender>, _Env>; 817 auto __state = std::make_unique<__state_t>( 818 nest(static_cast<_Sender&&>(__sndr)), static_cast<_Env&&>(__env), &__impl_); 819 return __future_t<_Sender, _Env>{std::move(__state)}; 820 } 821 get_stop_sourceexec::__scope::async_scope822 auto get_stop_source() noexcept -> inplace_stop_source& { 823 return __impl_.__stop_source_; 824 } 825 get_stop_tokenexec::__scope::async_scope826 auto get_stop_token() const noexcept -> inplace_stop_token { 827 return __impl_.__stop_source_.get_token(); 828 } 829 request_stopexec::__scope::async_scope830 auto request_stop() noexcept -> bool { 831 return __impl_.__stop_source_.request_stop(); 832 } 833 834 private: 835 __impl __impl_; 836 }; 837 } // namespace __scope 838 839 using __scope::async_scope; 840 841 template <class _AsyncScope, class _Sender> 842 using nest_result_t = decltype(stdexec::__declval<_AsyncScope&>() 843 .nest(stdexec::__declval<_Sender&&>())); 844 } // namespace exec 845