xref: /openbmc/sdbusplus/include/sdbusplus/async/stdexec/async_scope.hpp (revision 10d0b4b7d1498cfd5c3d37edea271a54d1984e41)
1 /*
2  * Copyright (c) 2021-2024 NVIDIA Corporation
3  *
4  * Licensed under the Apache License Version 2.0 with LLVM Exceptions
5  * (the "License"); you may not use this file except in compliance with
6  * the License. You may obtain a copy of the License at
7  *
8  *   https://llvm.org/LICENSE.txt
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include "../stdexec/execution.hpp"
19 #include "../stdexec/stop_token.hpp"
20 #include "../stdexec/__detail/__intrusive_queue.hpp"
21 #include "../stdexec/__detail/__optional.hpp"
22 #include "env.hpp"
23 
24 #include <atomic>
25 #include <mutex>
26 
27 namespace exec {
28   /////////////////////////////////////////////////////////////////////////////
29   // async_scope
30   namespace __scope {
31     using namespace stdexec;
32 
33     struct __impl;
34     struct async_scope;
35 
36     template <class _A>
37     concept __async_scope = requires(_A& __a) {
38       { __a.nest(stdexec::just()) } -> sender_of<stdexec::set_value_t()>;
39     };
40 
41     struct __task : __immovable {
42       const __impl* __scope_;
43       void (*__notify_waiter)(__task*) noexcept;
44       __task* __next_ = nullptr;
45     };
46 
47     template <class _BaseEnv>
48     using __env_t = make_env_t<_BaseEnv, prop<get_stop_token_t, inplace_stop_token>>;
49 
50     struct __impl {
51       inplace_stop_source __stop_source_{};
52       mutable std::mutex __lock_{};
53       mutable std::atomic_ptrdiff_t __active_ = 0;
54       mutable __intrusive_queue<&__task::__next_> __waiters_{};
55 
~__implexec::__scope::__impl56       ~__impl() {
57         std::unique_lock __guard{__lock_};
58         STDEXEC_ASSERT(__active_ == 0);
59         STDEXEC_ASSERT(__waiters_.empty());
60       }
61     };
62 
63     ////////////////////////////////////////////////////////////////////////////
64     // async_scope::when_empty implementation
65     template <class _ConstrainedId, class _ReceiverId>
66     struct __when_empty_op {
67       using _Constrained = __cvref_t<_ConstrainedId>;
68       using _Receiver = stdexec::__t<_ReceiverId>;
69 
70       struct __t : __task {
71         using __id = __when_empty_op;
72 
__texec::__scope::__when_empty_op::__t73         explicit __t(const __impl* __scope, _Constrained&& __sndr, _Receiver __rcvr)
74           : __task{{}, __scope, __notify_waiter}
75           , __op_(
76               stdexec::connect(
77                 static_cast<_Constrained&&>(__sndr),
78                 static_cast<_Receiver&&>(__rcvr))) {
79         }
80 
startexec::__scope::__when_empty_op::__t81         void start() & noexcept {
82           // must get lock before checking __active, or if the __active is drained before
83           // the waiter is queued but after __active is checked, the waiter will never be notified
84           std::unique_lock __guard{this->__scope_->__lock_};
85           auto& __active = this->__scope_->__active_;
86           auto& __waiters = this->__scope_->__waiters_;
87           if (__active.load(std::memory_order_acquire) != 0) {
88             __waiters.push_back(this);
89             return;
90           }
91           __guard.unlock();
92           stdexec::start(this->__op_);
93         }
94 
95        private:
__notify_waiterexec::__scope::__when_empty_op::__t96         static void __notify_waiter(__task* __self) noexcept {
97           stdexec::start(static_cast<__t*>(__self)->__op_);
98         }
99 
100         STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
101         connect_result_t<_Constrained, _Receiver> __op_;
102       };
103     };
104 
105     template <class _ConstrainedId>
106     struct __when_empty_sender {
107       using _Constrained = stdexec::__t<_ConstrainedId>;
108 
109       struct __t {
110         using __id = __when_empty_sender;
111         using sender_concept = stdexec::sender_t;
112 
113         template <class _Self, class _Receiver>
114         using __when_empty_op_t =
115           stdexec::__t<__when_empty_op<__cvref_id<_Self, _Constrained>, stdexec::__id<_Receiver>>>;
116 
117         template <__decays_to<__t> _Self, receiver _Receiver>
118           requires sender_to<__copy_cvref_t<_Self, _Constrained>, _Receiver>
119         [[nodiscard]]
120         static auto
connectexec::__scope::__when_empty_sender::__t121           connect(_Self&& __self, _Receiver __rcvr) -> __when_empty_op_t<_Self, _Receiver> {
122           return __when_empty_op_t<_Self, _Receiver>{
123             __self.__scope_, static_cast<_Self&&>(__self).__c_, static_cast<_Receiver&&>(__rcvr)};
124         }
125 
126         template <__decays_to<__t> _Self, class... _Env>
get_completion_signaturesexec::__scope::__when_empty_sender::__t127         static auto get_completion_signatures(_Self&&, _Env&&...)
128           -> __completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>, __env_t<_Env>...> {
129           return {};
130         }
131 
132         const __impl* __scope_;
133         STDEXEC_ATTRIBUTE(no_unique_address) _Constrained __c_;
134       };
135     };
136 
137     template <class _Constrained>
138     using __when_empty_sender_t = stdexec::__t<__when_empty_sender<__id<__decay_t<_Constrained>>>>;
139 
140     ////////////////////////////////////////////////////////////////////////////
141     // async_scope::nest implementation
142     template <class _ReceiverId>
143     struct __nest_op_base : __immovable {
144       using _Receiver = stdexec::__t<_ReceiverId>;
145       const __impl* __scope_;
146       STDEXEC_ATTRIBUTE(no_unique_address) _Receiver __rcvr_;
147     };
148 
149     template <class _ReceiverId>
150     struct __nest_rcvr {
151       using _Receiver = stdexec::__t<_ReceiverId>;
152 
153       struct __t {
154         using __id = __nest_rcvr;
155         using receiver_concept = stdexec::receiver_t;
156         __nest_op_base<_ReceiverId>* __op_;
157 
__completeexec::__scope::__nest_rcvr::__t158         static void __complete(const __impl* __scope) noexcept {
159           auto& __active = __scope->__active_;
160           if (__active.fetch_sub(1, std::memory_order_acq_rel) == 1) {
161             std::unique_lock __guard{__scope->__lock_};
162             auto __local_waiters = std::move(__scope->__waiters_);
163             __guard.unlock();
164             __scope = nullptr;
165             // do not access __scope
166             while (!__local_waiters.empty()) {
167               auto* __next = __local_waiters.pop_front();
168               __next->__notify_waiter(__next);
169               // __scope must be considered deleted
170             }
171           }
172         }
173 
174         template <class... _As>
175           requires __callable<set_value_t, _Receiver, _As...>
set_valueexec::__scope::__nest_rcvr::__t176         void set_value(_As&&... __as) noexcept {
177           auto __scope = __op_->__scope_;
178           stdexec::set_value(std::move(__op_->__rcvr_), static_cast<_As&&>(__as)...);
179           // do not access __op_
180           // do not access this
181           __complete(__scope);
182         }
183 
184         template <class _Error>
185           requires __callable<set_error_t, _Receiver, _Error>
set_errorexec::__scope::__nest_rcvr::__t186         void set_error(_Error&& __err) noexcept {
187           auto __scope = __op_->__scope_;
188           stdexec::set_error(std::move(__op_->__rcvr_), static_cast<_Error&&>(__err));
189           // do not access __op_
190           // do not access this
191           __complete(__scope);
192         }
193 
set_stoppedexec::__scope::__nest_rcvr::__t194         void set_stopped() noexcept
195           requires __callable<set_stopped_t, _Receiver>
196         {
197           auto __scope = __op_->__scope_;
198           stdexec::set_stopped(std::move(__op_->__rcvr_));
199           // do not access __op_
200           // do not access this
201           __complete(__scope);
202         }
203 
get_envexec::__scope::__nest_rcvr::__t204         auto get_env() const noexcept -> __env_t<env_of_t<_Receiver>> {
205           return make_env(
206             stdexec::get_env(__op_->__rcvr_),
207             stdexec::prop{get_stop_token, __op_->__scope_->__stop_source_.get_token()});
208         }
209       };
210     };
211 
212     template <class _ConstrainedId, class _ReceiverId>
213     struct __nest_op {
214       using _Constrained = stdexec::__t<_ConstrainedId>;
215       using _Receiver = stdexec::__t<_ReceiverId>;
216 
217       struct __t : __nest_op_base<_ReceiverId> {
218         using __id = __nest_op;
219         using __nest_rcvr_t = stdexec::__t<__nest_rcvr<_ReceiverId>>;
220         STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
221         connect_result_t<_Constrained, __nest_rcvr_t> __op_;
222 
223         template <__decays_to<_Constrained> _Sender, __decays_to<_Receiver> _Rcvr>
__texec::__scope::__nest_op::__t224         explicit __t(const __impl* __scope, _Sender&& __c, _Rcvr&& __rcvr)
225           : __nest_op_base<_ReceiverId>{{}, __scope, static_cast<_Rcvr&&>(__rcvr)}
226           , __op_(stdexec::connect(static_cast<_Sender&&>(__c), __nest_rcvr_t{this})) {
227         }
228 
startexec::__scope::__nest_op::__t229         void start() & noexcept {
230           STDEXEC_ASSERT(this->__scope_);
231           auto& __active = this->__scope_->__active_;
232           __active.fetch_add(1, std::memory_order_relaxed);
233           stdexec::start(__op_);
234         }
235       };
236     };
237 
238     template <class _ConstrainedId>
239     struct __nest_sender {
240       using _Constrained = stdexec::__t<_ConstrainedId>;
241 
242       struct __t {
243         using __id = __nest_sender;
244         using sender_concept = stdexec::sender_t;
245 
246         const __impl* __scope_;
247         STDEXEC_ATTRIBUTE(no_unique_address) _Constrained __c_;
248 
249         template <class _Receiver>
250         using __nest_operation_t =
251           stdexec::__t<__nest_op<_ConstrainedId, stdexec::__id<_Receiver>>>;
252 
253         template <class _Receiver>
254         using __nest_receiver_t = stdexec::__t<__nest_rcvr<stdexec::__id<_Receiver>>>;
255 
256         template <__decays_to<__t> _Self, receiver _Receiver>
257           requires sender_to<__copy_cvref_t<_Self, _Constrained>, __nest_receiver_t<_Receiver>>
258         [[nodiscard]]
connectexec::__scope::__nest_sender::__t259         static auto connect(_Self&& __self, _Receiver __rcvr) -> __nest_operation_t<_Receiver> {
260           return __nest_operation_t<_Receiver>{
261             __self.__scope_, static_cast<_Self&&>(__self).__c_, static_cast<_Receiver&&>(__rcvr)};
262         }
263 
264         template <__decays_to<__t> _Self, class... _Env>
get_completion_signaturesexec::__scope::__nest_sender::__t265         static auto get_completion_signatures(_Self&&, _Env&&...)
266           -> __completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>, __env_t<_Env>...> {
267           return {};
268         }
269       };
270     };
271 
272     template <class _Constrained>
273     using __nest_sender_t = stdexec::__t<__nest_sender<__id<__decay_t<_Constrained>>>>;
274 
275     ////////////////////////////////////////////////////////////////////////////
276     // async_scope::spawn_future implementation
277     enum class __future_step {
278       __invalid = 0,
279       __created,
280       __future,
281       __no_future,
282       __deleted
283     };
284 
285     template <class _Sender, class _Env>
286     struct __future_state;
287 
288     struct __forward_stopped {
289       inplace_stop_source* __stop_source_;
290 
operator ()exec::__scope::__forward_stopped291       void operator()() noexcept {
292         __stop_source_->request_stop();
293       }
294     };
295 
296     struct __subscription : __immovable {
297       void (*__complete_)(__subscription*) noexcept = nullptr;
298 
__completeexec::__scope::__subscription299       void __complete() noexcept {
300         __complete_(this);
301       }
302 
303       __subscription* __next_ = nullptr;
304     };
305 
306     template <class _SenderId, class _EnvId, class _ReceiverId>
307     struct __future_op {
308       using _Sender = stdexec::__t<_SenderId>;
309       using _Env = stdexec::__t<_EnvId>;
310       using _Receiver = stdexec::__t<_ReceiverId>;
311 
312       class __t : __subscription {
313         using __forward_consumer =
314           stop_token_of_t<env_of_t<_Receiver>>::template callback_type<__forward_stopped>;
315 
__complete_()316         void __complete_() noexcept {
317           STDEXEC_TRY {
318             __forward_consumer_.reset();
319             auto __state = std::move(__state_);
320             STDEXEC_ASSERT(__state != nullptr);
321             std::unique_lock __guard{__state->__mutex_};
322             // either the future is still in use or it has passed ownership to __state->__no_future_
323             if (
324               __state->__no_future_.get() != nullptr
325               || __state->__step_ != __future_step::__future) {
326               // invalid state - there is a code bug in the state machine
327               std::terminate();
328             } else if (get_stop_token(get_env(__rcvr_)).stop_requested()) {
329 
330               __guard.unlock();
331               stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr_));
332               __guard.lock();
333             } else {
334               std::visit(
335                 [this, &__guard]<class _Tup>(_Tup& __tup) {
336                   if constexpr (same_as<_Tup, std::monostate>) {
337                     std::terminate();
338                   } else {
339                     std::apply(
340                       [this, &__guard]<class... _As>(auto tag, _As&... __as) {
341                         __guard.unlock();
342                         tag(static_cast<_Receiver&&>(__rcvr_), static_cast<_As&&>(__as)...);
343                         __guard.lock();
344                       },
345                       __tup);
346                   }
347                 },
348                 __state->__data_);
349             }
350           }
351           STDEXEC_CATCH_ALL {
352 
353             stdexec::set_error(static_cast<_Receiver&&>(__rcvr_), std::current_exception());
354           }
355         }
356 
357         STDEXEC_ATTRIBUTE(no_unique_address) _Receiver __rcvr_;
358         std::unique_ptr<__future_state<_Sender, _Env>> __state_;
359         STDEXEC_ATTRIBUTE(no_unique_address)
360         stdexec::__optional<__forward_consumer> __forward_consumer_;
361 
362        public:
363         using __id = __future_op;
364 
~__t()365         ~__t() noexcept {
366           if (__state_ != nullptr) {
367             auto __raw_state = __state_.get();
368             std::unique_lock __guard{__raw_state->__mutex_};
369             if (__raw_state->__data_.index() > 0) {
370               // completed given sender
371               // state is no longer needed
372               return;
373             }
374             __raw_state->__no_future_ = std::move(__state_);
375             __raw_state
376               ->__step_from_to_(__guard, __future_step::__future, __future_step::__no_future);
377           }
378         }
379 
380         template <class _Receiver2>
__t(_Receiver2 && __rcvr,std::unique_ptr<__future_state<_Sender,_Env>> __state)381         explicit __t(
382           _Receiver2&& __rcvr, std::unique_ptr<__future_state<_Sender, _Env>> __state)
383           : __subscription{{},
384             [](__subscription* __self) noexcept -> void {
385                 static_cast<__t*>(__self)->__complete_();
386             }}
387           , __rcvr_(static_cast<_Receiver2&&>(__rcvr))
388           , __state_(std::move(__state))
389           , __forward_consumer_(std::in_place, get_stop_token(get_env(__rcvr_)),
390               __forward_stopped{&__state_->__stop_source_}) {
391         }
392 
start()393         void start() & noexcept {
394           STDEXEC_TRY {
395             if (!!__state_) {
396               std::unique_lock __guard{__state_->__mutex_};
397               if (__state_->__data_.index() != 0) {
398                 __guard.unlock();
399                 __complete_();
400               } else {
401                 __state_->__subscribers_.push_back(this);
402               }
403             }
404           }
405           STDEXEC_CATCH_ALL {
406             stdexec::set_error(static_cast<_Receiver&&>(__rcvr_), std::current_exception());
407           }
408         }
409       };
410     };
411 
412 #if STDEXEC_EDG()
413     template <class _Fn>
414     struct __completion_as_tuple2_;
415 
416     template <class _Tag, class... _Ts>
417     struct __completion_as_tuple2_<_Tag(_Ts...)> {
418       using __t = std::tuple<_Tag, _Ts...>;
419     };
420     template <class _Fn>
421     using __completion_as_tuple_t = stdexec::__t<__completion_as_tuple2_<_Fn>>;
422 
423 #else
424 
425     template <class _Tag, class... _Ts>
426     auto __completion_as_tuple_(_Tag (*)(_Ts...)) -> std::tuple<_Tag, _Ts...>;
427 
428     template <class _Fn>
429     using __completion_as_tuple_t = decltype(__scope::__completion_as_tuple_(
430       static_cast<_Fn*>(nullptr)));
431 #endif
432 
433     template <class... _Ts>
434     using __decay_values_t = completion_signatures<set_value_t(__decay_t<_Ts>...)>;
435 
436     template <class _Ty>
437     using __decay_error_t = completion_signatures<set_error_t(__decay_t<_Ty>)>;
438 
439     template <class _Sender, class _Env>
440     using __future_completions_t = transform_completion_signatures_of<
441       _Sender,
442       __env_t<_Env>,
443       completion_signatures<set_stopped_t(), set_error_t(std::exception_ptr)>,
444       __decay_values_t,
445       __decay_error_t
446     >;
447 
448     template <class _Completions>
449     using __completions_as_variant = __mapply<
450       __mtransform<__q<__completion_as_tuple_t>, __mbind_front_q<std::variant, std::monostate>>,
451       _Completions
452     >;
453 
454     template <class _Ty>
455     struct __dynamic_delete {
__dynamic_deleteexec::__scope::__dynamic_delete456       __dynamic_delete()
457         : __delete_([](_Ty* __p) { delete __p; }) {
458       }
459 
460       template <class _Uy>
461         requires convertible_to<_Uy*, _Ty*>
__dynamic_deleteexec::__scope::__dynamic_delete462       __dynamic_delete(std::default_delete<_Uy>)
463         : __delete_([](_Ty* __p) { delete static_cast<_Uy*>(__p); }) {
464       }
465 
466       template <class _Uy>
467         requires convertible_to<_Uy*, _Ty*>
operator =exec::__scope::__dynamic_delete468       auto operator=(std::default_delete<_Uy> __d) -> __dynamic_delete& {
469         __delete_ = __dynamic_delete{__d}.__delete_;
470         return *this;
471       }
472 
operator ()exec::__scope::__dynamic_delete473       void operator()(_Ty* __p) {
474         __delete_(__p);
475       }
476 
477       void (*__delete_)(_Ty*);
478     };
479 
480     template <class _Completions, class _Env>
481     struct __future_state_base {
__future_state_baseexec::__scope::__future_state_base482       __future_state_base(_Env __env, const __impl* __scope)
483         : __forward_scope_{std::in_place, __scope->__stop_source_.get_token(), __forward_stopped{&__stop_source_}}
484         , __env_(make_env(
485             static_cast<_Env&&>(__env),
486             stdexec::prop{get_stop_token, __scope->__stop_source_.get_token()})) {
487       }
488 
~__future_state_baseexec::__scope::__future_state_base489       ~__future_state_base() {
490         std::unique_lock __guard{__mutex_};
491         if (__step_ == __future_step::__created) {
492           // exception during connect() will end up here
493           __step_from_to_(__guard, __future_step::__created, __future_step::__deleted);
494         } else if (__step_ != __future_step::__deleted) {
495           // completing the given sender before the future is dropped will end here
496           __step_from_to_(__guard, __future_step::__future, __future_step::__deleted);
497         }
498       }
499 
__step_from_to_exec::__scope::__future_state_base500       void __step_from_to_(
501         std::unique_lock<std::mutex>& __guard,
502         __future_step __from,
503         __future_step __to) {
504         STDEXEC_ASSERT(__guard.owns_lock());
505         auto actual = std::exchange(__step_, __to);
506         STDEXEC_ASSERT(actual == __from);
507       }
508 
509       inplace_stop_source __stop_source_;
510       stdexec::__optional<inplace_stop_callback<__forward_stopped>> __forward_scope_;
511       std::mutex __mutex_;
512       __future_step __step_ = __future_step::__created;
513       std::unique_ptr<__future_state_base, __dynamic_delete<__future_state_base>> __no_future_;
514       __completions_as_variant<_Completions> __data_;
515       __intrusive_queue<&__subscription::__next_> __subscribers_;
516       __env_t<_Env> __env_;
517     };
518 
519     template <class _Completions, class _EnvId>
520     struct __future_rcvr {
521       using _Env = stdexec::__t<_EnvId>;
522 
523       struct __t {
524         using __id = __future_rcvr;
525         using receiver_concept = stdexec::receiver_t;
526         __future_state_base<_Completions, _Env>* __state_;
527         const __impl* __scope_;
528 
__dispatch_result_exec::__scope::__future_rcvr::__t529         void __dispatch_result_(std::unique_lock<std::mutex>& __guard) noexcept {
530           auto& __state = *__state_;
531           auto __local_subscribers = std::move(__state.__subscribers_);
532           __state.__forward_scope_.reset();
533           if (__state.__no_future_.get() != nullptr) {
534             // nobody is waiting for the results
535             // delete this and return
536             __state.__step_from_to_(__guard, __future_step::__no_future, __future_step::__deleted);
537             __guard.unlock();
538             __state.__no_future_.reset();
539             return;
540           }
541           __guard.unlock();
542           while (!__local_subscribers.empty()) {
543             auto* __sub = __local_subscribers.pop_front();
544             __sub->__complete();
545           }
546         }
547 
548         template <class _Tag, class... _As>
__save_completionexec::__scope::__future_rcvr::__t549         void __save_completion(_Tag, _As&&... __as) noexcept {
550           auto& __state = *__state_;
551           STDEXEC_TRY {
552             using _Tuple = __decayed_std_tuple<_Tag, _As...>;
553             __state.__data_.template emplace<_Tuple>(_Tag(), static_cast<_As&&>(__as)...);
554           }
555           STDEXEC_CATCH_ALL {
556             using _Tuple = std::tuple<set_error_t, std::exception_ptr>;
557             __state.__data_.template emplace<_Tuple>(set_error_t(), std::current_exception());
558           }
559         }
560 
561         template <__movable_value... _As>
set_valueexec::__scope::__future_rcvr::__t562         void set_value(_As&&... __as) noexcept {
563           auto& __state = *__state_;
564           std::unique_lock __guard{__state.__mutex_};
565           __save_completion(set_value_t(), static_cast<_As&&>(__as)...);
566           __dispatch_result_(__guard);
567         }
568 
569         template <__movable_value _Error>
set_errorexec::__scope::__future_rcvr::__t570         void set_error(_Error&& __err) noexcept {
571           auto& __state = *__state_;
572           std::unique_lock __guard{__state.__mutex_};
573           __save_completion(set_error_t(), static_cast<_Error&&>(__err));
574           __dispatch_result_(__guard);
575         }
576 
set_stoppedexec::__scope::__future_rcvr::__t577         void set_stopped() noexcept {
578           auto& __state = *__state_;
579           std::unique_lock __guard{__state.__mutex_};
580           __save_completion(set_stopped_t());
581           __dispatch_result_(__guard);
582         }
583 
get_envexec::__scope::__future_rcvr::__t584         auto get_env() const noexcept -> const __env_t<_Env>& {
585           return __state_->__env_;
586         }
587       };
588     };
589 
590     template <class _Sender, class _Env>
591     using __future_receiver_t =
592       __t<__future_rcvr<__future_completions_t<_Sender, _Env>, __id<_Env>>>;
593 
594     template <class _Sender, class _Env>
595     struct __future_state : __future_state_base<__future_completions_t<_Sender, _Env>, _Env> {
596       using _Completions = __future_completions_t<_Sender, _Env>;
597 
__future_stateexec::__scope::__future_state598       __future_state(connect_t, _Sender&& __sndr, _Env __env, const __impl* __scope)
599         : __future_state_base<_Completions, _Env>(static_cast<_Env&&>(__env), __scope)
600         , __op_(static_cast<_Sender&&>(__sndr), __future_receiver_t<_Sender, _Env>{this, __scope}) {
601       }
602 
__future_stateexec::__scope::__future_state603       __future_state(_Sender __sndr, _Env __env, const __impl* __scope)
604         : __future_state(
605             stdexec::connect,
606             static_cast<_Sender&&>(__sndr),
607             static_cast<_Env&&>(__env),
608             __scope) {
609         // If the operation completes synchronously, then the following line will cause
610         // the destruction of *this, which is not a problem because we used a delegating
611         // constructor, so *this is considered fully constructed.
612         __op_.submit(
613           static_cast<_Sender&&>(__sndr), __future_receiver_t<_Sender, _Env>{this, __scope});
614       }
615 
STDEXEC_ATTRIBUTEexec::__scope::__future_state616       STDEXEC_ATTRIBUTE(no_unique_address)
617       submit_result<_Sender, __future_receiver_t<_Sender, _Env>> __op_{};
618     };
619 
620     template <class _SenderId, class _EnvId>
621     struct __future {
622       using _Sender = stdexec::__t<_SenderId>;
623       using _Env = stdexec::__t<_EnvId>;
624 
625       class __t {
626         template <class _Self>
627         using __completions_t = __future_completions_t<__mfront<_Sender, _Self>, _Env>;
628 
629         template <class _Receiver>
630         using __future_op_t =
631           stdexec::__t<__future_op<_SenderId, _EnvId, stdexec::__id<_Receiver>>>;
632 
633        public:
634         using __id = __future;
635         using sender_concept = stdexec::sender_t;
636 
637         __t(__t&&) = default;
638         auto operator=(__t&&) -> __t& = default;
639 
~__t()640         ~__t() noexcept {
641           if (__state_ != nullptr) {
642             auto __raw_state = __state_.get();
643             std::unique_lock __guard{__raw_state->__mutex_};
644             if (__raw_state->__data_.index() != 0) {
645               // completed given sender
646               // state is no longer needed
647               return;
648             }
649             __raw_state->__no_future_ = std::move(__state_);
650             __raw_state
651               ->__step_from_to_(__guard, __future_step::__future, __future_step::__no_future);
652           }
653         }
654 
655         template <__decays_to<__t> _Self, receiver _Receiver>
656           requires receiver_of<_Receiver, __completions_t<_Self>>
connect(_Self && __self,_Receiver __rcvr)657         static auto connect(_Self&& __self, _Receiver __rcvr) -> __future_op_t<_Receiver> {
658           return __future_op_t<_Receiver>{
659             static_cast<_Receiver&&>(__rcvr), static_cast<_Self&&>(__self).__state_};
660         }
661 
662         template <__decays_to<__t> _Self, class... _OtherEnv>
get_completion_signatures(_Self &&,_OtherEnv &&...)663         static auto get_completion_signatures(_Self&&, _OtherEnv&&...) -> __completions_t<_Self> {
664           return {};
665         }
666 
667        private:
668         friend struct async_scope;
669 
__t(std::unique_ptr<__future_state<_Sender,_Env>> __state)670         explicit __t(std::unique_ptr<__future_state<_Sender, _Env>> __state) noexcept
671           : __state_(std::move(__state)) {
672           std::unique_lock __guard{__state_->__mutex_};
673           __state_->__step_from_to_(__guard, __future_step::__created, __future_step::__future);
674         }
675 
676         std::unique_ptr<__future_state<_Sender, _Env>> __state_;
677       };
678     };
679 
680     template <class _Sender, class _Env>
681     using __future_t =
682       stdexec::__t<__future<__id<__nest_sender_t<_Sender>>, __id<__decay_t<_Env>>>>;
683 
684     ////////////////////////////////////////////////////////////////////////////
685     // async_scope::spawn implementation
686     struct __spawn_env_ {
687       inplace_stop_token __token_;
688 
689       [[nodiscard]]
queryexec::__scope::__spawn_env_690       auto query(get_stop_token_t) const noexcept -> inplace_stop_token {
691         return __token_;
692       }
693 
694       [[nodiscard]]
queryexec::__scope::__spawn_env_695       auto query(get_scheduler_t) const noexcept -> stdexec::inline_scheduler {
696         return {};
697       }
698     };
699 
700     template <class _Env>
701     using __spawn_env_t = __join_env_t<_Env, __spawn_env_>;
702 
703     template <class _EnvId>
704     struct __spawn_op_base {
705       using _Env = stdexec::__t<_EnvId>;
706       __spawn_env_t<_Env> __env_;
707       void (*__delete_)(__spawn_op_base*);
708     };
709 
710     template <class _EnvId>
711     struct __spawn_rcvr {
712       using _Env = stdexec::__t<_EnvId>;
713 
714       struct __t {
715         using __id = __spawn_rcvr;
716         using receiver_concept = stdexec::receiver_t;
717         __spawn_op_base<_EnvId>* __op_;
718 
set_valueexec::__scope::__spawn_rcvr::__t719         void set_value() noexcept {
720           __op_->__delete_(__op_);
721         }
722 
723         // BUGBUG NOT TO SPEC spawn shouldn't accept senders that can fail.
724         [[noreturn]]
set_errorexec::__scope::__spawn_rcvr::__t725         void set_error(std::exception_ptr __eptr) noexcept {
726           std::rethrow_exception(std::move(__eptr));
727         }
728 
set_stoppedexec::__scope::__spawn_rcvr::__t729         void set_stopped() noexcept {
730           __op_->__delete_(__op_);
731         }
732 
get_envexec::__scope::__spawn_rcvr::__t733         auto get_env() const noexcept -> const __spawn_env_t<_Env>& {
734           return __op_->__env_;
735         }
736       };
737     };
738 
739     template <class _Env>
740     using __spawn_receiver_t = stdexec::__t<__spawn_rcvr<__id<_Env>>>;
741 
742     template <class _SenderId, class _EnvId>
743     struct __spawn_op {
744       using _Env = stdexec::__t<_EnvId>;
745       using _Sender = stdexec::__t<_SenderId>;
746 
747       struct __t : __spawn_op_base<_EnvId> {
__texec::__scope::__spawn_op::__t748         __t(connect_t, _Sender&& __sndr, _Env __env, const __impl* __scope)
749           : __spawn_op_base<
750               _EnvId
751             >{__env::__join(
752                 static_cast<_Env&&>(__env),
753                 __spawn_env_{__scope->__stop_source_.get_token()}),
754               [](__spawn_op_base<_EnvId>* __op) { delete static_cast<__t*>(__op); }}
755           , __data_(static_cast<_Sender&&>(__sndr), __spawn_receiver_t<_Env>{this}) {
756         }
757 
__texec::__scope::__spawn_op::__t758         __t(_Sender __sndr, _Env __env, const __impl* __scope)
759           : __t(
760               stdexec::connect,
761               static_cast<_Sender&&>(__sndr),
762               static_cast<_Env&&>(__env),
763               __scope) {
764           // If the operation completes synchronously, then the following line will cause
765           // the destruction of *this, which is not a problem because we used a delegating
766           // constructor, so *this is considered fully constructed.
767           __data_.submit(static_cast<_Sender&&>(__sndr), __spawn_receiver_t<_Env>{this});
768         }
769 
770         STDEXEC_ATTRIBUTE(no_unique_address)
771         submit_result<_Sender, __spawn_receiver_t<_Env>> __data_;
772       };
773     };
774 
775     template <class _Sender, class _Env>
776     using __spawn_operation_t = stdexec::__t<__spawn_op<__id<_Sender>, __id<_Env>>>;
777 
778     ////////////////////////////////////////////////////////////////////////////
779     // async_scope
780     struct async_scope : __immovable {
781       async_scope() = default;
782 
783       template <sender _Constrained>
784       [[nodiscard]]
when_emptyexec::__scope::async_scope785       auto when_empty(_Constrained&& __c) const -> __when_empty_sender_t<_Constrained> {
786         return __when_empty_sender_t<_Constrained>{&__impl_, static_cast<_Constrained&&>(__c)};
787       }
788 
789       [[nodiscard]]
on_emptyexec::__scope::async_scope790       auto on_empty() const {
791         return when_empty(just());
792       }
793 
794       template <sender _Constrained>
795       using nest_result_t = __nest_sender_t<_Constrained>;
796 
797       template <sender _Constrained>
798       [[nodiscard]]
nestexec::__scope::async_scope799       auto nest(_Constrained&& __c) -> nest_result_t<_Constrained> {
800         return nest_result_t<_Constrained>{&__impl_, static_cast<_Constrained&&>(__c)};
801       }
802 
803       template <__movable_value _Env = env<>, sender_in<__spawn_env_t<_Env>> _Sender>
804         requires sender_to<nest_result_t<_Sender>, __spawn_receiver_t<_Env>>
spawnexec::__scope::async_scope805       void spawn(_Sender&& __sndr, _Env __env = {}) {
806         using __op_t = __spawn_operation_t<nest_result_t<_Sender>, _Env>;
807         // this will connect and start the operation, after which the operation state is
808         // responsible for deleting itself after it completes.
809         [[maybe_unused]]
810         auto* __op =
811           new __op_t{nest(static_cast<_Sender&&>(__sndr)), static_cast<_Env&&>(__env), &__impl_};
812       }
813 
814       template <__movable_value _Env = env<>, sender_in<__env_t<_Env>> _Sender>
spawn_futureexec::__scope::async_scope815       auto spawn_future(_Sender&& __sndr, _Env __env = {}) -> __future_t<_Sender, _Env> {
816         using __state_t = __future_state<nest_result_t<_Sender>, _Env>;
817         auto __state = std::make_unique<__state_t>(
818           nest(static_cast<_Sender&&>(__sndr)), static_cast<_Env&&>(__env), &__impl_);
819         return __future_t<_Sender, _Env>{std::move(__state)};
820       }
821 
get_stop_sourceexec::__scope::async_scope822       auto get_stop_source() noexcept -> inplace_stop_source& {
823         return __impl_.__stop_source_;
824       }
825 
get_stop_tokenexec::__scope::async_scope826       auto get_stop_token() const noexcept -> inplace_stop_token {
827         return __impl_.__stop_source_.get_token();
828       }
829 
request_stopexec::__scope::async_scope830       auto request_stop() noexcept -> bool {
831         return __impl_.__stop_source_.request_stop();
832       }
833 
834      private:
835       __impl __impl_;
836     };
837   } // namespace __scope
838 
839   using __scope::async_scope;
840 
841   template <class _AsyncScope, class _Sender>
842   using nest_result_t = decltype(stdexec::__declval<_AsyncScope&>()
843                                    .nest(stdexec::__declval<_Sender&&>()));
844 } // namespace exec
845