1 /*
2  * Copyright (c) 2021-2024 NVIDIA Corporation
3  *
4  * Licensed under the Apache License Version 2.0 with LLVM Exceptions
5  * (the "License"); you may not use this file except in compliance with
6  * the License. You may obtain a copy of the License at
7  *
8  *   https://llvm.org/LICENSE.txt
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include "../stdexec/__detail/__intrusive_queue.hpp"
19 #include "../stdexec/execution.hpp"
20 #include "../stdexec/stop_token.hpp"
21 #include "env.hpp"
22 
23 #include <mutex>
24 
25 namespace exec
26 {
27 /////////////////////////////////////////////////////////////////////////////
28 // async_scope
29 namespace __scope
30 {
31 using namespace stdexec;
32 
33 struct __impl;
34 struct async_scope;
35 
36 template <class _A>
37 concept __async_scope = requires(_A& __a) {
38                             {
39                                 __a.nest(stdexec::just())
40                             } -> sender_of<stdexec::set_value_t()>;
41                         };
42 
43 struct __task : __immovable
44 {
45     const __impl* __scope_;
46     void (*__notify_waiter)(__task*) noexcept;
47     __task* __next_ = nullptr;
48 };
49 
50 template <class _BaseEnv>
51 using __env_t =
52     make_env_t<_BaseEnv, prop<get_stop_token_t, inplace_stop_token>>;
53 
54 struct __impl
55 {
56     inplace_stop_source __stop_source_{};
57     mutable std::mutex __lock_{};
58     mutable std::ptrdiff_t __active_ = 0;
59     mutable __intrusive_queue<&__task::__next_> __waiters_{};
60 
~__implexec::__scope::__impl61     ~__impl()
62     {
63         std::unique_lock __guard{__lock_};
64         STDEXEC_ASSERT(__active_ == 0);
65         STDEXEC_ASSERT(__waiters_.empty());
66     }
67 };
68 
69 ////////////////////////////////////////////////////////////////////////////
70 // async_scope::when_empty implementation
71 template <class _ConstrainedId, class _ReceiverId>
72 struct __when_empty_op
73 {
74     using _Constrained = __cvref_t<_ConstrainedId>;
75     using _Receiver = stdexec::__t<_ReceiverId>;
76 
77     struct __t : __task
78     {
79         using __id = __when_empty_op;
80 
__texec::__scope::__when_empty_op::__t81         explicit __t(const __impl* __scope, _Constrained&& __sndr,
82                      _Receiver __rcvr) :
83             __task{{}, __scope, __notify_waiter},
84             __op_(stdexec::connect(static_cast<_Constrained&&>(__sndr),
85                                    static_cast<_Receiver&&>(__rcvr)))
86         {}
87 
startexec::__scope::__when_empty_op::__t88         void start() & noexcept
89         {
90             std::unique_lock __guard{this->__scope_->__lock_};
91             auto& __active = this->__scope_->__active_;
92             auto& __waiters = this->__scope_->__waiters_;
93             if (__active != 0)
94             {
95                 __waiters.push_back(this);
96                 return;
97             }
98             __guard.unlock();
99             stdexec::start(this->__op_);
100         }
101 
102       private:
__notify_waiterexec::__scope::__when_empty_op::__t103         static void __notify_waiter(__task* __self) noexcept
104         {
105             stdexec::start(static_cast<__t*>(__self)->__op_);
106         }
107 
108         STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
109         connect_result_t<_Constrained, _Receiver> __op_;
110     };
111 };
112 
113 template <class _ConstrainedId>
114 struct __when_empty_sender
115 {
116     using _Constrained = stdexec::__t<_ConstrainedId>;
117 
118     struct __t
119     {
120         using __id = __when_empty_sender;
121         using sender_concept = stdexec::sender_t;
122 
123         template <class _Self, class _Receiver>
124         using __when_empty_op_t =
125             stdexec::__t<__when_empty_op<__cvref_id<_Self, _Constrained>,
126                                          stdexec::__id<_Receiver>>>;
127 
128         template <__decays_to<__t> _Self, receiver _Receiver>
129             requires sender_to<__copy_cvref_t<_Self, _Constrained>, _Receiver>
connectexec::__scope::__when_empty_sender::__t130         [[nodiscard]] static auto connect(_Self&& __self, _Receiver __rcvr) //
131             -> __when_empty_op_t<_Self, _Receiver>
132         {
133             return __when_empty_op_t<_Self, _Receiver>{
134                 __self.__scope_, static_cast<_Self&&>(__self).__c_,
135                 static_cast<_Receiver&&>(__rcvr)};
136         }
137 
138         template <__decays_to<__t> _Self, class... _Env>
get_completion_signaturesexec::__scope::__when_empty_sender::__t139         static auto get_completion_signatures(_Self&&, _Env&&...)
140             -> __completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
141                                             __env_t<_Env>...>
142         {
143             return {};
144         }
145 
146         const __impl* __scope_;
147         STDEXEC_ATTRIBUTE((no_unique_address))
148         _Constrained __c_;
149     };
150 };
151 
152 template <class _Constrained>
153 using __when_empty_sender_t =
154     stdexec::__t<__when_empty_sender<__id<__decay_t<_Constrained>>>>;
155 
156 ////////////////////////////////////////////////////////////////////////////
157 // async_scope::nest implementation
158 template <class _ReceiverId>
159 struct __nest_op_base : __immovable
160 {
161     using _Receiver = stdexec::__t<_ReceiverId>;
162     const __impl* __scope_;
163     STDEXEC_ATTRIBUTE((no_unique_address))
164     _Receiver __rcvr_;
165 };
166 
167 template <class _ReceiverId>
168 struct __nest_rcvr
169 {
170     using _Receiver = stdexec::__t<_ReceiverId>;
171 
172     struct __t
173     {
174         using __id = __nest_rcvr;
175         using receiver_concept = stdexec::receiver_t;
176         __nest_op_base<_ReceiverId>* __op_;
177 
__completeexec::__scope::__nest_rcvr::__t178         static void __complete(const __impl* __scope) noexcept
179         {
180             std::unique_lock __guard{__scope->__lock_};
181             auto& __active = __scope->__active_;
182             if (--__active == 0)
183             {
184                 auto __local = std::move(__scope->__waiters_);
185                 __guard.unlock();
186                 __scope = nullptr;
187                 // do not access __scope
188                 while (!__local.empty())
189                 {
190                     auto* __next = __local.pop_front();
191                     __next->__notify_waiter(__next);
192                     // __scope must be considered deleted
193                 }
194             }
195         }
196 
197         template <class... _As>
198             requires __callable<set_value_t, _Receiver, _As...>
set_valueexec::__scope::__nest_rcvr::__t199         void set_value(_As&&... __as) noexcept
200         {
201             auto __scope = __op_->__scope_;
202             stdexec::set_value(std::move(__op_->__rcvr_),
203                                static_cast<_As&&>(__as)...);
204             // do not access __op_
205             // do not access this
206             __complete(__scope);
207         }
208 
209         template <class _Error>
210             requires __callable<set_error_t, _Receiver, _Error>
set_errorexec::__scope::__nest_rcvr::__t211         void set_error(_Error&& __err) noexcept
212         {
213             auto __scope = __op_->__scope_;
214             stdexec::set_error(std::move(__op_->__rcvr_),
215                                static_cast<_Error&&>(__err));
216             // do not access __op_
217             // do not access this
218             __complete(__scope);
219         }
220 
set_stoppedexec::__scope::__nest_rcvr::__t221         void set_stopped() noexcept
222             requires __callable<set_stopped_t, _Receiver>
223         {
224             auto __scope = __op_->__scope_;
225             stdexec::set_stopped(std::move(__op_->__rcvr_));
226             // do not access __op_
227             // do not access this
228             __complete(__scope);
229         }
230 
get_envexec::__scope::__nest_rcvr::__t231         auto get_env() const noexcept -> __env_t<env_of_t<_Receiver>>
232         {
233             return make_env(
234                 stdexec::get_env(__op_->__rcvr_),
235                 stdexec::prop{get_stop_token,
236                               __op_->__scope_->__stop_source_.get_token()});
237         }
238     };
239 };
240 
241 template <class _ConstrainedId, class _ReceiverId>
242 struct __nest_op
243 {
244     using _Constrained = stdexec::__t<_ConstrainedId>;
245     using _Receiver = stdexec::__t<_ReceiverId>;
246 
247     struct __t : __nest_op_base<_ReceiverId>
248     {
249         using __id = __nest_op;
250         using __nest_rcvr_t = stdexec::__t<__nest_rcvr<_ReceiverId>>;
251         STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
252         connect_result_t<_Constrained, __nest_rcvr_t> __op_;
253 
254         template <__decays_to<_Constrained> _Sender,
255                   __decays_to<_Receiver> _Rcvr>
__texec::__scope::__nest_op::__t256         explicit __t(const __impl* __scope, _Sender&& __c, _Rcvr&& __rcvr) :
257             __nest_op_base<_ReceiverId>{
258                 {}, __scope, static_cast<_Rcvr&&>(__rcvr)},
259             __op_(stdexec::connect(static_cast<_Sender&&>(__c),
260                                    __nest_rcvr_t{this}))
261         {}
262 
startexec::__scope::__nest_op::__t263         void start() & noexcept
264         {
265             STDEXEC_ASSERT(this->__scope_);
266             std::unique_lock __guard{this->__scope_->__lock_};
267             auto& __active = this->__scope_->__active_;
268             ++__active;
269             __guard.unlock();
270             stdexec::start(__op_);
271         }
272     };
273 };
274 
275 template <class _ConstrainedId>
276 struct __nest_sender
277 {
278     using _Constrained = stdexec::__t<_ConstrainedId>;
279 
280     struct __t
281     {
282         using __id = __nest_sender;
283         using sender_concept = stdexec::sender_t;
284 
285         const __impl* __scope_;
286         STDEXEC_ATTRIBUTE((no_unique_address))
287         _Constrained __c_;
288 
289         template <class _Receiver>
290         using __nest_operation_t =
291             stdexec::__t<__nest_op<_ConstrainedId, stdexec::__id<_Receiver>>>;
292 
293         template <class _Receiver>
294         using __nest_receiver_t =
295             stdexec::__t<__nest_rcvr<stdexec::__id<_Receiver>>>;
296 
297         template <__decays_to<__t> _Self, receiver _Receiver>
298             requires sender_to<__copy_cvref_t<_Self, _Constrained>,
299                                __nest_receiver_t<_Receiver>>
connectexec::__scope::__nest_sender::__t300         [[nodiscard]] static auto connect(_Self&& __self, _Receiver __rcvr)
301             -> __nest_operation_t<_Receiver>
302         {
303             return __nest_operation_t<_Receiver>{
304                 __self.__scope_, static_cast<_Self&&>(__self).__c_,
305                 static_cast<_Receiver&&>(__rcvr)};
306         }
307 
308         template <__decays_to<__t> _Self, class... _Env>
get_completion_signaturesexec::__scope::__nest_sender::__t309         static auto get_completion_signatures(_Self&&, _Env&&...)
310             -> __completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
311                                             __env_t<_Env>...>
312         {
313             return {};
314         }
315     };
316 };
317 
318 template <class _Constrained>
319 using __nest_sender_t =
320     stdexec::__t<__nest_sender<__id<__decay_t<_Constrained>>>>;
321 
322 ////////////////////////////////////////////////////////////////////////////
323 // async_scope::spawn_future implementation
// Lifecycle steps recorded in a future's shared state.
enum class __future_step
{
    // Zero value; not assigned anywhere in the visible code.
    __invalid = 0,
    // Initial step of a freshly constructed state.
    __created = 1,
    // A future handle has taken ownership of the state.
    __future = 2,
    // The owner was dropped before completion; the state owns itself.
    __no_future = 3,
    // The state has been, or is being, torn down.
    __deleted = 4
};
332 
333 template <class _Sender, class _Env>
334 struct __future_state;
335 
336 struct __forward_stopped
337 {
338     inplace_stop_source* __stop_source_;
339 
operator ()exec::__scope::__forward_stopped340     void operator()() noexcept
341     {
342         __stop_source_->request_stop();
343     }
344 };
345 
346 struct __subscription : __immovable
347 {
348     void (*__complete_)(__subscription*) noexcept = nullptr;
349 
__completeexec::__scope::__subscription350     void __complete() noexcept
351     {
352         __complete_(this);
353     }
354 
355     __subscription* __next_ = nullptr;
356 };
357 
358 template <class _SenderId, class _EnvId, class _ReceiverId>
359 struct __future_op
360 {
361     using _Sender = stdexec::__t<_SenderId>;
362     using _Env = stdexec::__t<_EnvId>;
363     using _Receiver = stdexec::__t<_ReceiverId>;
364 
365     class __t : __subscription
366     {
367         using __forward_consumer = typename stop_token_of_t<
368             env_of_t<_Receiver>>::template callback_type<__forward_stopped>;
369 
__complete_()370         void __complete_() noexcept
371         {
372             try
373             {
374                 auto __state = std::move(__state_);
375                 STDEXEC_ASSERT(__state != nullptr);
376                 std::unique_lock __guard{__state->__mutex_};
377                 // either the future is still in use or it has passed ownership
378                 // to __state->__no_future_
379                 if (__state->__no_future_.get() != nullptr ||
380                     __state->__step_ != __future_step::__future)
381                 {
382                     // invalid state - there is a code bug in the state machine
383                     std::terminate();
384                 }
385                 else if (get_stop_token(get_env(__rcvr_)).stop_requested())
386                 {
387                     __guard.unlock();
388                     stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr_));
389                     __guard.lock();
390                 }
391                 else
392                 {
393                     std::visit(
394                         [this, &__guard]<class _Tup>(_Tup& __tup) {
395                             if constexpr (same_as<_Tup, std::monostate>)
396                             {
397                                 std::terminate();
398                             }
399                             else
400                             {
401                                 std::apply(
402                                     [this, &__guard]<class... _As>(
403                                         auto tag, _As&... __as) {
404                                         __guard.unlock();
405                                         tag(static_cast<_Receiver&&>(__rcvr_),
406                                             static_cast<_As&&>(__as)...);
407                                         __guard.lock();
408                                     },
409                                     __tup);
410                             }
411                         },
412                         __state->__data_);
413                 }
414             }
415             catch (...)
416             {
417                 stdexec::set_error(static_cast<_Receiver&&>(__rcvr_),
418                                    std::current_exception());
419             }
420         }
421 
422         STDEXEC_ATTRIBUTE((no_unique_address))
423         _Receiver __rcvr_;
424         std::unique_ptr<__future_state<_Sender, _Env>> __state_;
425         STDEXEC_ATTRIBUTE((no_unique_address))
426         __forward_consumer __forward_consumer_;
427 
428       public:
429         using __id = __future_op;
430 
~__t()431         ~__t() noexcept
432         {
433             if (__state_ != nullptr)
434             {
435                 auto __raw_state = __state_.get();
436                 std::unique_lock __guard{__raw_state->__mutex_};
437                 if (__raw_state->__data_.index() > 0)
438                 {
439                     // completed given sender
440                     // state is no longer needed
441                     return;
442                 }
443                 __raw_state->__no_future_ = std::move(__state_);
444                 __raw_state->__step_from_to_(__guard, __future_step::__future,
445                                              __future_step::__no_future);
446             }
447         }
448 
449         template <class _Receiver2>
__t(_Receiver2 && __rcvr,std::unique_ptr<__future_state<_Sender,_Env>> __state)450         explicit __t(_Receiver2&& __rcvr,
451                      std::unique_ptr<__future_state<_Sender, _Env>> __state) :
452             __subscription{{},
453                            [](__subscription* __self) noexcept -> void {
454                                static_cast<__t*>(__self)->__complete_();
455                            }},
456             __rcvr_(static_cast<_Receiver2&&>(__rcvr)),
457             __state_(std::move(__state)),
458             __forward_consumer_(get_stop_token(get_env(__rcvr_)),
459                                 __forward_stopped{&__state_->__stop_source_})
460         {}
461 
start()462         void start() & noexcept
463         {
464             try
465             {
466                 if (!!__state_)
467                 {
468                     std::unique_lock __guard{__state_->__mutex_};
469                     if (__state_->__data_.index() != 0)
470                     {
471                         __guard.unlock();
472                         __complete_();
473                     }
474                     else
475                     {
476                         __state_->__subscribers_.push_back(this);
477                     }
478                 }
479             }
480             catch (...)
481             {
482                 stdexec::set_error(static_cast<_Receiver&&>(__rcvr_),
483                                    std::current_exception());
484             }
485         }
486     };
487 };
488 
489 #if STDEXEC_NVHPC()
490 template <class _Fn>
491 struct __completion_as_tuple2_;
492 
493 template <class _Tag, class... _Ts>
494 struct __completion_as_tuple2_<_Tag(_Ts...)>
495 {
496     using __t = std::tuple<_Tag, _Ts...>;
497 };
498 template <class _Fn>
499 using __completion_as_tuple_t = stdexec::__t<__completion_as_tuple2_<_Fn>>;
500 
501 #else
502 
503 template <class _Tag, class... _Ts>
504 auto __completion_as_tuple_(_Tag (*)(_Ts...)) -> std::tuple<_Tag, _Ts...>;
505 
506 template <class _Fn>
507 using __completion_as_tuple_t =
508     decltype(__scope::__completion_as_tuple_(static_cast<_Fn*>(nullptr)));
509 #endif
510 
511 template <class... _Ts>
512 using __decay_values_t = completion_signatures<set_value_t(__decay_t<_Ts>...)>;
513 
514 template <class _Ty>
515 using __decay_error_t = completion_signatures<set_error_t(__decay_t<_Ty>)>;
516 
517 template <class _Sender, class _Env>
518 using __future_completions_t = //
519     transform_completion_signatures_of<
520         _Sender, __env_t<_Env>,
521         completion_signatures<set_stopped_t(), set_error_t(std::exception_ptr)>,
522         __decay_values_t, __decay_error_t>;
523 
524 template <class _Completions>
525 using __completions_as_variant = //
526     __mapply<__mtransform<__q<__completion_as_tuple_t>,
527                           __mbind_front_q<std::variant, std::monostate>>,
528              _Completions>;
529 
530 template <class _Ty>
531 struct __dynamic_delete
532 {
__anonf62d8bc60202exec::__scope::__dynamic_delete533     __dynamic_delete() : __delete_([](_Ty* __p) { delete __p; }) {}
534 
535     template <class _Uy>
536         requires convertible_to<_Uy*, _Ty*>
__dynamic_deleteexec::__scope::__dynamic_delete537     __dynamic_delete(std::default_delete<_Uy>) :
538         __delete_([](_Ty* __p) { delete static_cast<_Uy*>(__p); })
539     {}
540 
541     template <class _Uy>
542         requires convertible_to<_Uy*, _Ty*>
operator =exec::__scope::__dynamic_delete543     auto operator=(std::default_delete<_Uy> __d) -> __dynamic_delete&
544     {
545         __delete_ = __dynamic_delete{__d}.__delete_;
546         return *this;
547     }
548 
operator ()exec::__scope::__dynamic_delete549     void operator()(_Ty* __p)
550     {
551         __delete_(__p);
552     }
553 
554     void (*__delete_)(_Ty*);
555 };
556 
557 template <class _Completions, class _Env>
558 struct __future_state_base
559 {
__future_state_baseexec::__scope::__future_state_base560     __future_state_base(_Env __env, const __impl* __scope) :
561         __forward_scope_{std::in_place, __scope->__stop_source_.get_token(),
562                          __forward_stopped{&__stop_source_}},
563         __env_(make_env(
564             static_cast<_Env&&>(__env),
565             stdexec::prop{get_stop_token, __scope->__stop_source_.get_token()}))
566     {}
567 
~__future_state_baseexec::__scope::__future_state_base568     ~__future_state_base()
569     {
570         std::unique_lock __guard{__mutex_};
571         if (__step_ == __future_step::__created)
572         {
573             // exception during connect() will end up here
574             __step_from_to_(__guard, __future_step::__created,
575                             __future_step::__deleted);
576         }
577         else if (__step_ != __future_step::__deleted)
578         {
579             // completing the given sender before the future is dropped will end
580             // here
581             __step_from_to_(__guard, __future_step::__future,
582                             __future_step::__deleted);
583         }
584     }
585 
__step_from_to_exec::__scope::__future_state_base586     void __step_from_to_(std::unique_lock<std::mutex>& __guard,
587                          __future_step __from, __future_step __to)
588     {
589         STDEXEC_ASSERT(__guard.owns_lock());
590         auto actual = std::exchange(__step_, __to);
591         STDEXEC_ASSERT(actual == __from);
592     }
593 
594     inplace_stop_source __stop_source_;
595     std::optional<inplace_stop_callback<__forward_stopped>> __forward_scope_;
596     std::mutex __mutex_;
597     __future_step __step_ = __future_step::__created;
598     std::unique_ptr<__future_state_base, __dynamic_delete<__future_state_base>>
599         __no_future_;
600     __completions_as_variant<_Completions> __data_;
601     __intrusive_queue<&__subscription::__next_> __subscribers_;
602     __env_t<_Env> __env_;
603 };
604 
605 template <class _Completions, class _EnvId>
606 struct __future_rcvr
607 {
608     using _Env = stdexec::__t<_EnvId>;
609 
610     struct __t
611     {
612         using __id = __future_rcvr;
613         using receiver_concept = stdexec::receiver_t;
614         __future_state_base<_Completions, _Env>* __state_;
615         const __impl* __scope_;
616 
__dispatch_result_exec::__scope::__future_rcvr::__t617         void __dispatch_result_() noexcept
618         {
619             auto& __state = *__state_;
620             std::unique_lock __guard{__state.__mutex_};
621             auto __local = std::move(__state.__subscribers_);
622             __state.__forward_scope_ = std::nullopt;
623             if (__state.__no_future_.get() != nullptr)
624             {
625                 // nobody is waiting for the results
626                 // delete this and return
627                 __state.__step_from_to_(__guard, __future_step::__no_future,
628                                         __future_step::__deleted);
629                 __guard.unlock();
630                 __state.__no_future_.reset();
631                 return;
632             }
633             __guard.unlock();
634             while (!__local.empty())
635             {
636                 auto* __sub = __local.pop_front();
637                 __sub->__complete();
638             }
639         }
640 
641         template <class _Tag, class... _As>
__save_completionexec::__scope::__future_rcvr::__t642         bool __save_completion(_Tag, _As&&... __as) noexcept
643         {
644             auto& __state = *__state_;
645             try
646             {
647                 std::unique_lock __guard{__state.__mutex_};
648                 using _Tuple = __decayed_std_tuple<_Tag, _As...>;
649                 __state.__data_.template emplace<_Tuple>(
650                     _Tag(), static_cast<_As&&>(__as)...);
651                 return true;
652             }
653             catch (...)
654             {
655                 using _Tuple = std::tuple<set_error_t, std::exception_ptr>;
656                 __state.__data_.template emplace<_Tuple>(
657                     set_error_t(), std::current_exception());
658             }
659             return false;
660         }
661 
662         template <__movable_value... _As>
set_valueexec::__scope::__future_rcvr::__t663         void set_value(_As&&... __as) noexcept
664         {
665             if (__save_completion(set_value_t(), static_cast<_As&&>(__as)...))
666             {
667                 __dispatch_result_();
668             }
669         }
670 
671         template <__movable_value _Error>
set_errorexec::__scope::__future_rcvr::__t672         void set_error(_Error&& __err) noexcept
673         {
674             if (__save_completion(set_error_t(), static_cast<_Error&&>(__err)))
675             {
676                 __dispatch_result_();
677             }
678         }
679 
set_stoppedexec::__scope::__future_rcvr::__t680         void set_stopped() noexcept
681         {
682             if (__save_completion(set_stopped_t()))
683             {
684                 __dispatch_result_();
685             }
686         }
687 
get_envexec::__scope::__future_rcvr::__t688         auto get_env() const noexcept -> const __env_t<_Env>&
689         {
690             return __state_->__env_;
691         }
692     };
693 };
694 
695 template <class _Sender, class _Env>
696 using __future_receiver_t =
697     __t<__future_rcvr<__future_completions_t<_Sender, _Env>, __id<_Env>>>;
698 
699 template <class _Sender, class _Env>
700 struct __future_state :
701     __future_state_base<__future_completions_t<_Sender, _Env>, _Env>
702 {
703     using _Completions = __future_completions_t<_Sender, _Env>;
704 
__future_stateexec::__scope::__future_state705     __future_state(_Sender __sndr, _Env __env, const __impl* __scope) :
706         __future_state_base<_Completions, _Env>(static_cast<_Env&&>(__env),
707                                                 __scope),
708         __op_(
709             stdexec::connect(static_cast<_Sender&&>(__sndr),
710                              __future_receiver_t<_Sender, _Env>{this, __scope}))
711     {}
712 
713     connect_result_t<_Sender, __future_receiver_t<_Sender, _Env>> __op_;
714 };
715 
716 template <class _SenderId, class _EnvId>
717 struct __future
718 {
719     using _Sender = stdexec::__t<_SenderId>;
720     using _Env = stdexec::__t<_EnvId>;
721 
722     class __t
723     {
724         template <class _Self>
725         using __completions_t =
726             __future_completions_t<__mfront<_Sender, _Self>, _Env>;
727 
728         template <class _Receiver>
729         using __future_op_t = stdexec::__t<
730             __future_op<_SenderId, _EnvId, stdexec::__id<_Receiver>>>;
731 
732       public:
733         using __id = __future;
734         using sender_concept = stdexec::sender_t;
735 
736         __t(__t&&) = default;
737         auto operator=(__t&&) -> __t& = default;
738 
~__t()739         ~__t() noexcept
740         {
741             if (__state_ != nullptr)
742             {
743                 auto __raw_state = __state_.get();
744                 std::unique_lock __guard{__raw_state->__mutex_};
745                 if (__raw_state->__data_.index() != 0)
746                 {
747                     // completed given sender
748                     // state is no longer needed
749                     return;
750                 }
751                 __raw_state->__no_future_ = std::move(__state_);
752                 __raw_state->__step_from_to_(__guard, __future_step::__future,
753                                              __future_step::__no_future);
754             }
755         }
756 
757         template <__decays_to<__t> _Self, receiver _Receiver>
758             requires receiver_of<_Receiver, __completions_t<_Self>>
connect(_Self && __self,_Receiver __rcvr)759         static auto connect(_Self&& __self,
760                             _Receiver __rcvr) -> __future_op_t<_Receiver>
761         {
762             return __future_op_t<_Receiver>{
763                 static_cast<_Receiver&&>(__rcvr),
764                 static_cast<_Self&&>(__self).__state_};
765         }
766 
767         template <__decays_to<__t> _Self, class... _OtherEnv>
get_completion_signatures(_Self &&,_OtherEnv &&...)768         static auto get_completion_signatures(_Self&&, _OtherEnv&&...)
769             -> __completions_t<_Self>
770         {
771             return {};
772         }
773 
774       private:
775         friend struct async_scope;
776 
__t(std::unique_ptr<__future_state<_Sender,_Env>> __state)777         explicit __t(
778             std::unique_ptr<__future_state<_Sender, _Env>> __state) noexcept :
779             __state_(std::move(__state))
780         {
781             std::unique_lock __guard{__state_->__mutex_};
782             __state_->__step_from_to_(__guard, __future_step::__created,
783                                       __future_step::__future);
784         }
785 
786         std::unique_ptr<__future_state<_Sender, _Env>> __state_;
787     };
788 };
789 
790 template <class _Sender, class _Env>
791 using __future_t = stdexec::__t<
792     __future<__id<__nest_sender_t<_Sender>>, __id<__decay_t<_Env>>>>;
793 
794 ////////////////////////////////////////////////////////////////////////////
795 // async_scope::spawn implementation
796 struct __spawn_env_
797 {
798     inplace_stop_token __token_;
799 
queryexec::__scope::__spawn_env_800     auto query(get_stop_token_t) const noexcept -> inplace_stop_token
801     {
802         return __token_;
803     }
804 
queryexec::__scope::__spawn_env_805     auto query(get_scheduler_t) const noexcept -> __inln::__scheduler
806     {
807         return {};
808     }
809 };
810 
811 template <class _Env>
812 using __spawn_env_t = __env::__join_t<_Env, __spawn_env_>;
813 
814 template <class _EnvId>
815 struct __spawn_op_base
816 {
817     using _Env = stdexec::__t<_EnvId>;
818     __spawn_env_t<_Env> __env_;
819     void (*__delete_)(__spawn_op_base*);
820 };
821 
822 template <class _EnvId>
823 struct __spawn_rcvr
824 {
825     using _Env = stdexec::__t<_EnvId>;
826 
827     struct __t
828     {
829         using __id = __spawn_rcvr;
830         using receiver_concept = stdexec::receiver_t;
831         __spawn_op_base<_EnvId>* __op_;
832 
set_valueexec::__scope::__spawn_rcvr::__t833         void set_value() noexcept
834         {
835             __op_->__delete_(__op_);
836         }
837 
838         // BUGBUG NOT TO SPEC spawn shouldn't accept senders that can fail.
set_errorexec::__scope::__spawn_rcvr::__t839         [[noreturn]] void set_error(std::exception_ptr __eptr) noexcept
840         {
841             std::rethrow_exception(std::move(__eptr));
842         }
843 
set_stoppedexec::__scope::__spawn_rcvr::__t844         void set_stopped() noexcept
845         {
846             __op_->__delete_(__op_);
847         }
848 
get_envexec::__scope::__spawn_rcvr::__t849         auto get_env() const noexcept -> const __spawn_env_t<_Env>&
850         {
851             return __op_->__env_;
852         }
853     };
854 };
855 
856 template <class _Env>
857 using __spawn_receiver_t = stdexec::__t<__spawn_rcvr<__id<_Env>>>;
858 
859 template <class _SenderId, class _EnvId>
860 struct __spawn_op
861 {
862     using _Env = stdexec::__t<_EnvId>;
863     using _Sender = stdexec::__t<_SenderId>;
864 
865     struct __t : __spawn_op_base<_EnvId>
866     {
867         template <__decays_to<_Sender> _Sndr>
__texec::__scope::__spawn_op::__t868         __t(_Sndr&& __sndr, _Env __env, const __impl* __scope) :
869             __spawn_op_base<_EnvId>{
870                 __env::__join(
871                     static_cast<_Env&&>(__env),
872                     __spawn_env_{__scope->__stop_source_.get_token()}),
873                 [](__spawn_op_base<_EnvId>* __op) {
874                     delete static_cast<__t*>(__op);
875                 }},
876             __op_(stdexec::connect(static_cast<_Sndr&&>(__sndr),
877                                    __spawn_receiver_t<_Env>{this}))
878         {}
879 
startexec::__scope::__spawn_op::__t880         void start() & noexcept
881         {
882             stdexec::start(__op_);
883         }
884 
885         connect_result_t<_Sender, __spawn_receiver_t<_Env>> __op_;
886     };
887 };
888 
889 template <class _Sender, class _Env>
890 using __spawn_operation_t = stdexec::__t<__spawn_op<__id<_Sender>, __id<_Env>>>;
891 
892 ////////////////////////////////////////////////////////////////////////////
893 // async_scope
894 struct async_scope : __immovable
895 {
896     async_scope() = default;
897 
898     template <sender _Constrained>
when_emptyexec::__scope::async_scope899     [[nodiscard]] auto when_empty(_Constrained&& __c) const
900         -> __when_empty_sender_t<_Constrained>
901     {
902         return __when_empty_sender_t<_Constrained>{
903             &__impl_, static_cast<_Constrained&&>(__c)};
904     }
905 
on_emptyexec::__scope::async_scope906     [[nodiscard]] auto on_empty() const
907     {
908         return when_empty(just());
909     }
910 
911     template <sender _Constrained>
912     using nest_result_t = __nest_sender_t<_Constrained>;
913 
914     template <sender _Constrained>
nestexec::__scope::async_scope915     [[nodiscard]] auto nest(_Constrained&& __c) -> nest_result_t<_Constrained>
916     {
917         return nest_result_t<_Constrained>{&__impl_,
918                                            static_cast<_Constrained&&>(__c)};
919     }
920 
921     template <__movable_value _Env = empty_env,
922               sender_in<__spawn_env_t<_Env>> _Sender>
923         requires sender_to<nest_result_t<_Sender>, __spawn_receiver_t<_Env>>
spawnexec::__scope::async_scope924     void spawn(_Sender&& __sndr, _Env __env = {})
925     {
926         using __op_t = __spawn_operation_t<nest_result_t<_Sender>, _Env>;
927         // start is noexcept so we can assume that the operation will complete
928         // after this, which means we can rely on its self-ownership to ensure
929         // that it is eventually deleted
930         stdexec::start(*new __op_t{nest(static_cast<_Sender&&>(__sndr)),
931                                    static_cast<_Env&&>(__env), &__impl_});
932     }
933 
934     template <__movable_value _Env = empty_env,
935               sender_in<__env_t<_Env>> _Sender>
spawn_futureexec::__scope::async_scope936     auto spawn_future(_Sender&& __sndr,
937                       _Env __env = {}) -> __future_t<_Sender, _Env>
938     {
939         using __state_t = __future_state<nest_result_t<_Sender>, _Env>;
940         auto __state =
941             std::make_unique<__state_t>(nest(static_cast<_Sender&&>(__sndr)),
942                                         static_cast<_Env&&>(__env), &__impl_);
943         stdexec::start(__state->__op_);
944         return __future_t<_Sender, _Env>{std::move(__state)};
945     }
946 
get_stop_sourceexec::__scope::async_scope947     auto get_stop_source() noexcept -> inplace_stop_source&
948     {
949         return __impl_.__stop_source_;
950     }
951 
get_stop_tokenexec::__scope::async_scope952     auto get_stop_token() const noexcept -> inplace_stop_token
953     {
954         return __impl_.__stop_source_.get_token();
955     }
956 
request_stopexec::__scope::async_scope957     auto request_stop() noexcept -> bool
958     {
959         return __impl_.__stop_source_.request_stop();
960     }
961 
962   private:
963     __impl __impl_;
964 };
965 } // namespace __scope
966 
967 using __scope::async_scope;
968 
969 template <class _AsyncScope, class _Sender>
970 using nest_result_t = decltype(stdexec::__declval<_AsyncScope&>().nest(
971     stdexec::__declval<_Sender&&>()));
972 } // namespace exec
973