1 /* 2 * Copyright (c) 2021-2022 NVIDIA Corporation 3 * 4 * Licensed under the Apache License Version 2.0 with LLVM Exceptions 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * https://llvm.org/LICENSE.txt 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #pragma once 17 18 #include "../stdexec/__detail/__intrusive_queue.hpp" 19 #include "../stdexec/execution.hpp" 20 #include "../stdexec/stop_token.hpp" 21 #include "env.hpp" 22 23 #include <mutex> 24 25 namespace exec 26 { 27 ///////////////////////////////////////////////////////////////////////////// 28 // async_scope 29 namespace __scope 30 { 31 using namespace stdexec; 32 33 struct __impl; 34 struct async_scope; 35 36 struct __task : __immovable 37 { 38 const __impl* __scope_; 39 void (*__notify_waiter)(__task*) noexcept; 40 __task* __next_ = nullptr; 41 }; 42 43 template <class _BaseEnv> 44 using __env_t = 45 make_env_t<_BaseEnv, with_t<get_stop_token_t, in_place_stop_token>>; 46 47 struct __impl 48 { 49 in_place_stop_source __stop_source_{}; 50 mutable std::mutex __lock_{}; 51 mutable std::ptrdiff_t __active_ = 0; 52 mutable __intrusive_queue<&__task::__next_> __waiters_{}; 53 ~__implexec::__scope::__impl54 ~__impl() 55 { 56 std::unique_lock __guard{__lock_}; 57 STDEXEC_ASSERT(__active_ == 0); 58 STDEXEC_ASSERT(__waiters_.empty()); 59 } 60 }; 61 62 //////////////////////////////////////////////////////////////////////////// 63 // async_scope::when_empty implementation 64 template <class _ConstrainedId, class _ReceiverId> 65 struct __when_empty_op 66 { 67 using _Constrained = __cvref_t<_ConstrainedId>; 68 using _Receiver = stdexec::__t<_ReceiverId>; 69 70 struct __t : __task 71 { 72 using __id = __when_empty_op; 73 __texec::__scope::__when_empty_op::__t74 explicit __t(const __impl* __scope, _Constrained&& __sndr, 75 _Receiver __rcvr) : 76 __task{{}, __scope, __notify_waiter}, 77 __op_(stdexec::connect(static_cast<_Constrained&&>(__sndr), 78 static_cast<_Receiver&&>(__rcvr))) 79 {} 80 81 private: __notify_waiterexec::__scope::__when_empty_op::__t82 static void __notify_waiter(__task* __self) noexcept 83 { 84 start(static_cast<__t*>(__self)->__op_); 85 } 86 __start_exec::__scope::__when_empty_op::__t87 void __start_() noexcept 88 { 89 std::unique_lock __guard{this->__scope_->__lock_}; 90 auto& __active = this->__scope_->__active_; 91 auto& __waiters = this->__scope_->__waiters_; 92 if (__active != 0) 93 { 94 __waiters.push_back(this); 95 return; 96 } 97 __guard.unlock(); 98 start(this->__op_); 99 } 100 tag_invokeexec::__scope::__when_empty_op101 friend void tag_invoke(start_t, __t& __self) noexcept 102 { 103 return __self.__start_(); 104 } 105 106 STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS 107 connect_result_t<_Constrained, _Receiver> __op_; 108 }; 109 }; 110 111 template <class _ConstrainedId> 112 struct __when_empty_sender 113 { 114 using _Constrained = stdexec::__t<_ConstrainedId>; 115 116 struct __t 117 { 118 using __id = __when_empty_sender; 119 using sender_concept = stdexec::sender_t; 120 121 template <class _Self, class _Receiver> 122 using __when_empty_op_t = 123 stdexec::__t<__when_empty_op<__cvref_id<_Self, _Constrained>, 124 stdexec::__id<_Receiver>>>; 125 126 template <__decays_to<__t> _Self, receiver _Receiver> 127 requires sender_to<__copy_cvref_t<_Self, _Constrained>, _Receiver> tag_invokeexec::__scope::__when_empty_sender128 [[nodiscard]] friend auto tag_invoke(connect_t, _Self&& __self, 129 _Receiver __rcvr) 130 -> __when_empty_op_t<_Self, _Receiver> 131 { 132 return __when_empty_op_t<_Self, _Receiver>{ 133 __self.__scope_, static_cast<_Self&&>(__self).__c_, 134 static_cast<_Receiver&&>(__rcvr)}; 135 } 136 137 template <__decays_to<__t> _Self, class _Env> tag_invokeexec::__scope::__when_empty_sender138 friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&) 139 -> completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>, 140 __env_t<_Env>> 141 { 142 return {}; 143 } 144 tag_invokeexec::__scope::__when_empty_sender145 friend auto tag_invoke(get_env_t, const __t&) noexcept -> empty_env 146 { 147 return {}; 148 } 149 150 const __impl* __scope_; 151 STDEXEC_ATTRIBUTE((no_unique_address)) 152 _Constrained __c_; 153 }; 154 }; 155 156 template <class _Constrained> 157 using __when_empty_sender_t = 158 stdexec::__t<__when_empty_sender<__id<__decay_t<_Constrained>>>>; 159 160 //////////////////////////////////////////////////////////////////////////// 161 // async_scope::nest implementation 162 template <class _ReceiverId> 163 struct __nest_op_base : __immovable 164 { 165 using _Receiver = stdexec::__t<_ReceiverId>; 166 const __impl* __scope_; 167 STDEXEC_ATTRIBUTE((no_unique_address)) 168 _Receiver __rcvr_; 169 }; 170 171 template <class _ReceiverId> 172 struct __nest_rcvr 173 { 174 using _Receiver = stdexec::__t<_ReceiverId>; 175 176 struct __t 177 { 178 using __id = __nest_rcvr; 179 using receiver_concept = stdexec::receiver_t; 180 __nest_op_base<_ReceiverId>* __op_; 181 __completeexec::__scope::__nest_rcvr::__t182 static void __complete(const __impl* __scope) noexcept 183 { 184 std::unique_lock __guard{__scope->__lock_}; 185 auto& __active = __scope->__active_; 186 if (--__active == 0) 187 { 188 auto __local = std::move(__scope->__waiters_); 189 __guard.unlock(); 190 __scope = nullptr; 191 // do not access __scope 192 while (!__local.empty()) 193 { 194 auto* __next = __local.pop_front(); 195 __next->__notify_waiter(__next); 196 // __scope must be considered deleted 197 } 198 } 199 } 200 201 template <__completion_tag _Tag, class... _As> 202 requires __callable<_Tag, _Receiver, _As...> tag_invokeexec::__scope::__nest_rcvr203 friend void tag_invoke(_Tag, __t&& __self, _As&&... __as) noexcept 204 { 205 auto __scope = __self.__op_->__scope_; 206 _Tag{}(std::move(__self.__op_->__rcvr_), 207 static_cast<_As&&>(__as)...); 208 // do not access __op_ 209 // do not access this 210 __complete(__scope); 211 } 212 tag_invokeexec::__scope::__nest_rcvr213 friend auto tag_invoke(get_env_t, const __t& __self) noexcept 214 -> __env_t<env_of_t<_Receiver>> 215 { 216 return make_env( 217 get_env(__self.__op_->__rcvr_), 218 with(get_stop_token, 219 __self.__op_->__scope_->__stop_source_.get_token())); 220 } 221 }; 222 }; 223 224 template <class _ConstrainedId, class _ReceiverId> 225 struct __nest_op 226 { 227 using _Constrained = stdexec::__t<_ConstrainedId>; 228 using _Receiver = stdexec::__t<_ReceiverId>; 229 230 struct __t : __nest_op_base<_ReceiverId> 231 { 232 using __id = __nest_op; 233 using __nest_rcvr_t = stdexec::__t<__nest_rcvr<_ReceiverId>>; 234 STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS 235 connect_result_t<_Constrained, __nest_rcvr_t> __op_; 236 237 template <__decays_to<_Constrained> _Sender, 238 __decays_to<_Receiver> _Rcvr> __texec::__scope::__nest_op::__t239 explicit __t(const __impl* __scope, _Sender&& __c, _Rcvr&& __rcvr) : 240 __nest_op_base<_ReceiverId>{ 241 {}, __scope, static_cast<_Rcvr&&>(__rcvr)}, 242 __op_(stdexec::connect(static_cast<_Sender&&>(__c), 243 __nest_rcvr_t{this})) 244 {} 245 246 private: __start_exec::__scope::__nest_op::__t247 void __start_() noexcept 248 { 249 STDEXEC_ASSERT(this->__scope_); 250 std::unique_lock __guard{this->__scope_->__lock_}; 251 auto& __active = this->__scope_->__active_; 252 ++__active; 253 __guard.unlock(); 254 start(__op_); 255 } 256 tag_invokeexec::__scope::__nest_op257 friend void tag_invoke(start_t, __t& __self) noexcept 258 { 259 return __self.__start_(); 260 } 261 }; 262 }; 263 264 template <class _ConstrainedId> 265 struct __nest_sender 266 { 267 using _Constrained = stdexec::__t<_ConstrainedId>; 268 269 struct __t 270 { 271 using __id = __nest_sender; 272 using sender_concept = stdexec::sender_t; 273 274 const __impl* __scope_; 275 STDEXEC_ATTRIBUTE((no_unique_address)) 276 _Constrained __c_; 277 278 template <class _Receiver> 279 using __nest_operation_t = 280 stdexec::__t<__nest_op<_ConstrainedId, stdexec::__id<_Receiver>>>; 281 template <class _Receiver> 282 using __nest_receiver_t = 283 stdexec::__t<__nest_rcvr<stdexec::__id<_Receiver>>>; 284 285 template <__decays_to<__t> _Self, receiver _Receiver> 286 requires sender_to<__copy_cvref_t<_Self, _Constrained>, 287 __nest_receiver_t<_Receiver>> tag_invokeexec::__scope::__nest_sender288 [[nodiscard]] friend auto tag_invoke(connect_t, _Self&& __self, 289 _Receiver __rcvr) 290 -> __nest_operation_t<_Receiver> 291 { 292 return __nest_operation_t<_Receiver>{ 293 __self.__scope_, static_cast<_Self&&>(__self).__c_, 294 static_cast<_Receiver&&>(__rcvr)}; 295 } 296 297 template <__decays_to<__t> _Self, class _Env> tag_invokeexec::__scope::__nest_sender298 friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&) 299 -> completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>, 300 __env_t<_Env>> 301 { 302 return {}; 303 } 304 tag_invokeexec::__scope::__nest_sender305 friend auto tag_invoke(get_env_t, const __t&) noexcept -> empty_env 306 { 307 return {}; 308 } 309 }; 310 }; 311 312 template <class _Constrained> 313 using __nest_sender_t = 314 stdexec::__t<__nest_sender<__id<__decay_t<_Constrained>>>>; 315 316 //////////////////////////////////////////////////////////////////////////// 317 // async_scope::spawn_future implementation 318 enum class __future_step 319 { 320 __invalid = 0, 321 __created, 322 __future, 323 __no_future, 324 __deleted 325 }; 326 327 template <class _Sender, class _Env> 328 struct __future_state; 329 330 struct __forward_stopped 331 { 332 in_place_stop_source* __stop_source_; 333 operator ()exec::__scope::__forward_stopped334 void operator()() noexcept 335 { 336 __stop_source_->request_stop(); 337 } 338 }; 339 340 struct __subscription : __immovable 341 { 342 void (*__complete_)(__subscription*) noexcept = nullptr; 343 __completeexec::__scope::__subscription344 void __complete() noexcept 345 { 346 __complete_(this); 347 } 348 349 __subscription* __next_ = nullptr; 350 }; 351 352 template <class _SenderId, class _EnvId, class _ReceiverId> 353 struct __future_op 354 { 355 using _Sender = stdexec::__t<_SenderId>; 356 using _Env = stdexec::__t<_EnvId>; 357 using _Receiver = stdexec::__t<_ReceiverId>; 358 359 class __t : __subscription 360 { 361 using __forward_consumer = typename stop_token_of_t< 362 env_of_t<_Receiver>>::template callback_type<__forward_stopped>; 363 tag_invokeexec::__scope::__future_op364 friend void tag_invoke(start_t, __t& __self) noexcept 365 { 366 __self.__start_(); 367 } 368 __complete_()369 void __complete_() noexcept 370 { 371 try 372 { 373 auto __state = std::move(__state_); 374 STDEXEC_ASSERT(__state != nullptr); 375 std::unique_lock __guard{__state->__mutex_}; 376 // either the future is still in use or it has passed ownership 377 // to __state->__no_future_ 378 if (__state->__no_future_.get() != nullptr || 379 __state->__step_ != __future_step::__future) 380 { 381 // invalid state - there is a code bug in the state machine 382 std::terminate(); 383 } 384 else if (get_stop_token(get_env(__rcvr_)).stop_requested()) 385 { 386 __guard.unlock(); 387 set_stopped(static_cast<_Receiver&&>(__rcvr_)); 388 __guard.lock(); 389 } 390 else 391 { 392 std::visit( 393 [this, &__guard]<class _Tup>(_Tup& __tup) { 394 if constexpr (same_as<_Tup, std::monostate>) 395 { 396 std::terminate(); 397 } 398 else 399 { 400 std::apply( 401 [this, &__guard]<class... _As>(auto tag, 402 _As&... __as) { 403 __guard.unlock(); 404 tag(static_cast<_Receiver&&>(__rcvr_), 405 static_cast<_As&&>(__as)...); 406 __guard.lock(); 407 }, 408 __tup); 409 } 410 }, 411 __state->__data_); 412 } 413 } 414 catch (...) 415 { 416 set_error(static_cast<_Receiver&&>(__rcvr_), 417 std::current_exception()); 418 } 419 } 420 __start_()421 void __start_() noexcept 422 { 423 try 424 { 425 if (!!__state_) 426 { 427 std::unique_lock __guard{__state_->__mutex_}; 428 if (__state_->__data_.index() != 0) 429 { 430 __guard.unlock(); 431 __complete_(); 432 } 433 else 434 { 435 __state_->__subscribers_.push_back(this); 436 } 437 } 438 } 439 catch (...) 440 { 441 set_error(static_cast<_Receiver&&>(__rcvr_), 442 std::current_exception()); 443 } 444 } 445 446 STDEXEC_ATTRIBUTE((no_unique_address)) 447 _Receiver __rcvr_; 448 std::unique_ptr<__future_state<_Sender, _Env>> __state_; 449 STDEXEC_ATTRIBUTE((no_unique_address)) 450 __forward_consumer __forward_consumer_; 451 452 public: 453 using __id = __future_op; 454 ~__t()455 ~__t() noexcept 456 { 457 if (__state_ != nullptr) 458 { 459 auto __raw_state = __state_.get(); 460 std::unique_lock __guard{__raw_state->__mutex_}; 461 if (__raw_state->__data_.index() > 0) 462 { 463 // completed given sender 464 // state is no longer needed 465 return; 466 } 467 __raw_state->__no_future_ = std::move(__state_); 468 __raw_state->__step_from_to_(__guard, __future_step::__future, 469 __future_step::__no_future); 470 } 471 } 472 473 template <class _Receiver2> __t(_Receiver2 && __rcvr,std::unique_ptr<__future_state<_Sender,_Env>> __state)474 explicit __t(_Receiver2&& __rcvr, 475 std::unique_ptr<__future_state<_Sender, _Env>> __state) : 476 __subscription{{}, 477 [](__subscription* __self) noexcept -> void { 478 static_cast<__t*>(__self)->__complete_(); 479 }}, 480 __rcvr_(static_cast<_Receiver2&&>(__rcvr)), 481 __state_(std::move(__state)), 482 __forward_consumer_(get_stop_token(get_env(__rcvr_)), 483 __forward_stopped{&__state_->__stop_source_}) 484 {} 485 }; 486 }; 487 488 #if STDEXEC_NVHPC() 489 template <class _Fn> 490 struct __completion_as_tuple2_; 491 492 template <class _Tag, class... _Ts> 493 struct __completion_as_tuple2_<_Tag(_Ts&&...)> 494 { 495 using __t = std::tuple<_Tag, _Ts...>; 496 }; 497 template <class _Fn> 498 using __completion_as_tuple_t = stdexec::__t<__completion_as_tuple2_<_Fn>>; 499 500 #else 501 502 template <class _Tag, class... _Ts> 503 auto __completion_as_tuple_(_Tag (*)(_Ts&&...)) -> std::tuple<_Tag, _Ts...>; 504 template <class _Fn> 505 using __completion_as_tuple_t = 506 decltype(__scope::__completion_as_tuple_(static_cast<_Fn*>(nullptr))); 507 #endif 508 509 template <class... _Ts> 510 using __decay_values_t = 511 completion_signatures<set_value_t(__decay_t<_Ts>&&...)>; 512 513 template <class _Ty> 514 using __decay_error_t = completion_signatures<set_error_t(__decay_t<_Ty>&&)>; 515 516 template <class _Sender, class _Env> 517 using __future_completions_t = // 518 make_completion_signatures< 519 _Sender, __env_t<_Env>, 520 completion_signatures<set_stopped_t(), 521 set_error_t(std::exception_ptr&&)>, 522 __decay_values_t, __decay_error_t>; 523 524 template <class _Completions> 525 using __completions_as_variant = // 526 __mapply<__transform<__q<__completion_as_tuple_t>, 527 __mbind_front_q<std::variant, std::monostate>>, 528 _Completions>; 529 530 template <class _Ty> 531 struct __dynamic_delete 532 { __anonf62d8bc60202exec::__scope::__dynamic_delete533 __dynamic_delete() : __delete_([](_Ty* __p) { delete __p; }) {} 534 535 template <class _Uy> 536 requires convertible_to<_Uy*, _Ty*> __dynamic_deleteexec::__scope::__dynamic_delete537 __dynamic_delete(std::default_delete<_Uy>) : 538 __delete_([](_Ty* __p) { delete static_cast<_Uy*>(__p); }) 539 {} 540 541 template <class _Uy> 542 requires convertible_to<_Uy*, _Ty*> operator =exec::__scope::__dynamic_delete543 auto operator=(std::default_delete<_Uy> __d) -> __dynamic_delete& 544 { 545 __delete_ = __dynamic_delete{__d}.__delete_; 546 return *this; 547 } 548 operator ()exec::__scope::__dynamic_delete549 void operator()(_Ty* __p) 550 { 551 __delete_(__p); 552 } 553 554 void (*__delete_)(_Ty*); 555 }; 556 557 template <class _Completions, class _Env> 558 struct __future_state_base 559 { __future_state_baseexec::__scope::__future_state_base560 __future_state_base(_Env __env, const __impl* __scope) : 561 __forward_scope_{std::in_place, __scope->__stop_source_.get_token(), 562 __forward_stopped{&__stop_source_}}, 563 __env_( 564 make_env(static_cast<_Env&&>(__env), 565 with(get_stop_token, __scope->__stop_source_.get_token()))) 566 {} 567 ~__future_state_baseexec::__scope::__future_state_base568 ~__future_state_base() 569 { 570 std::unique_lock __guard{__mutex_}; 571 if (__step_ == __future_step::__created) 572 { 573 // exception during connect() will end up here 574 __step_from_to_(__guard, __future_step::__created, 575 __future_step::__deleted); 576 } 577 else if (__step_ != __future_step::__deleted) 578 { 579 // completing the given sender before the future is dropped will end 580 // here 581 __step_from_to_(__guard, __future_step::__future, 582 __future_step::__deleted); 583 } 584 } 585 __step_from_to_exec::__scope::__future_state_base586 void __step_from_to_(std::unique_lock<std::mutex>& __guard, 587 __future_step __from, __future_step __to) 588 { 589 STDEXEC_ASSERT(__guard.owns_lock()); 590 auto actual = std::exchange(__step_, __to); 591 STDEXEC_ASSERT(actual == __from); 592 } 593 594 in_place_stop_source __stop_source_; 595 std::optional<in_place_stop_callback<__forward_stopped>> __forward_scope_; 596 std::mutex __mutex_; 597 __future_step __step_ = __future_step::__created; 598 std::unique_ptr<__future_state_base, __dynamic_delete<__future_state_base>> 599 __no_future_; 600 __completions_as_variant<_Completions> __data_; 601 __intrusive_queue<&__subscription::__next_> __subscribers_; 602 __env_t<_Env> __env_; 603 }; 604 605 template <class _Completions, class _EnvId> 606 struct __future_rcvr 607 { 608 using _Env = stdexec::__t<_EnvId>; 609 610 struct __t 611 { 612 using __id = __future_rcvr; 613 using receiver_concept = stdexec::receiver_t; 614 __future_state_base<_Completions, _Env>* __state_; 615 const __impl* __scope_; 616 __dispatch_result_exec::__scope::__future_rcvr::__t617 void __dispatch_result_() noexcept 618 { 619 auto& __state = *__state_; 620 std::unique_lock __guard{__state.__mutex_}; 621 auto __local = std::move(__state.__subscribers_); 622 __state.__forward_scope_ = std::nullopt; 623 if (__state.__no_future_.get() != nullptr) 624 { 625 // nobody is waiting for the results 626 // delete this and return 627 __state.__step_from_to_(__guard, __future_step::__no_future, 628 __future_step::__deleted); 629 __guard.unlock(); 630 __state.__no_future_.reset(); 631 return; 632 } 633 __guard.unlock(); 634 while (!__local.empty()) 635 { 636 auto* __sub = __local.pop_front(); 637 __sub->__complete(); 638 } 639 } 640 641 template <__completion_tag _Tag, __movable_value... _As> tag_invokeexec::__scope::__future_rcvr642 friend void tag_invoke(_Tag, __t&& __self, _As&&... __as) noexcept 643 { 644 auto& __state = *__self.__state_; 645 try 646 { 647 std::unique_lock __guard{__state.__mutex_}; 648 using _Tuple = __decayed_tuple<_Tag, _As...>; 649 __state.__data_.template emplace<_Tuple>( 650 _Tag{}, static_cast<_As&&>(__as)...); 651 __guard.unlock(); 652 __self.__dispatch_result_(); 653 } 654 catch (...) 655 { 656 using _Tuple = std::tuple<set_error_t, std::exception_ptr>; 657 __state.__data_.template emplace<_Tuple>( 658 set_error_t{}, std::current_exception()); 659 } 660 } 661 tag_invokeexec::__scope::__future_rcvr662 friend auto tag_invoke(get_env_t, const __t& __self) noexcept 663 -> const __env_t<_Env>& 664 { 665 return __self.__state_->__env_; 666 } 667 }; 668 }; 669 670 template <class _Sender, class _Env> 671 using __future_receiver_t = 672 __t<__future_rcvr<__future_completions_t<_Sender, _Env>, __id<_Env>>>; 673 674 template <class _Sender, class _Env> 675 struct __future_state : 676 __future_state_base<__future_completions_t<_Sender, _Env>, _Env> 677 { 678 using _Completions = __future_completions_t<_Sender, _Env>; 679 __future_stateexec::__scope::__future_state680 __future_state(_Sender __sndr, _Env __env, const __impl* __scope) : 681 __future_state_base<_Completions, _Env>(static_cast<_Env&&>(__env), 682 __scope), 683 __op_( 684 stdexec::connect(static_cast<_Sender&&>(__sndr), 685 __future_receiver_t<_Sender, _Env>{this, __scope})) 686 {} 687 688 connect_result_t<_Sender, __future_receiver_t<_Sender, _Env>> __op_; 689 }; 690 691 template <class _SenderId, class _EnvId> 692 struct __future 693 { 694 using _Sender = stdexec::__t<_SenderId>; 695 using _Env = stdexec::__t<_EnvId>; 696 697 struct __t 698 { 699 using __id = __future; 700 using sender_concept = stdexec::sender_t; 701 702 __t(__t&&) = default; 703 auto operator=(__t&&) -> __t& = default; 704 ~__texec::__scope::__future::__t705 ~__t() noexcept 706 { 707 if (__state_ != nullptr) 708 { 709 auto __raw_state = __state_.get(); 710 std::unique_lock __guard{__raw_state->__mutex_}; 711 if (__raw_state->__data_.index() != 0) 712 { 713 // completed given sender 714 // state is no longer needed 715 return; 716 } 717 __raw_state->__no_future_ = std::move(__state_); 718 __raw_state->__step_from_to_(__guard, __future_step::__future, 719 __future_step::__no_future); 720 } 721 } 722 723 private: 724 friend struct async_scope; 725 template <class _Self> 726 using __completions_t = 727 __future_completions_t<__mfront<_Sender, _Self>, _Env>; 728 729 template <class _Receiver> 730 using __future_op_t = stdexec::__t< 731 __future_op<_SenderId, _EnvId, stdexec::__id<_Receiver>>>; 732 __texec::__scope::__future::__t733 explicit __t( 734 std::unique_ptr<__future_state<_Sender, _Env>> __state) noexcept : 735 __state_(std::move(__state)) 736 { 737 std::unique_lock __guard{__state_->__mutex_}; 738 __state_->__step_from_to_(__guard, __future_step::__created, 739 __future_step::__future); 740 } 741 742 template <__decays_to<__t> _Self, receiver _Receiver> 743 requires receiver_of<_Receiver, __completions_t<_Self>> tag_invokeexec::__scope::__future744 friend auto tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr) 745 -> __future_op_t<_Receiver> 746 { 747 return __future_op_t<_Receiver>{static_cast<_Receiver&&>(__rcvr), 748 std::move(__self.__state_)}; 749 } 750 751 template <__decays_to<__t> _Self, class _OtherEnv> tag_invokeexec::__scope::__future752 friend auto tag_invoke(get_completion_signatures_t, _Self&&, 753 _OtherEnv&&) -> __completions_t<_Self> 754 { 755 return {}; 756 } 757 tag_invokeexec::__scope::__future758 friend auto tag_invoke(get_env_t, const __t&) noexcept -> empty_env 759 { 760 return {}; 761 } 762 763 std::unique_ptr<__future_state<_Sender, _Env>> __state_; 764 }; 765 }; 766 767 template <class _Sender, class _Env> 768 using __future_t = stdexec::__t< 769 __future<__id<__nest_sender_t<_Sender>>, __id<__decay_t<_Env>>>>; 770 771 //////////////////////////////////////////////////////////////////////////// 772 // async_scope::spawn implementation 773 template <class _Env> 774 using __spawn_env_t = 775 __env::__join_t<_Env, __env::__with<in_place_stop_token, get_stop_token_t>, 776 __env::__with<__inln::__scheduler, get_scheduler_t>>; 777 778 template <class _EnvId> 779 struct __spawn_op_base 780 { 781 using _Env = stdexec::__t<_EnvId>; 782 __spawn_env_t<_Env> __env_; 783 void (*__delete_)(__spawn_op_base*); 784 }; 785 786 template <class _EnvId> 787 struct __spawn_rcvr 788 { 789 using _Env = stdexec::__t<_EnvId>; 790 791 struct __t 792 { 793 using __id = __spawn_rcvr; 794 using receiver_concept = stdexec::receiver_t; 795 __spawn_op_base<_EnvId>* __op_; 796 797 template <__one_of<set_value_t, set_stopped_t> _Tag> tag_invokeexec::__scope::__spawn_rcvr798 friend void tag_invoke(_Tag, __t&& __self) noexcept 799 { 800 __self.__op_->__delete_(__self.__op_); 801 } 802 803 // BUGBUG NOT TO SPEC spawn shouldn't accept senders that can fail. 804 template <same_as<set_error_t> _Tag> tag_invokeexec::__scope::__spawn_rcvr805 [[noreturn]] friend void tag_invoke(_Tag, __t&&, 806 std::exception_ptr __eptr) noexcept 807 { 808 std::rethrow_exception(std::move(__eptr)); 809 } 810 tag_invokeexec::__scope::__spawn_rcvr811 friend auto tag_invoke(get_env_t, const __t& __self) noexcept 812 -> const __spawn_env_t<_Env>& 813 { 814 return __self.__op_->__env_; 815 } 816 }; 817 }; 818 819 template <class _Env> 820 using __spawn_receiver_t = stdexec::__t<__spawn_rcvr<__id<_Env>>>; 821 822 template <class _SenderId, class _EnvId> 823 struct __spawn_op 824 { 825 using _Env = stdexec::__t<_EnvId>; 826 using _Sender = stdexec::__t<_SenderId>; 827 828 struct __t : __spawn_op_base<_EnvId> 829 { 830 template <__decays_to<_Sender> _Sndr> __texec::__scope::__spawn_op::__t831 __t(_Sndr&& __sndr, _Env __env, const __impl* __scope) : 832 __spawn_op_base<_EnvId>{ 833 __env::__join( 834 static_cast<_Env&&>(__env), 835 __env::__with(__scope->__stop_source_.get_token(), 836 get_stop_token), 837 __env::__with(__inln::__scheduler{}, get_scheduler)), 838 [](__spawn_op_base<_EnvId>* __op) { 839 delete static_cast<__t*>(__op); 840 }}, 841 __op_(stdexec::connect(static_cast<_Sndr&&>(__sndr), 842 __spawn_receiver_t<_Env>{this})) 843 {} 844 __start_exec::__scope::__spawn_op::__t845 void __start_() noexcept 846 { 847 start(__op_); 848 } 849 tag_invokeexec::__scope::__spawn_op850 friend void tag_invoke(start_t, __t& __self) noexcept 851 { 852 return __self.__start_(); 853 } 854 855 connect_result_t<_Sender, __spawn_receiver_t<_Env>> __op_; 856 }; 857 }; 858 859 template <class _Sender, class _Env> 860 using __spawn_operation_t = stdexec::__t<__spawn_op<__id<_Sender>, __id<_Env>>>; 861 862 //////////////////////////////////////////////////////////////////////////// 863 // async_scope 864 struct async_scope : __immovable 865 { 866 async_scope() = default; 867 868 template <sender _Constrained> when_emptyexec::__scope::async_scope869 [[nodiscard]] auto when_empty(_Constrained&& __c) const 870 -> __when_empty_sender_t<_Constrained> 871 { 872 return __when_empty_sender_t<_Constrained>{ 873 &__impl_, static_cast<_Constrained&&>(__c)}; 874 } 875 on_emptyexec::__scope::async_scope876 [[nodiscard]] auto on_empty() const 877 { 878 return when_empty(just()); 879 } 880 881 template <sender _Constrained> 882 using nest_result_t = __nest_sender_t<_Constrained>; 883 884 template <sender _Constrained> nestexec::__scope::async_scope885 [[nodiscard]] auto nest(_Constrained&& __c) -> nest_result_t<_Constrained> 886 { 887 return nest_result_t<_Constrained>{&__impl_, 888 static_cast<_Constrained&&>(__c)}; 889 } 890 891 template <__movable_value _Env = empty_env, 892 sender_in<__spawn_env_t<_Env>> _Sender> 893 requires sender_to<nest_result_t<_Sender>, __spawn_receiver_t<_Env>> spawnexec::__scope::async_scope894 void spawn(_Sender&& __sndr, _Env __env = {}) 895 { 896 using __op_t = __spawn_operation_t<nest_result_t<_Sender>, _Env>; 897 // start is noexcept so we can assume that the operation will complete 898 // after this, which means we can rely on its self-ownership to ensure 899 // that it is eventually deleted 900 stdexec::start(*new __op_t{nest(static_cast<_Sender&&>(__sndr)), 901 static_cast<_Env&&>(__env), &__impl_}); 902 } 903 904 template <__movable_value _Env = empty_env, 905 sender_in<__env_t<_Env>> _Sender> spawn_futureexec::__scope::async_scope906 auto spawn_future(_Sender&& __sndr, _Env __env = {}) 907 -> __future_t<_Sender, _Env> 908 { 909 using __state_t = __future_state<nest_result_t<_Sender>, _Env>; 910 auto __state = 911 std::make_unique<__state_t>(nest(static_cast<_Sender&&>(__sndr)), 912 static_cast<_Env&&>(__env), &__impl_); 913 stdexec::start(__state->__op_); 914 return __future_t<_Sender, _Env>{std::move(__state)}; 915 } 916 get_stop_sourceexec::__scope::async_scope917 auto get_stop_source() noexcept -> in_place_stop_source& 918 { 919 return __impl_.__stop_source_; 920 } 921 get_stop_tokenexec::__scope::async_scope922 auto get_stop_token() const noexcept -> in_place_stop_token 923 { 924 return __impl_.__stop_source_.get_token(); 925 } 926 request_stopexec::__scope::async_scope927 auto request_stop() noexcept -> bool 928 { 929 return __impl_.__stop_source_.request_stop(); 930 } 931 932 private: 933 __impl __impl_; 934 }; 935 } // namespace __scope 936 937 using __scope::async_scope; 938 } // namespace exec 939