1 /* 2 * Copyright (c) 2021-2024 NVIDIA Corporation 3 * 4 * Licensed under the Apache License Version 2.0 with LLVM Exceptions 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * https://llvm.org/LICENSE.txt 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #pragma once 17 18 #include "../stdexec/__detail/__intrusive_queue.hpp" 19 #include "../stdexec/__detail/__optional.hpp" 20 #include "../stdexec/execution.hpp" 21 #include "../stdexec/stop_token.hpp" 22 #include "env.hpp" 23 24 #include <mutex> 25 26 namespace exec 27 { 28 ///////////////////////////////////////////////////////////////////////////// 29 // async_scope 30 namespace __scope 31 { 32 using namespace stdexec; 33 34 struct __impl; 35 struct async_scope; 36 37 template <class _A> 38 concept __async_scope = requires(_A& __a) { 39 { 40 __a.nest(stdexec::just()) 41 } -> sender_of<stdexec::set_value_t()>; 42 }; 43 44 struct __task : __immovable 45 { 46 const __impl* __scope_; 47 void (*__notify_waiter)(__task*) noexcept; 48 __task* __next_ = nullptr; 49 }; 50 51 template <class _BaseEnv> 52 using __env_t = 53 make_env_t<_BaseEnv, prop<get_stop_token_t, inplace_stop_token>>; 54 55 struct __impl 56 { 57 inplace_stop_source __stop_source_{}; 58 mutable std::mutex __lock_{}; 59 mutable std::ptrdiff_t __active_ = 0; 60 mutable __intrusive_queue<&__task::__next_> __waiters_{}; 61 ~__implexec::__scope::__impl62 ~__impl() 63 { 64 std::unique_lock __guard{__lock_}; 65 STDEXEC_ASSERT(__active_ == 0); 66 STDEXEC_ASSERT(__waiters_.empty()); 67 } 68 }; 69 70 //////////////////////////////////////////////////////////////////////////// 71 // async_scope::when_empty implementation 72 template <class _ConstrainedId, class _ReceiverId> 73 struct __when_empty_op 74 { 75 using _Constrained = __cvref_t<_ConstrainedId>; 76 using _Receiver = stdexec::__t<_ReceiverId>; 77 78 struct __t : __task 79 { 80 using __id = __when_empty_op; 81 __texec::__scope::__when_empty_op::__t82 explicit __t(const __impl* __scope, _Constrained&& __sndr, 83 _Receiver __rcvr) : 84 __task{{}, __scope, __notify_waiter}, 85 __op_(stdexec::connect(static_cast<_Constrained&&>(__sndr), 86 static_cast<_Receiver&&>(__rcvr))) 87 {} 88 startexec::__scope::__when_empty_op::__t89 void start() & noexcept 90 { 91 std::unique_lock __guard{this->__scope_->__lock_}; 92 auto& __active = this->__scope_->__active_; 93 auto& __waiters = this->__scope_->__waiters_; 94 if (__active != 0) 95 { 96 __waiters.push_back(this); 97 return; 98 } 99 __guard.unlock(); 100 stdexec::start(this->__op_); 101 } 102 103 private: __notify_waiterexec::__scope::__when_empty_op::__t104 static void __notify_waiter(__task* __self) noexcept 105 { 106 stdexec::start(static_cast<__t*>(__self)->__op_); 107 } 108 109 STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS 110 connect_result_t<_Constrained, _Receiver> __op_; 111 }; 112 }; 113 114 template <class _ConstrainedId> 115 struct __when_empty_sender 116 { 117 using _Constrained = stdexec::__t<_ConstrainedId>; 118 119 struct __t 120 { 121 using __id = __when_empty_sender; 122 using sender_concept = stdexec::sender_t; 123 124 template <class _Self, class _Receiver> 125 using __when_empty_op_t = 126 stdexec::__t<__when_empty_op<__cvref_id<_Self, _Constrained>, 127 stdexec::__id<_Receiver>>>; 128 129 template <__decays_to<__t> _Self, receiver _Receiver> 130 requires sender_to<__copy_cvref_t<_Self, _Constrained>, _Receiver> connectexec::__scope::__when_empty_sender::__t131 [[nodiscard]] static auto connect(_Self&& __self, _Receiver __rcvr) // 132 -> __when_empty_op_t<_Self, _Receiver> 133 { 134 return __when_empty_op_t<_Self, _Receiver>{ 135 __self.__scope_, static_cast<_Self&&>(__self).__c_, 136 static_cast<_Receiver&&>(__rcvr)}; 137 } 138 139 template <__decays_to<__t> _Self, class... _Env> get_completion_signaturesexec::__scope::__when_empty_sender::__t140 static auto get_completion_signatures(_Self&&, _Env&&...) 141 -> __completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>, 142 __env_t<_Env>...> 143 { 144 return {}; 145 } 146 147 const __impl* __scope_; 148 STDEXEC_ATTRIBUTE((no_unique_address)) 149 _Constrained __c_; 150 }; 151 }; 152 153 template <class _Constrained> 154 using __when_empty_sender_t = 155 stdexec::__t<__when_empty_sender<__id<__decay_t<_Constrained>>>>; 156 157 //////////////////////////////////////////////////////////////////////////// 158 // async_scope::nest implementation 159 template <class _ReceiverId> 160 struct __nest_op_base : __immovable 161 { 162 using _Receiver = stdexec::__t<_ReceiverId>; 163 const __impl* __scope_; 164 STDEXEC_ATTRIBUTE((no_unique_address)) 165 _Receiver __rcvr_; 166 }; 167 168 template <class _ReceiverId> 169 struct __nest_rcvr 170 { 171 using _Receiver = stdexec::__t<_ReceiverId>; 172 173 struct __t 174 { 175 using __id = __nest_rcvr; 176 using receiver_concept = stdexec::receiver_t; 177 __nest_op_base<_ReceiverId>* __op_; 178 __completeexec::__scope::__nest_rcvr::__t179 static void __complete(const __impl* __scope) noexcept 180 { 181 std::unique_lock __guard{__scope->__lock_}; 182 auto& __active = __scope->__active_; 183 if (--__active == 0) 184 { 185 auto __local_waiters = std::move(__scope->__waiters_); 186 __guard.unlock(); 187 __scope = nullptr; 188 // do not access __scope 189 while (!__local_waiters.empty()) 190 { 191 auto* __next = __local_waiters.pop_front(); 192 __next->__notify_waiter(__next); 193 // __scope must be considered deleted 194 } 195 } 196 } 197 198 template <class... _As> 199 requires __callable<set_value_t, _Receiver, _As...> set_valueexec::__scope::__nest_rcvr::__t200 void set_value(_As&&... __as) noexcept 201 { 202 auto __scope = __op_->__scope_; 203 stdexec::set_value(std::move(__op_->__rcvr_), 204 static_cast<_As&&>(__as)...); 205 // do not access __op_ 206 // do not access this 207 __complete(__scope); 208 } 209 210 template <class _Error> 211 requires __callable<set_error_t, _Receiver, _Error> set_errorexec::__scope::__nest_rcvr::__t212 void set_error(_Error&& __err) noexcept 213 { 214 auto __scope = __op_->__scope_; 215 stdexec::set_error(std::move(__op_->__rcvr_), 216 static_cast<_Error&&>(__err)); 217 // do not access __op_ 218 // do not access this 219 __complete(__scope); 220 } 221 set_stoppedexec::__scope::__nest_rcvr::__t222 void set_stopped() noexcept 223 requires __callable<set_stopped_t, _Receiver> 224 { 225 auto __scope = __op_->__scope_; 226 stdexec::set_stopped(std::move(__op_->__rcvr_)); 227 // do not access __op_ 228 // do not access this 229 __complete(__scope); 230 } 231 get_envexec::__scope::__nest_rcvr::__t232 auto get_env() const noexcept -> __env_t<env_of_t<_Receiver>> 233 { 234 return make_env( 235 stdexec::get_env(__op_->__rcvr_), 236 stdexec::prop{get_stop_token, 237 __op_->__scope_->__stop_source_.get_token()}); 238 } 239 }; 240 }; 241 242 template <class _ConstrainedId, class _ReceiverId> 243 struct __nest_op 244 { 245 using _Constrained = stdexec::__t<_ConstrainedId>; 246 using _Receiver = stdexec::__t<_ReceiverId>; 247 248 struct __t : __nest_op_base<_ReceiverId> 249 { 250 using __id = __nest_op; 251 using __nest_rcvr_t = stdexec::__t<__nest_rcvr<_ReceiverId>>; 252 STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS 253 connect_result_t<_Constrained, __nest_rcvr_t> __op_; 254 255 template <__decays_to<_Constrained> _Sender, 256 __decays_to<_Receiver> _Rcvr> __texec::__scope::__nest_op::__t257 explicit __t(const __impl* __scope, _Sender&& __c, _Rcvr&& __rcvr) : 258 __nest_op_base<_ReceiverId>{ 259 {}, __scope, static_cast<_Rcvr&&>(__rcvr)}, 260 __op_(stdexec::connect(static_cast<_Sender&&>(__c), 261 __nest_rcvr_t{this})) 262 {} 263 startexec::__scope::__nest_op::__t264 void start() & noexcept 265 { 266 STDEXEC_ASSERT(this->__scope_); 267 std::unique_lock __guard{this->__scope_->__lock_}; 268 auto& __active = this->__scope_->__active_; 269 ++__active; 270 __guard.unlock(); 271 stdexec::start(__op_); 272 } 273 }; 274 }; 275 276 template <class _ConstrainedId> 277 struct __nest_sender 278 { 279 using _Constrained = stdexec::__t<_ConstrainedId>; 280 281 struct __t 282 { 283 using __id = __nest_sender; 284 using sender_concept = stdexec::sender_t; 285 286 const __impl* __scope_; 287 STDEXEC_ATTRIBUTE((no_unique_address)) 288 _Constrained __c_; 289 290 template <class _Receiver> 291 using __nest_operation_t = 292 stdexec::__t<__nest_op<_ConstrainedId, stdexec::__id<_Receiver>>>; 293 294 template <class _Receiver> 295 using __nest_receiver_t = 296 stdexec::__t<__nest_rcvr<stdexec::__id<_Receiver>>>; 297 298 template <__decays_to<__t> _Self, receiver _Receiver> 299 requires sender_to<__copy_cvref_t<_Self, _Constrained>, 300 __nest_receiver_t<_Receiver>> connectexec::__scope::__nest_sender::__t301 [[nodiscard]] static auto connect(_Self&& __self, _Receiver __rcvr) 302 -> __nest_operation_t<_Receiver> 303 { 304 return __nest_operation_t<_Receiver>{ 305 __self.__scope_, static_cast<_Self&&>(__self).__c_, 306 static_cast<_Receiver&&>(__rcvr)}; 307 } 308 309 template <__decays_to<__t> _Self, class... _Env> get_completion_signaturesexec::__scope::__nest_sender::__t310 static auto get_completion_signatures(_Self&&, _Env&&...) 311 -> __completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>, 312 __env_t<_Env>...> 313 { 314 return {}; 315 } 316 }; 317 }; 318 319 template <class _Constrained> 320 using __nest_sender_t = 321 stdexec::__t<__nest_sender<__id<__decay_t<_Constrained>>>>; 322 323 //////////////////////////////////////////////////////////////////////////// 324 // async_scope::spawn_future implementation 325 enum class __future_step 326 { 327 __invalid = 0, 328 __created, 329 __future, 330 __no_future, 331 __deleted 332 }; 333 334 template <class _Sender, class _Env> 335 struct __future_state; 336 337 struct __forward_stopped 338 { 339 inplace_stop_source* __stop_source_; 340 operator ()exec::__scope::__forward_stopped341 void operator()() noexcept 342 { 343 __stop_source_->request_stop(); 344 } 345 }; 346 347 struct __subscription : __immovable 348 { 349 void (*__complete_)(__subscription*) noexcept = nullptr; 350 __completeexec::__scope::__subscription351 void __complete() noexcept 352 { 353 __complete_(this); 354 } 355 356 __subscription* __next_ = nullptr; 357 }; 358 359 template <class _SenderId, class _EnvId, class _ReceiverId> 360 struct __future_op 361 { 362 using _Sender = stdexec::__t<_SenderId>; 363 using _Env = stdexec::__t<_EnvId>; 364 using _Receiver = stdexec::__t<_ReceiverId>; 365 366 class __t : __subscription 367 { 368 using __forward_consumer = typename stop_token_of_t< 369 env_of_t<_Receiver>>::template callback_type<__forward_stopped>; 370 __complete_()371 void __complete_() noexcept 372 { 373 try 374 { 375 __forward_consumer_.reset(); 376 auto __state = std::move(__state_); 377 STDEXEC_ASSERT(__state != nullptr); 378 std::unique_lock __guard{__state->__mutex_}; 379 // either the future is still in use or it has passed ownership 380 // to __state->__no_future_ 381 if (__state->__no_future_.get() != nullptr || 382 __state->__step_ != __future_step::__future) 383 { 384 // invalid state - there is a code bug in the state machine 385 std::terminate(); 386 } 387 else if (get_stop_token(get_env(__rcvr_)).stop_requested()) 388 { 389 __guard.unlock(); 390 stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr_)); 391 __guard.lock(); 392 } 393 else 394 { 395 std::visit( 396 [this, &__guard]<class _Tup>(_Tup& __tup) { 397 if constexpr (same_as<_Tup, std::monostate>) 398 { 399 std::terminate(); 400 } 401 else 402 { 403 std::apply( 404 [this, &__guard]<class... _As>( 405 auto tag, _As&... __as) { 406 __guard.unlock(); 407 tag(static_cast<_Receiver&&>(__rcvr_), 408 static_cast<_As&&>(__as)...); 409 __guard.lock(); 410 }, 411 __tup); 412 } 413 }, 414 __state->__data_); 415 } 416 } 417 catch (...) 418 { 419 stdexec::set_error(static_cast<_Receiver&&>(__rcvr_), 420 std::current_exception()); 421 } 422 } 423 424 STDEXEC_ATTRIBUTE((no_unique_address)) 425 _Receiver __rcvr_; 426 std::unique_ptr<__future_state<_Sender, _Env>> __state_; 427 STDEXEC_ATTRIBUTE((no_unique_address)) 428 stdexec::__optional<__forward_consumer> __forward_consumer_; 429 430 public: 431 using __id = __future_op; 432 ~__t()433 ~__t() noexcept 434 { 435 if (__state_ != nullptr) 436 { 437 auto __raw_state = __state_.get(); 438 std::unique_lock __guard{__raw_state->__mutex_}; 439 if (__raw_state->__data_.index() > 0) 440 { 441 // completed given sender 442 // state is no longer needed 443 return; 444 } 445 __raw_state->__no_future_ = std::move(__state_); 446 __raw_state->__step_from_to_(__guard, __future_step::__future, 447 __future_step::__no_future); 448 } 449 } 450 451 template <class _Receiver2> __t(_Receiver2 && __rcvr,std::unique_ptr<__future_state<_Sender,_Env>> __state)452 explicit __t(_Receiver2&& __rcvr, 453 std::unique_ptr<__future_state<_Sender, _Env>> __state) : 454 __subscription{{}, 455 [](__subscription* __self) noexcept -> void { 456 static_cast<__t*>(__self)->__complete_(); 457 }}, 458 __rcvr_(static_cast<_Receiver2&&>(__rcvr)), 459 __state_(std::move(__state)), 460 __forward_consumer_(std::in_place, get_stop_token(get_env(__rcvr_)), 461 __forward_stopped{&__state_->__stop_source_}) 462 {} 463 start()464 void start() & noexcept 465 { 466 try 467 { 468 if (!!__state_) 469 { 470 std::unique_lock __guard{__state_->__mutex_}; 471 if (__state_->__data_.index() != 0) 472 { 473 __guard.unlock(); 474 __complete_(); 475 } 476 else 477 { 478 __state_->__subscribers_.push_back(this); 479 } 480 } 481 } 482 catch (...) 483 { 484 stdexec::set_error(static_cast<_Receiver&&>(__rcvr_), 485 std::current_exception()); 486 } 487 } 488 }; 489 }; 490 491 #if STDEXEC_EDG() 492 template <class _Fn> 493 struct __completion_as_tuple2_; 494 495 template <class _Tag, class... _Ts> 496 struct __completion_as_tuple2_<_Tag(_Ts...)> 497 { 498 using __t = std::tuple<_Tag, _Ts...>; 499 }; 500 template <class _Fn> 501 using __completion_as_tuple_t = stdexec::__t<__completion_as_tuple2_<_Fn>>; 502 503 #else 504 505 template <class _Tag, class... _Ts> 506 auto __completion_as_tuple_(_Tag (*)(_Ts...)) -> std::tuple<_Tag, _Ts...>; 507 508 template <class _Fn> 509 using __completion_as_tuple_t = 510 decltype(__scope::__completion_as_tuple_(static_cast<_Fn*>(nullptr))); 511 #endif 512 513 template <class... _Ts> 514 using __decay_values_t = completion_signatures<set_value_t(__decay_t<_Ts>...)>; 515 516 template <class _Ty> 517 using __decay_error_t = completion_signatures<set_error_t(__decay_t<_Ty>)>; 518 519 template <class _Sender, class _Env> 520 using __future_completions_t = // 521 transform_completion_signatures_of< 522 _Sender, __env_t<_Env>, 523 completion_signatures<set_stopped_t(), set_error_t(std::exception_ptr)>, 524 __decay_values_t, __decay_error_t>; 525 526 template <class _Completions> 527 using __completions_as_variant = // 528 __mapply<__mtransform<__q<__completion_as_tuple_t>, 529 __mbind_front_q<std::variant, std::monostate>>, 530 _Completions>; 531 532 template <class _Ty> 533 struct __dynamic_delete 534 { __anonf62d8bc60202exec::__scope::__dynamic_delete535 __dynamic_delete() : __delete_([](_Ty* __p) { delete __p; }) {} 536 537 template <class _Uy> 538 requires convertible_to<_Uy*, _Ty*> __dynamic_deleteexec::__scope::__dynamic_delete539 __dynamic_delete(std::default_delete<_Uy>) : 540 __delete_([](_Ty* __p) { delete static_cast<_Uy*>(__p); }) 541 {} 542 543 template <class _Uy> 544 requires convertible_to<_Uy*, _Ty*> operator =exec::__scope::__dynamic_delete545 auto operator=(std::default_delete<_Uy> __d) -> __dynamic_delete& 546 { 547 __delete_ = __dynamic_delete{__d}.__delete_; 548 return *this; 549 } 550 operator ()exec::__scope::__dynamic_delete551 void operator()(_Ty* __p) 552 { 553 __delete_(__p); 554 } 555 556 void (*__delete_)(_Ty*); 557 }; 558 559 template <class _Completions, class _Env> 560 struct __future_state_base 561 { __future_state_baseexec::__scope::__future_state_base562 __future_state_base(_Env __env, const __impl* __scope) : 563 __forward_scope_{std::in_place, __scope->__stop_source_.get_token(), 564 __forward_stopped{&__stop_source_}}, 565 __env_(make_env( 566 static_cast<_Env&&>(__env), 567 stdexec::prop{get_stop_token, __scope->__stop_source_.get_token()})) 568 {} 569 ~__future_state_baseexec::__scope::__future_state_base570 ~__future_state_base() 571 { 572 std::unique_lock __guard{__mutex_}; 573 if (__step_ == __future_step::__created) 574 { 575 // exception during connect() will end up here 576 __step_from_to_(__guard, __future_step::__created, 577 __future_step::__deleted); 578 } 579 else if (__step_ != __future_step::__deleted) 580 { 581 // completing the given sender before the future is dropped will end 582 // here 583 __step_from_to_(__guard, __future_step::__future, 584 __future_step::__deleted); 585 } 586 } 587 __step_from_to_exec::__scope::__future_state_base588 void __step_from_to_(std::unique_lock<std::mutex>& __guard, 589 __future_step __from, __future_step __to) 590 { 591 STDEXEC_ASSERT(__guard.owns_lock()); 592 auto actual = std::exchange(__step_, __to); 593 STDEXEC_ASSERT(actual == __from); 594 } 595 596 inplace_stop_source __stop_source_; 597 stdexec::__optional<inplace_stop_callback<__forward_stopped>> 598 __forward_scope_; 599 std::mutex __mutex_; 600 __future_step __step_ = __future_step::__created; 601 std::unique_ptr<__future_state_base, __dynamic_delete<__future_state_base>> 602 __no_future_; 603 __completions_as_variant<_Completions> __data_; 604 __intrusive_queue<&__subscription::__next_> __subscribers_; 605 __env_t<_Env> __env_; 606 }; 607 608 template <class _Completions, class _EnvId> 609 struct __future_rcvr 610 { 611 using _Env = stdexec::__t<_EnvId>; 612 613 struct __t 614 { 615 using __id = __future_rcvr; 616 using receiver_concept = stdexec::receiver_t; 617 __future_state_base<_Completions, _Env>* __state_; 618 const __impl* __scope_; 619 __dispatch_result_exec::__scope::__future_rcvr::__t620 void __dispatch_result_(std::unique_lock<std::mutex>& __guard) noexcept 621 { 622 auto& __state = *__state_; 623 auto __local_subscribers = std::move(__state.__subscribers_); 624 __state.__forward_scope_.reset(); 625 if (__state.__no_future_.get() != nullptr) 626 { 627 // nobody is waiting for the results 628 // delete this and return 629 __state.__step_from_to_(__guard, __future_step::__no_future, 630 __future_step::__deleted); 631 __guard.unlock(); 632 __state.__no_future_.reset(); 633 return; 634 } 635 __guard.unlock(); 636 while (!__local_subscribers.empty()) 637 { 638 auto* __sub = __local_subscribers.pop_front(); 639 __sub->__complete(); 640 } 641 } 642 643 template <class _Tag, class... _As> __save_completionexec::__scope::__future_rcvr::__t644 void __save_completion(_Tag, _As&&... __as) noexcept 645 { 646 auto& __state = *__state_; 647 try 648 { 649 using _Tuple = __decayed_std_tuple<_Tag, _As...>; 650 __state.__data_.template emplace<_Tuple>( 651 _Tag(), static_cast<_As&&>(__as)...); 652 } 653 catch (...) 654 { 655 using _Tuple = std::tuple<set_error_t, std::exception_ptr>; 656 __state.__data_.template emplace<_Tuple>( 657 set_error_t(), std::current_exception()); 658 } 659 } 660 661 template <__movable_value... _As> set_valueexec::__scope::__future_rcvr::__t662 void set_value(_As&&... __as) noexcept 663 { 664 auto& __state = *__state_; 665 std::unique_lock __guard{__state.__mutex_}; 666 __save_completion(set_value_t(), static_cast<_As&&>(__as)...); 667 __dispatch_result_(__guard); 668 } 669 670 template <__movable_value _Error> set_errorexec::__scope::__future_rcvr::__t671 void set_error(_Error&& __err) noexcept 672 { 673 auto& __state = *__state_; 674 std::unique_lock __guard{__state.__mutex_}; 675 __save_completion(set_error_t(), static_cast<_Error&&>(__err)); 676 __dispatch_result_(__guard); 677 } 678 set_stoppedexec::__scope::__future_rcvr::__t679 void set_stopped() noexcept 680 { 681 auto& __state = *__state_; 682 std::unique_lock __guard{__state.__mutex_}; 683 __save_completion(set_stopped_t()); 684 __dispatch_result_(__guard); 685 } 686 get_envexec::__scope::__future_rcvr::__t687 auto get_env() const noexcept -> const __env_t<_Env>& 688 { 689 return __state_->__env_; 690 } 691 }; 692 }; 693 694 template <class _Sender, class _Env> 695 using __future_receiver_t = 696 __t<__future_rcvr<__future_completions_t<_Sender, _Env>, __id<_Env>>>; 697 698 template <class _Sender, class _Env> 699 struct __future_state : 700 __future_state_base<__future_completions_t<_Sender, _Env>, _Env> 701 { 702 using _Completions = __future_completions_t<_Sender, _Env>; 703 __future_stateexec::__scope::__future_state704 __future_state(_Sender __sndr, _Env __env, const __impl* __scope) : 705 __future_state_base<_Completions, _Env>(static_cast<_Env&&>(__env), 706 __scope), 707 __op_( 708 stdexec::connect(static_cast<_Sender&&>(__sndr), 709 __future_receiver_t<_Sender, _Env>{this, __scope})) 710 {} 711 712 connect_result_t<_Sender, __future_receiver_t<_Sender, _Env>> __op_; 713 }; 714 715 template <class _SenderId, class _EnvId> 716 struct __future 717 { 718 using _Sender = stdexec::__t<_SenderId>; 719 using _Env = stdexec::__t<_EnvId>; 720 721 class __t 722 { 723 template <class _Self> 724 using __completions_t = 725 __future_completions_t<__mfront<_Sender, _Self>, _Env>; 726 727 template <class _Receiver> 728 using __future_op_t = stdexec::__t< 729 __future_op<_SenderId, _EnvId, stdexec::__id<_Receiver>>>; 730 731 public: 732 using __id = __future; 733 using sender_concept = stdexec::sender_t; 734 735 __t(__t&&) = default; 736 auto operator=(__t&&) -> __t& = default; 737 ~__t()738 ~__t() noexcept 739 { 740 if (__state_ != nullptr) 741 { 742 auto __raw_state = __state_.get(); 743 std::unique_lock __guard{__raw_state->__mutex_}; 744 if (__raw_state->__data_.index() != 0) 745 { 746 // completed given sender 747 // state is no longer needed 748 return; 749 } 750 __raw_state->__no_future_ = std::move(__state_); 751 __raw_state->__step_from_to_(__guard, __future_step::__future, 752 __future_step::__no_future); 753 } 754 } 755 756 template <__decays_to<__t> _Self, receiver _Receiver> 757 requires receiver_of<_Receiver, __completions_t<_Self>> connect(_Self && __self,_Receiver __rcvr)758 static auto connect(_Self&& __self, _Receiver __rcvr) 759 -> __future_op_t<_Receiver> 760 { 761 return __future_op_t<_Receiver>{ 762 static_cast<_Receiver&&>(__rcvr), 763 static_cast<_Self&&>(__self).__state_}; 764 } 765 766 template <__decays_to<__t> _Self, class... _OtherEnv> get_completion_signatures(_Self &&,_OtherEnv &&...)767 static auto get_completion_signatures(_Self&&, _OtherEnv&&...) 768 -> __completions_t<_Self> 769 { 770 return {}; 771 } 772 773 private: 774 friend struct async_scope; 775 __t(std::unique_ptr<__future_state<_Sender,_Env>> __state)776 explicit __t( 777 std::unique_ptr<__future_state<_Sender, _Env>> __state) noexcept : 778 __state_(std::move(__state)) 779 { 780 std::unique_lock __guard{__state_->__mutex_}; 781 __state_->__step_from_to_(__guard, __future_step::__created, 782 __future_step::__future); 783 } 784 785 std::unique_ptr<__future_state<_Sender, _Env>> __state_; 786 }; 787 }; 788 789 template <class _Sender, class _Env> 790 using __future_t = stdexec::__t< 791 __future<__id<__nest_sender_t<_Sender>>, __id<__decay_t<_Env>>>>; 792 793 //////////////////////////////////////////////////////////////////////////// 794 // async_scope::spawn implementation 795 struct __spawn_env_ 796 { 797 inplace_stop_token __token_; 798 queryexec::__scope::__spawn_env_799 auto query(get_stop_token_t) const noexcept -> inplace_stop_token 800 { 801 return __token_; 802 } 803 queryexec::__scope::__spawn_env_804 auto query(get_scheduler_t) const noexcept -> __inln::__scheduler 805 { 806 return {}; 807 } 808 }; 809 810 template <class _Env> 811 using __spawn_env_t = __env::__join_t<_Env, __spawn_env_>; 812 813 template <class _EnvId> 814 struct __spawn_op_base 815 { 816 using _Env = stdexec::__t<_EnvId>; 817 __spawn_env_t<_Env> __env_; 818 void (*__delete_)(__spawn_op_base*); 819 }; 820 821 template <class _EnvId> 822 struct __spawn_rcvr 823 { 824 using _Env = stdexec::__t<_EnvId>; 825 826 struct __t 827 { 828 using __id = __spawn_rcvr; 829 using receiver_concept = stdexec::receiver_t; 830 __spawn_op_base<_EnvId>* __op_; 831 set_valueexec::__scope::__spawn_rcvr::__t832 void set_value() noexcept 833 { 834 __op_->__delete_(__op_); 835 } 836 837 // BUGBUG NOT TO SPEC spawn shouldn't accept senders that can fail. set_errorexec::__scope::__spawn_rcvr::__t838 [[noreturn]] void set_error(std::exception_ptr __eptr) noexcept 839 { 840 std::rethrow_exception(std::move(__eptr)); 841 } 842 set_stoppedexec::__scope::__spawn_rcvr::__t843 void set_stopped() noexcept 844 { 845 __op_->__delete_(__op_); 846 } 847 get_envexec::__scope::__spawn_rcvr::__t848 auto get_env() const noexcept -> const __spawn_env_t<_Env>& 849 { 850 return __op_->__env_; 851 } 852 }; 853 }; 854 855 template <class _Env> 856 using __spawn_receiver_t = stdexec::__t<__spawn_rcvr<__id<_Env>>>; 857 858 template <class _SenderId, class _EnvId> 859 struct __spawn_op 860 { 861 using _Env = stdexec::__t<_EnvId>; 862 using _Sender = stdexec::__t<_SenderId>; 863 864 struct __t : __spawn_op_base<_EnvId> 865 { 866 template <__decays_to<_Sender> _Sndr> __texec::__scope::__spawn_op::__t867 __t(_Sndr&& __sndr, _Env __env, const __impl* __scope) : 868 __spawn_op_base<_EnvId>{ 869 __env::__join( 870 static_cast<_Env&&>(__env), 871 __spawn_env_{__scope->__stop_source_.get_token()}), 872 [](__spawn_op_base<_EnvId>* __op) { 873 delete static_cast<__t*>(__op); 874 }}, 875 __op_(stdexec::connect(static_cast<_Sndr&&>(__sndr), 876 __spawn_receiver_t<_Env>{this})) 877 {} 878 startexec::__scope::__spawn_op::__t879 void start() & noexcept 880 { 881 stdexec::start(__op_); 882 } 883 884 connect_result_t<_Sender, __spawn_receiver_t<_Env>> __op_; 885 }; 886 }; 887 888 template <class _Sender, class _Env> 889 using __spawn_operation_t = stdexec::__t<__spawn_op<__id<_Sender>, __id<_Env>>>; 890 891 //////////////////////////////////////////////////////////////////////////// 892 // async_scope 893 struct async_scope : __immovable 894 { 895 async_scope() = default; 896 897 template <sender _Constrained> when_emptyexec::__scope::async_scope898 [[nodiscard]] auto when_empty(_Constrained&& __c) const 899 -> __when_empty_sender_t<_Constrained> 900 { 901 return __when_empty_sender_t<_Constrained>{ 902 &__impl_, static_cast<_Constrained&&>(__c)}; 903 } 904 on_emptyexec::__scope::async_scope905 [[nodiscard]] auto on_empty() const 906 { 907 return when_empty(just()); 908 } 909 910 template <sender _Constrained> 911 using nest_result_t = __nest_sender_t<_Constrained>; 912 913 template <sender _Constrained> nestexec::__scope::async_scope914 [[nodiscard]] auto nest(_Constrained&& __c) -> nest_result_t<_Constrained> 915 { 916 return nest_result_t<_Constrained>{&__impl_, 917 static_cast<_Constrained&&>(__c)}; 918 } 919 920 template <__movable_value _Env = empty_env, 921 sender_in<__spawn_env_t<_Env>> _Sender> 922 requires sender_to<nest_result_t<_Sender>, __spawn_receiver_t<_Env>> spawnexec::__scope::async_scope923 void spawn(_Sender&& __sndr, _Env __env = {}) 924 { 925 using __op_t = __spawn_operation_t<nest_result_t<_Sender>, _Env>; 926 // start is noexcept so we can assume that the operation will complete 927 // after this, which means we can rely on its self-ownership to ensure 928 // that it is eventually deleted 929 stdexec::start(*(new __op_t{nest(static_cast<_Sender&&>(__sndr)), 930 static_cast<_Env&&>(__env), &__impl_})); 931 } 932 933 template <__movable_value _Env = empty_env, 934 sender_in<__env_t<_Env>> _Sender> spawn_futureexec::__scope::async_scope935 auto spawn_future(_Sender&& __sndr, _Env __env = {}) 936 -> __future_t<_Sender, _Env> 937 { 938 using __state_t = __future_state<nest_result_t<_Sender>, _Env>; 939 auto __state = 940 std::make_unique<__state_t>(nest(static_cast<_Sender&&>(__sndr)), 941 static_cast<_Env&&>(__env), &__impl_); 942 stdexec::start(__state->__op_); 943 return __future_t<_Sender, _Env>{std::move(__state)}; 944 } 945 get_stop_sourceexec::__scope::async_scope946 auto get_stop_source() noexcept -> inplace_stop_source& 947 { 948 return __impl_.__stop_source_; 949 } 950 get_stop_tokenexec::__scope::async_scope951 auto get_stop_token() const noexcept -> inplace_stop_token 952 { 953 return __impl_.__stop_source_.get_token(); 954 } 955 request_stopexec::__scope::async_scope956 auto request_stop() noexcept -> bool 957 { 958 return __impl_.__stop_source_.request_stop(); 959 } 960 961 private: 962 __impl __impl_; 963 }; 964 } // namespace __scope 965 966 using __scope::async_scope; 967 968 template <class _AsyncScope, class _Sender> 969 using nest_result_t = decltype(stdexec::__declval<_AsyncScope&>().nest( 970 stdexec::__declval<_Sender&&>())); 971 } // namespace exec 972