1 /* 2 * Copyright (c) 2021-2024 NVIDIA Corporation 3 * 4 * Licensed under the Apache License Version 2.0 with LLVM Exceptions 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * https://llvm.org/LICENSE.txt 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #pragma once 17 18 #include "../stdexec/__detail/__intrusive_queue.hpp" 19 #include "../stdexec/execution.hpp" 20 #include "../stdexec/stop_token.hpp" 21 #include "env.hpp" 22 23 #include <mutex> 24 25 namespace exec 26 { 27 ///////////////////////////////////////////////////////////////////////////// 28 // async_scope 29 namespace __scope 30 { 31 using namespace stdexec; 32 33 struct __impl; 34 struct async_scope; 35 36 template <class _A> 37 concept __async_scope = requires(_A& __a) { 38 { 39 __a.nest(stdexec::just()) 40 } -> sender_of<stdexec::set_value_t()>; 41 }; 42 43 struct __task : __immovable 44 { 45 const __impl* __scope_; 46 void (*__notify_waiter)(__task*) noexcept; 47 __task* __next_ = nullptr; 48 }; 49 50 template <class _BaseEnv> 51 using __env_t = 52 make_env_t<_BaseEnv, prop<get_stop_token_t, inplace_stop_token>>; 53 54 struct __impl 55 { 56 inplace_stop_source __stop_source_{}; 57 mutable std::mutex __lock_{}; 58 mutable std::ptrdiff_t __active_ = 0; 59 mutable __intrusive_queue<&__task::__next_> __waiters_{}; 60 ~__implexec::__scope::__impl61 ~__impl() 62 { 63 std::unique_lock __guard{__lock_}; 64 STDEXEC_ASSERT(__active_ == 0); 65 STDEXEC_ASSERT(__waiters_.empty()); 66 } 67 }; 68 69 //////////////////////////////////////////////////////////////////////////// 70 // async_scope::when_empty implementation 71 template <class _ConstrainedId, class _ReceiverId> 72 struct __when_empty_op 73 { 74 using _Constrained = __cvref_t<_ConstrainedId>; 75 using _Receiver = stdexec::__t<_ReceiverId>; 76 77 struct __t : __task 78 { 79 using __id = __when_empty_op; 80 __texec::__scope::__when_empty_op::__t81 explicit __t(const __impl* __scope, _Constrained&& __sndr, 82 _Receiver __rcvr) : 83 __task{{}, __scope, __notify_waiter}, 84 __op_(stdexec::connect(static_cast<_Constrained&&>(__sndr), 85 static_cast<_Receiver&&>(__rcvr))) 86 {} 87 startexec::__scope::__when_empty_op::__t88 void start() & noexcept 89 { 90 std::unique_lock __guard{this->__scope_->__lock_}; 91 auto& __active = this->__scope_->__active_; 92 auto& __waiters = this->__scope_->__waiters_; 93 if (__active != 0) 94 { 95 __waiters.push_back(this); 96 return; 97 } 98 __guard.unlock(); 99 stdexec::start(this->__op_); 100 } 101 102 private: __notify_waiterexec::__scope::__when_empty_op::__t103 static void __notify_waiter(__task* __self) noexcept 104 { 105 stdexec::start(static_cast<__t*>(__self)->__op_); 106 } 107 108 STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS 109 connect_result_t<_Constrained, _Receiver> __op_; 110 }; 111 }; 112 113 template <class _ConstrainedId> 114 struct __when_empty_sender 115 { 116 using _Constrained = stdexec::__t<_ConstrainedId>; 117 118 struct __t 119 { 120 using __id = __when_empty_sender; 121 using sender_concept = stdexec::sender_t; 122 123 template <class _Self, class _Receiver> 124 using __when_empty_op_t = 125 stdexec::__t<__when_empty_op<__cvref_id<_Self, _Constrained>, 126 stdexec::__id<_Receiver>>>; 127 128 template <__decays_to<__t> _Self, receiver _Receiver> 129 requires sender_to<__copy_cvref_t<_Self, _Constrained>, _Receiver> connectexec::__scope::__when_empty_sender::__t130 [[nodiscard]] static auto connect(_Self&& __self, _Receiver __rcvr) // 131 -> __when_empty_op_t<_Self, _Receiver> 132 { 133 return __when_empty_op_t<_Self, _Receiver>{ 134 __self.__scope_, static_cast<_Self&&>(__self).__c_, 135 static_cast<_Receiver&&>(__rcvr)}; 136 } 137 138 template <__decays_to<__t> _Self, class... _Env> get_completion_signaturesexec::__scope::__when_empty_sender::__t139 static auto get_completion_signatures(_Self&&, _Env&&...) 140 -> __completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>, 141 __env_t<_Env>...> 142 { 143 return {}; 144 } 145 146 const __impl* __scope_; 147 STDEXEC_ATTRIBUTE((no_unique_address)) 148 _Constrained __c_; 149 }; 150 }; 151 152 template <class _Constrained> 153 using __when_empty_sender_t = 154 stdexec::__t<__when_empty_sender<__id<__decay_t<_Constrained>>>>; 155 156 //////////////////////////////////////////////////////////////////////////// 157 // async_scope::nest implementation 158 template <class _ReceiverId> 159 struct __nest_op_base : __immovable 160 { 161 using _Receiver = stdexec::__t<_ReceiverId>; 162 const __impl* __scope_; 163 STDEXEC_ATTRIBUTE((no_unique_address)) 164 _Receiver __rcvr_; 165 }; 166 167 template <class _ReceiverId> 168 struct __nest_rcvr 169 { 170 using _Receiver = stdexec::__t<_ReceiverId>; 171 172 struct __t 173 { 174 using __id = __nest_rcvr; 175 using receiver_concept = stdexec::receiver_t; 176 __nest_op_base<_ReceiverId>* __op_; 177 __completeexec::__scope::__nest_rcvr::__t178 static void __complete(const __impl* __scope) noexcept 179 { 180 std::unique_lock __guard{__scope->__lock_}; 181 auto& __active = __scope->__active_; 182 if (--__active == 0) 183 { 184 auto __local = std::move(__scope->__waiters_); 185 __guard.unlock(); 186 __scope = nullptr; 187 // do not access __scope 188 while (!__local.empty()) 189 { 190 auto* __next = __local.pop_front(); 191 __next->__notify_waiter(__next); 192 // __scope must be considered deleted 193 } 194 } 195 } 196 197 template <class... _As> 198 requires __callable<set_value_t, _Receiver, _As...> set_valueexec::__scope::__nest_rcvr::__t199 void set_value(_As&&... __as) noexcept 200 { 201 auto __scope = __op_->__scope_; 202 stdexec::set_value(std::move(__op_->__rcvr_), 203 static_cast<_As&&>(__as)...); 204 // do not access __op_ 205 // do not access this 206 __complete(__scope); 207 } 208 209 template <class _Error> 210 requires __callable<set_error_t, _Receiver, _Error> set_errorexec::__scope::__nest_rcvr::__t211 void set_error(_Error&& __err) noexcept 212 { 213 auto __scope = __op_->__scope_; 214 stdexec::set_error(std::move(__op_->__rcvr_), 215 static_cast<_Error&&>(__err)); 216 // do not access __op_ 217 // do not access this 218 __complete(__scope); 219 } 220 set_stoppedexec::__scope::__nest_rcvr::__t221 void set_stopped() noexcept 222 requires __callable<set_stopped_t, _Receiver> 223 { 224 auto __scope = __op_->__scope_; 225 stdexec::set_stopped(std::move(__op_->__rcvr_)); 226 // do not access __op_ 227 // do not access this 228 __complete(__scope); 229 } 230 get_envexec::__scope::__nest_rcvr::__t231 auto get_env() const noexcept -> __env_t<env_of_t<_Receiver>> 232 { 233 return make_env( 234 stdexec::get_env(__op_->__rcvr_), 235 stdexec::prop{get_stop_token, 236 __op_->__scope_->__stop_source_.get_token()}); 237 } 238 }; 239 }; 240 241 template <class _ConstrainedId, class _ReceiverId> 242 struct __nest_op 243 { 244 using _Constrained = stdexec::__t<_ConstrainedId>; 245 using _Receiver = stdexec::__t<_ReceiverId>; 246 247 struct __t : __nest_op_base<_ReceiverId> 248 { 249 using __id = __nest_op; 250 using __nest_rcvr_t = stdexec::__t<__nest_rcvr<_ReceiverId>>; 251 STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS 252 connect_result_t<_Constrained, __nest_rcvr_t> __op_; 253 254 template <__decays_to<_Constrained> _Sender, 255 __decays_to<_Receiver> _Rcvr> __texec::__scope::__nest_op::__t256 explicit __t(const __impl* __scope, _Sender&& __c, _Rcvr&& __rcvr) : 257 __nest_op_base<_ReceiverId>{ 258 {}, __scope, static_cast<_Rcvr&&>(__rcvr)}, 259 __op_(stdexec::connect(static_cast<_Sender&&>(__c), 260 __nest_rcvr_t{this})) 261 {} 262 startexec::__scope::__nest_op::__t263 void start() & noexcept 264 { 265 STDEXEC_ASSERT(this->__scope_); 266 std::unique_lock __guard{this->__scope_->__lock_}; 267 auto& __active = this->__scope_->__active_; 268 ++__active; 269 __guard.unlock(); 270 stdexec::start(__op_); 271 } 272 }; 273 }; 274 275 template <class _ConstrainedId> 276 struct __nest_sender 277 { 278 using _Constrained = stdexec::__t<_ConstrainedId>; 279 280 struct __t 281 { 282 using __id = __nest_sender; 283 using sender_concept = stdexec::sender_t; 284 285 const __impl* __scope_; 286 STDEXEC_ATTRIBUTE((no_unique_address)) 287 _Constrained __c_; 288 289 template <class _Receiver> 290 using __nest_operation_t = 291 stdexec::__t<__nest_op<_ConstrainedId, stdexec::__id<_Receiver>>>; 292 293 template <class _Receiver> 294 using __nest_receiver_t = 295 stdexec::__t<__nest_rcvr<stdexec::__id<_Receiver>>>; 296 297 template <__decays_to<__t> _Self, receiver _Receiver> 298 requires sender_to<__copy_cvref_t<_Self, _Constrained>, 299 __nest_receiver_t<_Receiver>> connectexec::__scope::__nest_sender::__t300 [[nodiscard]] static auto connect(_Self&& __self, _Receiver __rcvr) 301 -> __nest_operation_t<_Receiver> 302 { 303 return __nest_operation_t<_Receiver>{ 304 __self.__scope_, static_cast<_Self&&>(__self).__c_, 305 static_cast<_Receiver&&>(__rcvr)}; 306 } 307 308 template <__decays_to<__t> _Self, class... _Env> get_completion_signaturesexec::__scope::__nest_sender::__t309 static auto get_completion_signatures(_Self&&, _Env&&...) 310 -> __completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>, 311 __env_t<_Env>...> 312 { 313 return {}; 314 } 315 }; 316 }; 317 318 template <class _Constrained> 319 using __nest_sender_t = 320 stdexec::__t<__nest_sender<__id<__decay_t<_Constrained>>>>; 321 322 //////////////////////////////////////////////////////////////////////////// 323 // async_scope::spawn_future implementation 324 enum class __future_step 325 { 326 __invalid = 0, 327 __created, 328 __future, 329 __no_future, 330 __deleted 331 }; 332 333 template <class _Sender, class _Env> 334 struct __future_state; 335 336 struct __forward_stopped 337 { 338 inplace_stop_source* __stop_source_; 339 operator ()exec::__scope::__forward_stopped340 void operator()() noexcept 341 { 342 __stop_source_->request_stop(); 343 } 344 }; 345 346 struct __subscription : __immovable 347 { 348 void (*__complete_)(__subscription*) noexcept = nullptr; 349 __completeexec::__scope::__subscription350 void __complete() noexcept 351 { 352 __complete_(this); 353 } 354 355 __subscription* __next_ = nullptr; 356 }; 357 358 template <class _SenderId, class _EnvId, class _ReceiverId> 359 struct __future_op 360 { 361 using _Sender = stdexec::__t<_SenderId>; 362 using _Env = stdexec::__t<_EnvId>; 363 using _Receiver = stdexec::__t<_ReceiverId>; 364 365 class __t : __subscription 366 { 367 using __forward_consumer = typename stop_token_of_t< 368 env_of_t<_Receiver>>::template callback_type<__forward_stopped>; 369 __complete_()370 void __complete_() noexcept 371 { 372 try 373 { 374 auto __state = std::move(__state_); 375 STDEXEC_ASSERT(__state != nullptr); 376 std::unique_lock __guard{__state->__mutex_}; 377 // either the future is still in use or it has passed ownership 378 // to __state->__no_future_ 379 if (__state->__no_future_.get() != nullptr || 380 __state->__step_ != __future_step::__future) 381 { 382 // invalid state - there is a code bug in the state machine 383 std::terminate(); 384 } 385 else if (get_stop_token(get_env(__rcvr_)).stop_requested()) 386 { 387 __guard.unlock(); 388 stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr_)); 389 __guard.lock(); 390 } 391 else 392 { 393 std::visit( 394 [this, &__guard]<class _Tup>(_Tup& __tup) { 395 if constexpr (same_as<_Tup, std::monostate>) 396 { 397 std::terminate(); 398 } 399 else 400 { 401 std::apply( 402 [this, &__guard]<class... _As>( 403 auto tag, _As&... __as) { 404 __guard.unlock(); 405 tag(static_cast<_Receiver&&>(__rcvr_), 406 static_cast<_As&&>(__as)...); 407 __guard.lock(); 408 }, 409 __tup); 410 } 411 }, 412 __state->__data_); 413 } 414 } 415 catch (...) 416 { 417 stdexec::set_error(static_cast<_Receiver&&>(__rcvr_), 418 std::current_exception()); 419 } 420 } 421 422 STDEXEC_ATTRIBUTE((no_unique_address)) 423 _Receiver __rcvr_; 424 std::unique_ptr<__future_state<_Sender, _Env>> __state_; 425 STDEXEC_ATTRIBUTE((no_unique_address)) 426 __forward_consumer __forward_consumer_; 427 428 public: 429 using __id = __future_op; 430 ~__t()431 ~__t() noexcept 432 { 433 if (__state_ != nullptr) 434 { 435 auto __raw_state = __state_.get(); 436 std::unique_lock __guard{__raw_state->__mutex_}; 437 if (__raw_state->__data_.index() > 0) 438 { 439 // completed given sender 440 // state is no longer needed 441 return; 442 } 443 __raw_state->__no_future_ = std::move(__state_); 444 __raw_state->__step_from_to_(__guard, __future_step::__future, 445 __future_step::__no_future); 446 } 447 } 448 449 template <class _Receiver2> __t(_Receiver2 && __rcvr,std::unique_ptr<__future_state<_Sender,_Env>> __state)450 explicit __t(_Receiver2&& __rcvr, 451 std::unique_ptr<__future_state<_Sender, _Env>> __state) : 452 __subscription{{}, 453 [](__subscription* __self) noexcept -> void { 454 static_cast<__t*>(__self)->__complete_(); 455 }}, 456 __rcvr_(static_cast<_Receiver2&&>(__rcvr)), 457 __state_(std::move(__state)), 458 __forward_consumer_(get_stop_token(get_env(__rcvr_)), 459 __forward_stopped{&__state_->__stop_source_}) 460 {} 461 start()462 void start() & noexcept 463 { 464 try 465 { 466 if (!!__state_) 467 { 468 std::unique_lock __guard{__state_->__mutex_}; 469 if (__state_->__data_.index() != 0) 470 { 471 __guard.unlock(); 472 __complete_(); 473 } 474 else 475 { 476 __state_->__subscribers_.push_back(this); 477 } 478 } 479 } 480 catch (...) 481 { 482 stdexec::set_error(static_cast<_Receiver&&>(__rcvr_), 483 std::current_exception()); 484 } 485 } 486 }; 487 }; 488 489 #if STDEXEC_NVHPC() 490 template <class _Fn> 491 struct __completion_as_tuple2_; 492 493 template <class _Tag, class... _Ts> 494 struct __completion_as_tuple2_<_Tag(_Ts...)> 495 { 496 using __t = std::tuple<_Tag, _Ts...>; 497 }; 498 template <class _Fn> 499 using __completion_as_tuple_t = stdexec::__t<__completion_as_tuple2_<_Fn>>; 500 501 #else 502 503 template <class _Tag, class... _Ts> 504 auto __completion_as_tuple_(_Tag (*)(_Ts...)) -> std::tuple<_Tag, _Ts...>; 505 506 template <class _Fn> 507 using __completion_as_tuple_t = 508 decltype(__scope::__completion_as_tuple_(static_cast<_Fn*>(nullptr))); 509 #endif 510 511 template <class... _Ts> 512 using __decay_values_t = completion_signatures<set_value_t(__decay_t<_Ts>...)>; 513 514 template <class _Ty> 515 using __decay_error_t = completion_signatures<set_error_t(__decay_t<_Ty>)>; 516 517 template <class _Sender, class _Env> 518 using __future_completions_t = // 519 transform_completion_signatures_of< 520 _Sender, __env_t<_Env>, 521 completion_signatures<set_stopped_t(), set_error_t(std::exception_ptr)>, 522 __decay_values_t, __decay_error_t>; 523 524 template <class _Completions> 525 using __completions_as_variant = // 526 __mapply<__mtransform<__q<__completion_as_tuple_t>, 527 __mbind_front_q<std::variant, std::monostate>>, 528 _Completions>; 529 530 template <class _Ty> 531 struct __dynamic_delete 532 { __anonf62d8bc60202exec::__scope::__dynamic_delete533 __dynamic_delete() : __delete_([](_Ty* __p) { delete __p; }) {} 534 535 template <class _Uy> 536 requires convertible_to<_Uy*, _Ty*> __dynamic_deleteexec::__scope::__dynamic_delete537 __dynamic_delete(std::default_delete<_Uy>) : 538 __delete_([](_Ty* __p) { delete static_cast<_Uy*>(__p); }) 539 {} 540 541 template <class _Uy> 542 requires convertible_to<_Uy*, _Ty*> operator =exec::__scope::__dynamic_delete543 auto operator=(std::default_delete<_Uy> __d) -> __dynamic_delete& 544 { 545 __delete_ = __dynamic_delete{__d}.__delete_; 546 return *this; 547 } 548 operator ()exec::__scope::__dynamic_delete549 void operator()(_Ty* __p) 550 { 551 __delete_(__p); 552 } 553 554 void (*__delete_)(_Ty*); 555 }; 556 557 template <class _Completions, class _Env> 558 struct __future_state_base 559 { __future_state_baseexec::__scope::__future_state_base560 __future_state_base(_Env __env, const __impl* __scope) : 561 __forward_scope_{std::in_place, __scope->__stop_source_.get_token(), 562 __forward_stopped{&__stop_source_}}, 563 __env_(make_env( 564 static_cast<_Env&&>(__env), 565 stdexec::prop{get_stop_token, __scope->__stop_source_.get_token()})) 566 {} 567 ~__future_state_baseexec::__scope::__future_state_base568 ~__future_state_base() 569 { 570 std::unique_lock __guard{__mutex_}; 571 if (__step_ == __future_step::__created) 572 { 573 // exception during connect() will end up here 574 __step_from_to_(__guard, __future_step::__created, 575 __future_step::__deleted); 576 } 577 else if (__step_ != __future_step::__deleted) 578 { 579 // completing the given sender before the future is dropped will end 580 // here 581 __step_from_to_(__guard, __future_step::__future, 582 __future_step::__deleted); 583 } 584 } 585 __step_from_to_exec::__scope::__future_state_base586 void __step_from_to_(std::unique_lock<std::mutex>& __guard, 587 __future_step __from, __future_step __to) 588 { 589 STDEXEC_ASSERT(__guard.owns_lock()); 590 auto actual = std::exchange(__step_, __to); 591 STDEXEC_ASSERT(actual == __from); 592 } 593 594 inplace_stop_source __stop_source_; 595 std::optional<inplace_stop_callback<__forward_stopped>> __forward_scope_; 596 std::mutex __mutex_; 597 __future_step __step_ = __future_step::__created; 598 std::unique_ptr<__future_state_base, __dynamic_delete<__future_state_base>> 599 __no_future_; 600 __completions_as_variant<_Completions> __data_; 601 __intrusive_queue<&__subscription::__next_> __subscribers_; 602 __env_t<_Env> __env_; 603 }; 604 605 template <class _Completions, class _EnvId> 606 struct __future_rcvr 607 { 608 using _Env = stdexec::__t<_EnvId>; 609 610 struct __t 611 { 612 using __id = __future_rcvr; 613 using receiver_concept = stdexec::receiver_t; 614 __future_state_base<_Completions, _Env>* __state_; 615 const __impl* __scope_; 616 __dispatch_result_exec::__scope::__future_rcvr::__t617 void __dispatch_result_() noexcept 618 { 619 auto& __state = *__state_; 620 std::unique_lock __guard{__state.__mutex_}; 621 auto __local = std::move(__state.__subscribers_); 622 __state.__forward_scope_ = std::nullopt; 623 if (__state.__no_future_.get() != nullptr) 624 { 625 // nobody is waiting for the results 626 // delete this and return 627 __state.__step_from_to_(__guard, __future_step::__no_future, 628 __future_step::__deleted); 629 __guard.unlock(); 630 __state.__no_future_.reset(); 631 return; 632 } 633 __guard.unlock(); 634 while (!__local.empty()) 635 { 636 auto* __sub = __local.pop_front(); 637 __sub->__complete(); 638 } 639 } 640 641 template <class _Tag, class... _As> __save_completionexec::__scope::__future_rcvr::__t642 bool __save_completion(_Tag, _As&&... __as) noexcept 643 { 644 auto& __state = *__state_; 645 try 646 { 647 std::unique_lock __guard{__state.__mutex_}; 648 using _Tuple = __decayed_std_tuple<_Tag, _As...>; 649 __state.__data_.template emplace<_Tuple>( 650 _Tag(), static_cast<_As&&>(__as)...); 651 return true; 652 } 653 catch (...) 654 { 655 using _Tuple = std::tuple<set_error_t, std::exception_ptr>; 656 __state.__data_.template emplace<_Tuple>( 657 set_error_t(), std::current_exception()); 658 } 659 return false; 660 } 661 662 template <__movable_value... _As> set_valueexec::__scope::__future_rcvr::__t663 void set_value(_As&&... __as) noexcept 664 { 665 if (__save_completion(set_value_t(), static_cast<_As&&>(__as)...)) 666 { 667 __dispatch_result_(); 668 } 669 } 670 671 template <__movable_value _Error> set_errorexec::__scope::__future_rcvr::__t672 void set_error(_Error&& __err) noexcept 673 { 674 if (__save_completion(set_error_t(), static_cast<_Error&&>(__err))) 675 { 676 __dispatch_result_(); 677 } 678 } 679 set_stoppedexec::__scope::__future_rcvr::__t680 void set_stopped() noexcept 681 { 682 if (__save_completion(set_stopped_t())) 683 { 684 __dispatch_result_(); 685 } 686 } 687 get_envexec::__scope::__future_rcvr::__t688 auto get_env() const noexcept -> const __env_t<_Env>& 689 { 690 return __state_->__env_; 691 } 692 }; 693 }; 694 695 template <class _Sender, class _Env> 696 using __future_receiver_t = 697 __t<__future_rcvr<__future_completions_t<_Sender, _Env>, __id<_Env>>>; 698 699 template <class _Sender, class _Env> 700 struct __future_state : 701 __future_state_base<__future_completions_t<_Sender, _Env>, _Env> 702 { 703 using _Completions = __future_completions_t<_Sender, _Env>; 704 __future_stateexec::__scope::__future_state705 __future_state(_Sender __sndr, _Env __env, const __impl* __scope) : 706 __future_state_base<_Completions, _Env>(static_cast<_Env&&>(__env), 707 __scope), 708 __op_( 709 stdexec::connect(static_cast<_Sender&&>(__sndr), 710 __future_receiver_t<_Sender, _Env>{this, __scope})) 711 {} 712 713 connect_result_t<_Sender, __future_receiver_t<_Sender, _Env>> __op_; 714 }; 715 716 template <class _SenderId, class _EnvId> 717 struct __future 718 { 719 using _Sender = stdexec::__t<_SenderId>; 720 using _Env = stdexec::__t<_EnvId>; 721 722 class __t 723 { 724 template <class _Self> 725 using __completions_t = 726 __future_completions_t<__mfront<_Sender, _Self>, _Env>; 727 728 template <class _Receiver> 729 using __future_op_t = stdexec::__t< 730 __future_op<_SenderId, _EnvId, stdexec::__id<_Receiver>>>; 731 732 public: 733 using __id = __future; 734 using sender_concept = stdexec::sender_t; 735 736 __t(__t&&) = default; 737 auto operator=(__t&&) -> __t& = default; 738 ~__t()739 ~__t() noexcept 740 { 741 if (__state_ != nullptr) 742 { 743 auto __raw_state = __state_.get(); 744 std::unique_lock __guard{__raw_state->__mutex_}; 745 if (__raw_state->__data_.index() != 0) 746 { 747 // completed given sender 748 // state is no longer needed 749 return; 750 } 751 __raw_state->__no_future_ = std::move(__state_); 752 __raw_state->__step_from_to_(__guard, __future_step::__future, 753 __future_step::__no_future); 754 } 755 } 756 757 template <__decays_to<__t> _Self, receiver _Receiver> 758 requires receiver_of<_Receiver, __completions_t<_Self>> connect(_Self && __self,_Receiver __rcvr)759 static auto connect(_Self&& __self, 760 _Receiver __rcvr) -> __future_op_t<_Receiver> 761 { 762 return __future_op_t<_Receiver>{ 763 static_cast<_Receiver&&>(__rcvr), 764 static_cast<_Self&&>(__self).__state_}; 765 } 766 767 template <__decays_to<__t> _Self, class... _OtherEnv> get_completion_signatures(_Self &&,_OtherEnv &&...)768 static auto get_completion_signatures(_Self&&, _OtherEnv&&...) 769 -> __completions_t<_Self> 770 { 771 return {}; 772 } 773 774 private: 775 friend struct async_scope; 776 __t(std::unique_ptr<__future_state<_Sender,_Env>> __state)777 explicit __t( 778 std::unique_ptr<__future_state<_Sender, _Env>> __state) noexcept : 779 __state_(std::move(__state)) 780 { 781 std::unique_lock __guard{__state_->__mutex_}; 782 __state_->__step_from_to_(__guard, __future_step::__created, 783 __future_step::__future); 784 } 785 786 std::unique_ptr<__future_state<_Sender, _Env>> __state_; 787 }; 788 }; 789 790 template <class _Sender, class _Env> 791 using __future_t = stdexec::__t< 792 __future<__id<__nest_sender_t<_Sender>>, __id<__decay_t<_Env>>>>; 793 794 //////////////////////////////////////////////////////////////////////////// 795 // async_scope::spawn implementation 796 struct __spawn_env_ 797 { 798 inplace_stop_token __token_; 799 queryexec::__scope::__spawn_env_800 auto query(get_stop_token_t) const noexcept -> inplace_stop_token 801 { 802 return __token_; 803 } 804 queryexec::__scope::__spawn_env_805 auto query(get_scheduler_t) const noexcept -> __inln::__scheduler 806 { 807 return {}; 808 } 809 }; 810 811 template <class _Env> 812 using __spawn_env_t = __env::__join_t<_Env, __spawn_env_>; 813 814 template <class _EnvId> 815 struct __spawn_op_base 816 { 817 using _Env = stdexec::__t<_EnvId>; 818 __spawn_env_t<_Env> __env_; 819 void (*__delete_)(__spawn_op_base*); 820 }; 821 822 template <class _EnvId> 823 struct __spawn_rcvr 824 { 825 using _Env = stdexec::__t<_EnvId>; 826 827 struct __t 828 { 829 using __id = __spawn_rcvr; 830 using receiver_concept = stdexec::receiver_t; 831 __spawn_op_base<_EnvId>* __op_; 832 set_valueexec::__scope::__spawn_rcvr::__t833 void set_value() noexcept 834 { 835 __op_->__delete_(__op_); 836 } 837 838 // BUGBUG NOT TO SPEC spawn shouldn't accept senders that can fail. set_errorexec::__scope::__spawn_rcvr::__t839 [[noreturn]] void set_error(std::exception_ptr __eptr) noexcept 840 { 841 std::rethrow_exception(std::move(__eptr)); 842 } 843 set_stoppedexec::__scope::__spawn_rcvr::__t844 void set_stopped() noexcept 845 { 846 __op_->__delete_(__op_); 847 } 848 get_envexec::__scope::__spawn_rcvr::__t849 auto get_env() const noexcept -> const __spawn_env_t<_Env>& 850 { 851 return __op_->__env_; 852 } 853 }; 854 }; 855 856 template <class _Env> 857 using __spawn_receiver_t = stdexec::__t<__spawn_rcvr<__id<_Env>>>; 858 859 template <class _SenderId, class _EnvId> 860 struct __spawn_op 861 { 862 using _Env = stdexec::__t<_EnvId>; 863 using _Sender = stdexec::__t<_SenderId>; 864 865 struct __t : __spawn_op_base<_EnvId> 866 { 867 template <__decays_to<_Sender> _Sndr> __texec::__scope::__spawn_op::__t868 __t(_Sndr&& __sndr, _Env __env, const __impl* __scope) : 869 __spawn_op_base<_EnvId>{ 870 __env::__join( 871 static_cast<_Env&&>(__env), 872 __spawn_env_{__scope->__stop_source_.get_token()}), 873 [](__spawn_op_base<_EnvId>* __op) { 874 delete static_cast<__t*>(__op); 875 }}, 876 __op_(stdexec::connect(static_cast<_Sndr&&>(__sndr), 877 __spawn_receiver_t<_Env>{this})) 878 {} 879 startexec::__scope::__spawn_op::__t880 void start() & noexcept 881 { 882 stdexec::start(__op_); 883 } 884 885 connect_result_t<_Sender, __spawn_receiver_t<_Env>> __op_; 886 }; 887 }; 888 889 template <class _Sender, class _Env> 890 using __spawn_operation_t = stdexec::__t<__spawn_op<__id<_Sender>, __id<_Env>>>; 891 892 //////////////////////////////////////////////////////////////////////////// 893 // async_scope 894 struct async_scope : __immovable 895 { 896 async_scope() = default; 897 898 template <sender _Constrained> when_emptyexec::__scope::async_scope899 [[nodiscard]] auto when_empty(_Constrained&& __c) const 900 -> __when_empty_sender_t<_Constrained> 901 { 902 return __when_empty_sender_t<_Constrained>{ 903 &__impl_, static_cast<_Constrained&&>(__c)}; 904 } 905 on_emptyexec::__scope::async_scope906 [[nodiscard]] auto on_empty() const 907 { 908 return when_empty(just()); 909 } 910 911 template <sender _Constrained> 912 using nest_result_t = __nest_sender_t<_Constrained>; 913 914 template <sender _Constrained> nestexec::__scope::async_scope915 [[nodiscard]] auto nest(_Constrained&& __c) -> nest_result_t<_Constrained> 916 { 917 return nest_result_t<_Constrained>{&__impl_, 918 static_cast<_Constrained&&>(__c)}; 919 } 920 921 template <__movable_value _Env = empty_env, 922 sender_in<__spawn_env_t<_Env>> _Sender> 923 requires sender_to<nest_result_t<_Sender>, __spawn_receiver_t<_Env>> spawnexec::__scope::async_scope924 void spawn(_Sender&& __sndr, _Env __env = {}) 925 { 926 using __op_t = __spawn_operation_t<nest_result_t<_Sender>, _Env>; 927 // start is noexcept so we can assume that the operation will complete 928 // after this, which means we can rely on its self-ownership to ensure 929 // that it is eventually deleted 930 stdexec::start(*new __op_t{nest(static_cast<_Sender&&>(__sndr)), 931 static_cast<_Env&&>(__env), &__impl_}); 932 } 933 934 template <__movable_value _Env = empty_env, 935 sender_in<__env_t<_Env>> _Sender> spawn_futureexec::__scope::async_scope936 auto spawn_future(_Sender&& __sndr, 937 _Env __env = {}) -> __future_t<_Sender, _Env> 938 { 939 using __state_t = __future_state<nest_result_t<_Sender>, _Env>; 940 auto __state = 941 std::make_unique<__state_t>(nest(static_cast<_Sender&&>(__sndr)), 942 static_cast<_Env&&>(__env), &__impl_); 943 stdexec::start(__state->__op_); 944 return __future_t<_Sender, _Env>{std::move(__state)}; 945 } 946 get_stop_sourceexec::__scope::async_scope947 auto get_stop_source() noexcept -> inplace_stop_source& 948 { 949 return __impl_.__stop_source_; 950 } 951 get_stop_tokenexec::__scope::async_scope952 auto get_stop_token() const noexcept -> inplace_stop_token 953 { 954 return __impl_.__stop_source_.get_token(); 955 } 956 request_stopexec::__scope::async_scope957 auto request_stop() noexcept -> bool 958 { 959 return __impl_.__stop_source_.request_stop(); 960 } 961 962 private: 963 __impl __impl_; 964 }; 965 } // namespace __scope 966 967 using __scope::async_scope; 968 969 template <class _AsyncScope, class _Sender> 970 using nest_result_t = decltype(stdexec::__declval<_AsyncScope&>().nest( 971 stdexec::__declval<_Sender&&>())); 972 } // namespace exec 973