1 /*
2  * Copyright (c) 2021-2022 NVIDIA Corporation
3  *
4  * Licensed under the Apache License Version 2.0 with LLVM Exceptions
5  * (the "License"); you may not use this file except in compliance with
6  * the License. You may obtain a copy of the License at
7  *
8  *   https://llvm.org/LICENSE.txt
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include "../stdexec/__detail/__intrusive_queue.hpp"
19 #include "../stdexec/execution.hpp"
20 #include "../stdexec/stop_token.hpp"
21 #include "env.hpp"
22 
23 #include <mutex>
24 
25 namespace exec
26 {
27 /////////////////////////////////////////////////////////////////////////////
28 // async_scope
29 namespace __scope
30 {
31 using namespace stdexec;
32 
33 struct __impl;
34 struct async_scope;
35 
36 struct __task : __immovable
37 {
38     const __impl* __scope_;
39     void (*__notify_waiter)(__task*) noexcept;
40     __task* __next_ = nullptr;
41 };
42 
// Environment type produced by make_env_t from _BaseEnv and a with_t entry
// that pairs the get_stop_token_t query with an in_place_stop_token.
template <class _BaseEnv>
using __env_t =
    make_env_t<_BaseEnv, with_t<get_stop_token_t, in_place_stop_token>>;
46 
47 struct __impl
48 {
49     in_place_stop_source __stop_source_{};
50     mutable std::mutex __lock_{};
51     mutable std::ptrdiff_t __active_ = 0;
52     mutable __intrusive_queue<&__task::__next_> __waiters_{};
53 
~__implexec::__scope::__impl54     ~__impl()
55     {
56         std::unique_lock __guard{__lock_};
57         STDEXEC_ASSERT(__active_ == 0);
58         STDEXEC_ASSERT(__waiters_.empty());
59     }
60 };
61 
62 ////////////////////////////////////////////////////////////////////////////
63 // async_scope::when_empty implementation
64 template <class _ConstrainedId, class _ReceiverId>
65 struct __when_empty_op
66 {
67     using _Constrained = __cvref_t<_ConstrainedId>;
68     using _Receiver = stdexec::__t<_ReceiverId>;
69 
70     struct __t : __task
71     {
72         using __id = __when_empty_op;
73 
__texec::__scope::__when_empty_op::__t74         explicit __t(const __impl* __scope, _Constrained&& __sndr,
75                      _Receiver __rcvr) :
76             __task{{}, __scope, __notify_waiter},
77             __op_(stdexec::connect(static_cast<_Constrained&&>(__sndr),
78                                    static_cast<_Receiver&&>(__rcvr)))
79         {}
80 
81       private:
__notify_waiterexec::__scope::__when_empty_op::__t82         static void __notify_waiter(__task* __self) noexcept
83         {
84             start(static_cast<__t*>(__self)->__op_);
85         }
86 
__start_exec::__scope::__when_empty_op::__t87         void __start_() noexcept
88         {
89             std::unique_lock __guard{this->__scope_->__lock_};
90             auto& __active = this->__scope_->__active_;
91             auto& __waiters = this->__scope_->__waiters_;
92             if (__active != 0)
93             {
94                 __waiters.push_back(this);
95                 return;
96             }
97             __guard.unlock();
98             start(this->__op_);
99         }
100 
tag_invokeexec::__scope::__when_empty_op101         friend void tag_invoke(start_t, __t& __self) noexcept
102         {
103             return __self.__start_();
104         }
105 
106         STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
107         connect_result_t<_Constrained, _Receiver> __op_;
108     };
109 };
110 
111 template <class _ConstrainedId>
112 struct __when_empty_sender
113 {
114     using _Constrained = stdexec::__t<_ConstrainedId>;
115 
116     struct __t
117     {
118         using __id = __when_empty_sender;
119         using sender_concept = stdexec::sender_t;
120 
121         template <class _Self, class _Receiver>
122         using __when_empty_op_t =
123             stdexec::__t<__when_empty_op<__cvref_id<_Self, _Constrained>,
124                                          stdexec::__id<_Receiver>>>;
125 
126         template <__decays_to<__t> _Self, receiver _Receiver>
127             requires sender_to<__copy_cvref_t<_Self, _Constrained>, _Receiver>
tag_invokeexec::__scope::__when_empty_sender128         [[nodiscard]] friend auto tag_invoke(connect_t, _Self&& __self,
129                                              _Receiver __rcvr)
130             -> __when_empty_op_t<_Self, _Receiver>
131         {
132             return __when_empty_op_t<_Self, _Receiver>{
133                 __self.__scope_, static_cast<_Self&&>(__self).__c_,
134                 static_cast<_Receiver&&>(__rcvr)};
135         }
136 
137         template <__decays_to<__t> _Self, class _Env>
tag_invokeexec::__scope::__when_empty_sender138         friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
139             -> completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
140                                           __env_t<_Env>>
141         {
142             return {};
143         }
144 
tag_invokeexec::__scope::__when_empty_sender145         friend auto tag_invoke(get_env_t, const __t&) noexcept -> empty_env
146         {
147             return {};
148         }
149 
150         const __impl* __scope_;
151         STDEXEC_ATTRIBUTE((no_unique_address))
152         _Constrained __c_;
153     };
154 };
155 
156 template <class _Constrained>
157 using __when_empty_sender_t =
158     stdexec::__t<__when_empty_sender<__id<__decay_t<_Constrained>>>>;
159 
160 ////////////////////////////////////////////////////////////////////////////
161 // async_scope::nest implementation
// Receiver-independent part of a nest operation state. __nest_rcvr holds a
// pointer to this base to reach the scope and the downstream receiver.
// It is brace-initialised as an aggregate by __nest_op::__t, so the member
// order (scope, then receiver) must be preserved.
template <class _ReceiverId>
struct __nest_op_base : __immovable
{
    using _Receiver = stdexec::__t<_ReceiverId>;
    // Scope whose active count tracks this operation.
    const __impl* __scope_;
    // Downstream receiver that the nested sender's completion is forwarded to.
    STDEXEC_ATTRIBUTE((no_unique_address))
    _Receiver __rcvr_;
};
170 
171 template <class _ReceiverId>
172 struct __nest_rcvr
173 {
174     using _Receiver = stdexec::__t<_ReceiverId>;
175 
176     struct __t
177     {
178         using __id = __nest_rcvr;
179         using receiver_concept = stdexec::receiver_t;
180         __nest_op_base<_ReceiverId>* __op_;
181 
__completeexec::__scope::__nest_rcvr::__t182         static void __complete(const __impl* __scope) noexcept
183         {
184             std::unique_lock __guard{__scope->__lock_};
185             auto& __active = __scope->__active_;
186             if (--__active == 0)
187             {
188                 auto __local = std::move(__scope->__waiters_);
189                 __guard.unlock();
190                 __scope = nullptr;
191                 // do not access __scope
192                 while (!__local.empty())
193                 {
194                     auto* __next = __local.pop_front();
195                     __next->__notify_waiter(__next);
196                     // __scope must be considered deleted
197                 }
198             }
199         }
200 
201         template <__completion_tag _Tag, class... _As>
202             requires __callable<_Tag, _Receiver, _As...>
tag_invokeexec::__scope::__nest_rcvr203         friend void tag_invoke(_Tag, __t&& __self, _As&&... __as) noexcept
204         {
205             auto __scope = __self.__op_->__scope_;
206             _Tag{}(std::move(__self.__op_->__rcvr_),
207                    static_cast<_As&&>(__as)...);
208             // do not access __op_
209             // do not access this
210             __complete(__scope);
211         }
212 
tag_invokeexec::__scope::__nest_rcvr213         friend auto tag_invoke(get_env_t, const __t& __self) noexcept
214             -> __env_t<env_of_t<_Receiver>>
215         {
216             return make_env(
217                 get_env(__self.__op_->__rcvr_),
218                 with(get_stop_token,
219                      __self.__op_->__scope_->__stop_source_.get_token()));
220         }
221     };
222 };
223 
224 template <class _ConstrainedId, class _ReceiverId>
225 struct __nest_op
226 {
227     using _Constrained = stdexec::__t<_ConstrainedId>;
228     using _Receiver = stdexec::__t<_ReceiverId>;
229 
230     struct __t : __nest_op_base<_ReceiverId>
231     {
232         using __id = __nest_op;
233         using __nest_rcvr_t = stdexec::__t<__nest_rcvr<_ReceiverId>>;
234         STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
235         connect_result_t<_Constrained, __nest_rcvr_t> __op_;
236 
237         template <__decays_to<_Constrained> _Sender,
238                   __decays_to<_Receiver> _Rcvr>
__texec::__scope::__nest_op::__t239         explicit __t(const __impl* __scope, _Sender&& __c, _Rcvr&& __rcvr) :
240             __nest_op_base<_ReceiverId>{
241                 {}, __scope, static_cast<_Rcvr&&>(__rcvr)},
242             __op_(stdexec::connect(static_cast<_Sender&&>(__c),
243                                    __nest_rcvr_t{this}))
244         {}
245 
246       private:
__start_exec::__scope::__nest_op::__t247         void __start_() noexcept
248         {
249             STDEXEC_ASSERT(this->__scope_);
250             std::unique_lock __guard{this->__scope_->__lock_};
251             auto& __active = this->__scope_->__active_;
252             ++__active;
253             __guard.unlock();
254             start(__op_);
255         }
256 
tag_invokeexec::__scope::__nest_op257         friend void tag_invoke(start_t, __t& __self) noexcept
258         {
259             return __self.__start_();
260         }
261     };
262 };
263 
264 template <class _ConstrainedId>
265 struct __nest_sender
266 {
267     using _Constrained = stdexec::__t<_ConstrainedId>;
268 
269     struct __t
270     {
271         using __id = __nest_sender;
272         using sender_concept = stdexec::sender_t;
273 
274         const __impl* __scope_;
275         STDEXEC_ATTRIBUTE((no_unique_address))
276         _Constrained __c_;
277 
278         template <class _Receiver>
279         using __nest_operation_t =
280             stdexec::__t<__nest_op<_ConstrainedId, stdexec::__id<_Receiver>>>;
281         template <class _Receiver>
282         using __nest_receiver_t =
283             stdexec::__t<__nest_rcvr<stdexec::__id<_Receiver>>>;
284 
285         template <__decays_to<__t> _Self, receiver _Receiver>
286             requires sender_to<__copy_cvref_t<_Self, _Constrained>,
287                                __nest_receiver_t<_Receiver>>
tag_invokeexec::__scope::__nest_sender288         [[nodiscard]] friend auto tag_invoke(connect_t, _Self&& __self,
289                                              _Receiver __rcvr)
290             -> __nest_operation_t<_Receiver>
291         {
292             return __nest_operation_t<_Receiver>{
293                 __self.__scope_, static_cast<_Self&&>(__self).__c_,
294                 static_cast<_Receiver&&>(__rcvr)};
295         }
296 
297         template <__decays_to<__t> _Self, class _Env>
tag_invokeexec::__scope::__nest_sender298         friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
299             -> completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
300                                           __env_t<_Env>>
301         {
302             return {};
303         }
304 
tag_invokeexec::__scope::__nest_sender305         friend auto tag_invoke(get_env_t, const __t&) noexcept -> empty_env
306         {
307             return {};
308         }
309     };
310 };
311 
312 template <class _Constrained>
313 using __nest_sender_t =
314     stdexec::__t<__nest_sender<__id<__decay_t<_Constrained>>>>;
315 
316 ////////////////////////////////////////////////////////////////////////////
317 // async_scope::spawn_future implementation
// Life-cycle marker stored in __future_state_base::__step_.
enum class __future_step
{
    // Value-initialised marker; not assigned anywhere in this file.
    __invalid = 0,
    // Initial value of a freshly constructed state.
    __created = 1,
    // Set when a __future takes ownership of the state.
    __future = 2,
    // Set when the owner goes away before a result is stored; the state
    // then owns itself through __no_future_.
    __no_future = 3,
    // Set when the state is being torn down.
    __deleted = 4
};
326 
327 template <class _Sender, class _Env>
328 struct __future_state;
329 
330 struct __forward_stopped
331 {
332     in_place_stop_source* __stop_source_;
333 
operator ()exec::__scope::__forward_stopped334     void operator()() noexcept
335     {
336         __stop_source_->request_stop();
337     }
338 };
339 
340 struct __subscription : __immovable
341 {
342     void (*__complete_)(__subscription*) noexcept = nullptr;
343 
__completeexec::__scope::__subscription344     void __complete() noexcept
345     {
346         __complete_(this);
347     }
348 
349     __subscription* __next_ = nullptr;
350 };
351 
// Operation state produced by connecting a __future to a receiver. It
// subscribes to the shared __future_state and, once a result is available,
// replays the stored completion (or set_stopped) on the receiver.
template <class _SenderId, class _EnvId, class _ReceiverId>
struct __future_op
{
    using _Sender = stdexec::__t<_SenderId>;
    using _Env = stdexec::__t<_EnvId>;
    using _Receiver = stdexec::__t<_ReceiverId>;

    class __t : __subscription
    {
        // Stop callback type for the receiver's stop token; it forwards a
        // stop request to the state's stop source via __forward_stopped.
        using __forward_consumer = typename stop_token_of_t<
            env_of_t<_Receiver>>::template callback_type<__forward_stopped>;

        friend void tag_invoke(start_t, __t& __self) noexcept
        {
            __self.__start_();
        }

        // Delivers the result to the receiver. Called either directly from
        // __start_ (result already stored) or through the __subscription
        // callback when the spawned work completes.
        void __complete_() noexcept
        {
            try
            {
                // Take ownership of the state into a local; presumably so it
                // stays alive even if completing the receiver destroys this
                // operation state.
                auto __state = std::move(__state_);
                STDEXEC_ASSERT(__state != nullptr);
                std::unique_lock __guard{__state->__mutex_};
                // either the future is still in use or it has passed ownership
                // to __state->__no_future_
                if (__state->__no_future_.get() != nullptr ||
                    __state->__step_ != __future_step::__future)
                {
                    // invalid state - there is a code bug in the state machine
                    std::terminate();
                }
                else if (get_stop_token(get_env(__rcvr_)).stop_requested())
                {
                    // The receiver asked for stop: report stopped instead of
                    // the stored result. The receiver is completed with the
                    // lock released.
                    __guard.unlock();
                    set_stopped(static_cast<_Receiver&&>(__rcvr_));
                    __guard.lock();
                }
                else
                {
                    // Replay the stored (tag, args...) tuple on the receiver.
                    std::visit(
                        [this, &__guard]<class _Tup>(_Tup& __tup) {
                        if constexpr (same_as<_Tup, std::monostate>)
                        {
                            // No result stored: must not happen here.
                            std::terminate();
                        }
                        else
                        {
                            std::apply(
                                [this, &__guard]<class... _As>(auto tag,
                                                               _As&... __as) {
                                // Complete the receiver outside the lock; the
                                // arguments are moved out of the stored tuple.
                                __guard.unlock();
                                tag(static_cast<_Receiver&&>(__rcvr_),
                                    static_cast<_As&&>(__as)...);
                                __guard.lock();
                            },
                                __tup);
                        }
                    },
                        __state->__data_);
                }
            }
            catch (...)
            {
                set_error(static_cast<_Receiver&&>(__rcvr_),
                          std::current_exception());
            }
        }

        // Completes immediately when a result is already stored (variant
        // index != 0); otherwise queues this operation as a subscriber.
        // Does nothing when there is no state.
        void __start_() noexcept
        {
            try
            {
                if (!!__state_)
                {
                    std::unique_lock __guard{__state_->__mutex_};
                    if (__state_->__data_.index() != 0)
                    {
                        __guard.unlock();
                        __complete_();
                    }
                    else
                    {
                        __state_->__subscribers_.push_back(this);
                    }
                }
            }
            catch (...)
            {
                set_error(static_cast<_Receiver&&>(__rcvr_),
                          std::current_exception());
            }
        }

        // Declaration order matters: the constructor initialises
        // __forward_consumer_ from both __rcvr_ and __state_.
        STDEXEC_ATTRIBUTE((no_unique_address))
        _Receiver __rcvr_;
        std::unique_ptr<__future_state<_Sender, _Env>> __state_;
        STDEXEC_ATTRIBUTE((no_unique_address))
        __forward_consumer __forward_consumer_;

      public:
        using __id = __future_op;

        // If this operation still owns the state and no result has been
        // stored yet, ownership is handed to the state itself (__no_future_)
        // so the state can delete itself from __dispatch_result_.
        ~__t() noexcept
        {
            if (__state_ != nullptr)
            {
                auto __raw_state = __state_.get();
                std::unique_lock __guard{__raw_state->__mutex_};
                if (__raw_state->__data_.index() > 0)
                {
                    // completed given sender
                    // state is no longer needed
                    return;
                }
                __raw_state->__no_future_ = std::move(__state_);
                __raw_state->__step_from_to_(__guard, __future_step::__future,
                                             __future_step::__no_future);
            }
        }

        // Stores the receiver and the state, installs the type-erased
        // completion callback in the __subscription base, and registers a
        // stop callback on the receiver's stop token.
        // NOTE(review): __state is dereferenced unconditionally here, so it
        // is assumed to be non-null - confirm against callers.
        template <class _Receiver2>
        explicit __t(_Receiver2&& __rcvr,
                     std::unique_ptr<__future_state<_Sender, _Env>> __state) :
            __subscription{{},
                           [](__subscription* __self) noexcept -> void {
            static_cast<__t*>(__self)->__complete_();
        }},
            __rcvr_(static_cast<_Receiver2&&>(__rcvr)),
            __state_(std::move(__state)),
            __forward_consumer_(get_stop_token(get_env(__rcvr_)),
                                __forward_stopped{&__state_->__stop_source_})
        {}
    };
};
487 
// __completion_as_tuple_t<Tag(Ts&&...)> is std::tuple<Tag, Ts...>: the
// storage type for one completion signature. Two equivalent implementations
// are provided, selected by STDEXEC_NVHPC().
#if STDEXEC_NVHPC()
// Variant used when STDEXEC_NVHPC() is true: partial specialisation on the
// function type.
template <class _Fn>
struct __completion_as_tuple2_;

template <class _Tag, class... _Ts>
struct __completion_as_tuple2_<_Tag(_Ts&&...)>
{
    using __t = std::tuple<_Tag, _Ts...>;
};
template <class _Fn>
using __completion_as_tuple_t = stdexec::__t<__completion_as_tuple2_<_Fn>>;

#else

// Default variant: deduce the pieces from a function-pointer argument. The
// function is declared only and used solely inside decltype.
template <class _Tag, class... _Ts>
auto __completion_as_tuple_(_Tag (*)(_Ts&&...)) -> std::tuple<_Tag, _Ts...>;
template <class _Fn>
using __completion_as_tuple_t =
    decltype(__scope::__completion_as_tuple_(static_cast<_Fn*>(nullptr)));
#endif
508 
// Value completion whose argument types are decayed and taken by rvalue
// reference.
template <class... _Ts>
using __decay_values_t =
    completion_signatures<set_value_t(__decay_t<_Ts>&&...)>;

// Error completion whose argument type is decayed and taken by rvalue
// reference.
template <class _Ty>
using __decay_error_t = completion_signatures<set_error_t(__decay_t<_Ty>&&)>;

// Completion signatures of a future: those of _Sender in __env_t<_Env>,
// with values and errors transformed by the two aliases above, plus
// set_stopped_t() and set_error_t(std::exception_ptr&&).
template <class _Sender, class _Env>
using __future_completions_t = //
    make_completion_signatures<
        _Sender, __env_t<_Env>,
        completion_signatures<set_stopped_t(),
                              set_error_t(std::exception_ptr&&)>,
        __decay_values_t, __decay_error_t>;

// std::variant<std::monostate, tuple-per-completion...> used to store a
// result; index 0 (std::monostate) is the first alternative and is treated
// elsewhere in this file as "no result yet".
template <class _Completions>
using __completions_as_variant = //
    __mapply<__transform<__q<__completion_as_tuple_t>,
                         __mbind_front_q<std::variant, std::monostate>>,
             _Completions>;
529 
530 template <class _Ty>
531 struct __dynamic_delete
532 {
__anonf62d8bc60202exec::__scope::__dynamic_delete533     __dynamic_delete() : __delete_([](_Ty* __p) { delete __p; }) {}
534 
535     template <class _Uy>
536         requires convertible_to<_Uy*, _Ty*>
__dynamic_deleteexec::__scope::__dynamic_delete537     __dynamic_delete(std::default_delete<_Uy>) :
538         __delete_([](_Ty* __p) { delete static_cast<_Uy*>(__p); })
539     {}
540 
541     template <class _Uy>
542         requires convertible_to<_Uy*, _Ty*>
operator =exec::__scope::__dynamic_delete543     auto operator=(std::default_delete<_Uy> __d) -> __dynamic_delete&
544     {
545         __delete_ = __dynamic_delete{__d}.__delete_;
546         return *this;
547     }
548 
operator ()exec::__scope::__dynamic_delete549     void operator()(_Ty* __p)
550     {
551         __delete_(__p);
552     }
553 
554     void (*__delete_)(_Ty*);
555 };
556 
557 template <class _Completions, class _Env>
558 struct __future_state_base
559 {
__future_state_baseexec::__scope::__future_state_base560     __future_state_base(_Env __env, const __impl* __scope) :
561         __forward_scope_{std::in_place, __scope->__stop_source_.get_token(),
562                          __forward_stopped{&__stop_source_}},
563         __env_(
564             make_env(static_cast<_Env&&>(__env),
565                      with(get_stop_token, __scope->__stop_source_.get_token())))
566     {}
567 
~__future_state_baseexec::__scope::__future_state_base568     ~__future_state_base()
569     {
570         std::unique_lock __guard{__mutex_};
571         if (__step_ == __future_step::__created)
572         {
573             // exception during connect() will end up here
574             __step_from_to_(__guard, __future_step::__created,
575                             __future_step::__deleted);
576         }
577         else if (__step_ != __future_step::__deleted)
578         {
579             // completing the given sender before the future is dropped will end
580             // here
581             __step_from_to_(__guard, __future_step::__future,
582                             __future_step::__deleted);
583         }
584     }
585 
__step_from_to_exec::__scope::__future_state_base586     void __step_from_to_(std::unique_lock<std::mutex>& __guard,
587                          __future_step __from, __future_step __to)
588     {
589         STDEXEC_ASSERT(__guard.owns_lock());
590         auto actual = std::exchange(__step_, __to);
591         STDEXEC_ASSERT(actual == __from);
592     }
593 
594     in_place_stop_source __stop_source_;
595     std::optional<in_place_stop_callback<__forward_stopped>> __forward_scope_;
596     std::mutex __mutex_;
597     __future_step __step_ = __future_step::__created;
598     std::unique_ptr<__future_state_base, __dynamic_delete<__future_state_base>>
599         __no_future_;
600     __completions_as_variant<_Completions> __data_;
601     __intrusive_queue<&__subscription::__next_> __subscribers_;
602     __env_t<_Env> __env_;
603 };
604 
605 template <class _Completions, class _EnvId>
606 struct __future_rcvr
607 {
608     using _Env = stdexec::__t<_EnvId>;
609 
610     struct __t
611     {
612         using __id = __future_rcvr;
613         using receiver_concept = stdexec::receiver_t;
614         __future_state_base<_Completions, _Env>* __state_;
615         const __impl* __scope_;
616 
__dispatch_result_exec::__scope::__future_rcvr::__t617         void __dispatch_result_() noexcept
618         {
619             auto& __state = *__state_;
620             std::unique_lock __guard{__state.__mutex_};
621             auto __local = std::move(__state.__subscribers_);
622             __state.__forward_scope_ = std::nullopt;
623             if (__state.__no_future_.get() != nullptr)
624             {
625                 // nobody is waiting for the results
626                 // delete this and return
627                 __state.__step_from_to_(__guard, __future_step::__no_future,
628                                         __future_step::__deleted);
629                 __guard.unlock();
630                 __state.__no_future_.reset();
631                 return;
632             }
633             __guard.unlock();
634             while (!__local.empty())
635             {
636                 auto* __sub = __local.pop_front();
637                 __sub->__complete();
638             }
639         }
640 
641         template <__completion_tag _Tag, __movable_value... _As>
tag_invokeexec::__scope::__future_rcvr642         friend void tag_invoke(_Tag, __t&& __self, _As&&... __as) noexcept
643         {
644             auto& __state = *__self.__state_;
645             try
646             {
647                 std::unique_lock __guard{__state.__mutex_};
648                 using _Tuple = __decayed_tuple<_Tag, _As...>;
649                 __state.__data_.template emplace<_Tuple>(
650                     _Tag{}, static_cast<_As&&>(__as)...);
651                 __guard.unlock();
652                 __self.__dispatch_result_();
653             }
654             catch (...)
655             {
656                 using _Tuple = std::tuple<set_error_t, std::exception_ptr>;
657                 __state.__data_.template emplace<_Tuple>(
658                     set_error_t{}, std::current_exception());
659             }
660         }
661 
tag_invokeexec::__scope::__future_rcvr662         friend auto tag_invoke(get_env_t, const __t& __self) noexcept
663             -> const __env_t<_Env>&
664         {
665             return __self.__state_->__env_;
666         }
667     };
668 };
669 
670 template <class _Sender, class _Env>
671 using __future_receiver_t =
672     __t<__future_rcvr<__future_completions_t<_Sender, _Env>, __id<_Env>>>;
673 
674 template <class _Sender, class _Env>
675 struct __future_state :
676     __future_state_base<__future_completions_t<_Sender, _Env>, _Env>
677 {
678     using _Completions = __future_completions_t<_Sender, _Env>;
679 
__future_stateexec::__scope::__future_state680     __future_state(_Sender __sndr, _Env __env, const __impl* __scope) :
681         __future_state_base<_Completions, _Env>(static_cast<_Env&&>(__env),
682                                                 __scope),
683         __op_(
684             stdexec::connect(static_cast<_Sender&&>(__sndr),
685                              __future_receiver_t<_Sender, _Env>{this, __scope}))
686     {}
687 
688     connect_result_t<_Sender, __future_receiver_t<_Sender, _Env>> __op_;
689 };
690 
691 template <class _SenderId, class _EnvId>
692 struct __future
693 {
694     using _Sender = stdexec::__t<_SenderId>;
695     using _Env = stdexec::__t<_EnvId>;
696 
697     struct __t
698     {
699         using __id = __future;
700         using sender_concept = stdexec::sender_t;
701 
702         __t(__t&&) = default;
703         auto operator=(__t&&) -> __t& = default;
704 
~__texec::__scope::__future::__t705         ~__t() noexcept
706         {
707             if (__state_ != nullptr)
708             {
709                 auto __raw_state = __state_.get();
710                 std::unique_lock __guard{__raw_state->__mutex_};
711                 if (__raw_state->__data_.index() != 0)
712                 {
713                     // completed given sender
714                     // state is no longer needed
715                     return;
716                 }
717                 __raw_state->__no_future_ = std::move(__state_);
718                 __raw_state->__step_from_to_(__guard, __future_step::__future,
719                                              __future_step::__no_future);
720             }
721         }
722 
723       private:
724         friend struct async_scope;
725         template <class _Self>
726         using __completions_t =
727             __future_completions_t<__mfront<_Sender, _Self>, _Env>;
728 
729         template <class _Receiver>
730         using __future_op_t = stdexec::__t<
731             __future_op<_SenderId, _EnvId, stdexec::__id<_Receiver>>>;
732 
__texec::__scope::__future::__t733         explicit __t(
734             std::unique_ptr<__future_state<_Sender, _Env>> __state) noexcept :
735             __state_(std::move(__state))
736         {
737             std::unique_lock __guard{__state_->__mutex_};
738             __state_->__step_from_to_(__guard, __future_step::__created,
739                                       __future_step::__future);
740         }
741 
742         template <__decays_to<__t> _Self, receiver _Receiver>
743             requires receiver_of<_Receiver, __completions_t<_Self>>
tag_invokeexec::__scope::__future744         friend auto tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
745             -> __future_op_t<_Receiver>
746         {
747             return __future_op_t<_Receiver>{static_cast<_Receiver&&>(__rcvr),
748                                             std::move(__self.__state_)};
749         }
750 
751         template <__decays_to<__t> _Self, class _OtherEnv>
tag_invokeexec::__scope::__future752         friend auto tag_invoke(get_completion_signatures_t, _Self&&,
753                                _OtherEnv&&) -> __completions_t<_Self>
754         {
755             return {};
756         }
757 
tag_invokeexec::__scope::__future758         friend auto tag_invoke(get_env_t, const __t&) noexcept -> empty_env
759         {
760             return {};
761         }
762 
763         std::unique_ptr<__future_state<_Sender, _Env>> __state_;
764     };
765 };
766 
767 template <class _Sender, class _Env>
768 using __future_t = stdexec::__t<
769     __future<__id<__nest_sender_t<_Sender>>, __id<__decay_t<_Env>>>>;
770 
771 ////////////////////////////////////////////////////////////////////////////
772 // async_scope::spawn implementation
// Environment seen by senders passed to async_scope::spawn: _Env joined with
// an in_place_stop_token for get_stop_token_t and an __inln::__scheduler for
// get_scheduler_t.
template <class _Env>
using __spawn_env_t =
    __env::__join_t<_Env, __env::__with<in_place_stop_token, get_stop_token_t>,
                    __env::__with<__inln::__scheduler, get_scheduler_t>>;

// Type-erased base of a spawn operation. It is brace-initialised as an
// aggregate by __spawn_op::__t, so the member order must be preserved.
template <class _EnvId>
struct __spawn_op_base
{
    using _Env = stdexec::__t<_EnvId>;
    // Environment returned by the spawn receiver's get_env.
    __spawn_env_t<_Env> __env_;
    // Destroys the complete operation; called by the spawn receiver when the
    // spawned work completes.
    void (*__delete_)(__spawn_op_base*);
};
785 
786 template <class _EnvId>
787 struct __spawn_rcvr
788 {
789     using _Env = stdexec::__t<_EnvId>;
790 
791     struct __t
792     {
793         using __id = __spawn_rcvr;
794         using receiver_concept = stdexec::receiver_t;
795         __spawn_op_base<_EnvId>* __op_;
796 
797         template <__one_of<set_value_t, set_stopped_t> _Tag>
tag_invokeexec::__scope::__spawn_rcvr798         friend void tag_invoke(_Tag, __t&& __self) noexcept
799         {
800             __self.__op_->__delete_(__self.__op_);
801         }
802 
803         // BUGBUG NOT TO SPEC spawn shouldn't accept senders that can fail.
804         template <same_as<set_error_t> _Tag>
tag_invokeexec::__scope::__spawn_rcvr805         [[noreturn]] friend void tag_invoke(_Tag, __t&&,
806                                             std::exception_ptr __eptr) noexcept
807         {
808             std::rethrow_exception(std::move(__eptr));
809         }
810 
tag_invokeexec::__scope::__spawn_rcvr811         friend auto tag_invoke(get_env_t, const __t& __self) noexcept
812             -> const __spawn_env_t<_Env>&
813         {
814             return __self.__op_->__env_;
815         }
816     };
817 };
818 
819 template <class _Env>
820 using __spawn_receiver_t = stdexec::__t<__spawn_rcvr<__id<_Env>>>;
821 
822 template <class _SenderId, class _EnvId>
823 struct __spawn_op
824 {
825     using _Env = stdexec::__t<_EnvId>;
826     using _Sender = stdexec::__t<_SenderId>;
827 
828     struct __t : __spawn_op_base<_EnvId>
829     {
830         template <__decays_to<_Sender> _Sndr>
__texec::__scope::__spawn_op::__t831         __t(_Sndr&& __sndr, _Env __env, const __impl* __scope) :
832             __spawn_op_base<_EnvId>{
833                 __env::__join(
834                     static_cast<_Env&&>(__env),
835                     __env::__with(__scope->__stop_source_.get_token(),
836                                   get_stop_token),
837                     __env::__with(__inln::__scheduler{}, get_scheduler)),
838                 [](__spawn_op_base<_EnvId>* __op) {
839             delete static_cast<__t*>(__op);
840         }},
841             __op_(stdexec::connect(static_cast<_Sndr&&>(__sndr),
842                                    __spawn_receiver_t<_Env>{this}))
843         {}
844 
__start_exec::__scope::__spawn_op::__t845         void __start_() noexcept
846         {
847             start(__op_);
848         }
849 
tag_invokeexec::__scope::__spawn_op850         friend void tag_invoke(start_t, __t& __self) noexcept
851         {
852             return __self.__start_();
853         }
854 
855         connect_result_t<_Sender, __spawn_receiver_t<_Env>> __op_;
856     };
857 };
858 
859 template <class _Sender, class _Env>
860 using __spawn_operation_t = stdexec::__t<__spawn_op<__id<_Sender>, __id<_Env>>>;
861 
862 ////////////////////////////////////////////////////////////////////////////
863 // async_scope
864 struct async_scope : __immovable
865 {
866     async_scope() = default;
867 
868     template <sender _Constrained>
when_emptyexec::__scope::async_scope869     [[nodiscard]] auto when_empty(_Constrained&& __c) const
870         -> __when_empty_sender_t<_Constrained>
871     {
872         return __when_empty_sender_t<_Constrained>{
873             &__impl_, static_cast<_Constrained&&>(__c)};
874     }
875 
on_emptyexec::__scope::async_scope876     [[nodiscard]] auto on_empty() const
877     {
878         return when_empty(just());
879     }
880 
881     template <sender _Constrained>
882     using nest_result_t = __nest_sender_t<_Constrained>;
883 
884     template <sender _Constrained>
nestexec::__scope::async_scope885     [[nodiscard]] auto nest(_Constrained&& __c) -> nest_result_t<_Constrained>
886     {
887         return nest_result_t<_Constrained>{&__impl_,
888                                            static_cast<_Constrained&&>(__c)};
889     }
890 
891     template <__movable_value _Env = empty_env,
892               sender_in<__spawn_env_t<_Env>> _Sender>
893         requires sender_to<nest_result_t<_Sender>, __spawn_receiver_t<_Env>>
spawnexec::__scope::async_scope894     void spawn(_Sender&& __sndr, _Env __env = {})
895     {
896         using __op_t = __spawn_operation_t<nest_result_t<_Sender>, _Env>;
897         // start is noexcept so we can assume that the operation will complete
898         // after this, which means we can rely on its self-ownership to ensure
899         // that it is eventually deleted
900         stdexec::start(*new __op_t{nest(static_cast<_Sender&&>(__sndr)),
901                                    static_cast<_Env&&>(__env), &__impl_});
902     }
903 
904     template <__movable_value _Env = empty_env,
905               sender_in<__env_t<_Env>> _Sender>
spawn_futureexec::__scope::async_scope906     auto spawn_future(_Sender&& __sndr, _Env __env = {})
907         -> __future_t<_Sender, _Env>
908     {
909         using __state_t = __future_state<nest_result_t<_Sender>, _Env>;
910         auto __state =
911             std::make_unique<__state_t>(nest(static_cast<_Sender&&>(__sndr)),
912                                         static_cast<_Env&&>(__env), &__impl_);
913         stdexec::start(__state->__op_);
914         return __future_t<_Sender, _Env>{std::move(__state)};
915     }
916 
get_stop_sourceexec::__scope::async_scope917     auto get_stop_source() noexcept -> in_place_stop_source&
918     {
919         return __impl_.__stop_source_;
920     }
921 
get_stop_tokenexec::__scope::async_scope922     auto get_stop_token() const noexcept -> in_place_stop_token
923     {
924         return __impl_.__stop_source_.get_token();
925     }
926 
request_stopexec::__scope::async_scope927     auto request_stop() noexcept -> bool
928     {
929         return __impl_.__stop_source_.request_stop();
930     }
931 
932   private:
933     __impl __impl_;
934 };
935 } // namespace __scope
936 
937 using __scope::async_scope;
938 } // namespace exec
939