xref: /openbmc/sdbusplus/include/sdbusplus/async/stdexec/async_scope.hpp (revision 36137e09614746b13603b5fbae79e6f70819c46b)
1 /*
2  * Copyright (c) 2021-2024 NVIDIA Corporation
3  *
4  * Licensed under the Apache License Version 2.0 with LLVM Exceptions
5  * (the "License"); you may not use this file except in compliance with
6  * the License. You may obtain a copy of the License at
7  *
8  *   https://llvm.org/LICENSE.txt
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include "../stdexec/__detail/__intrusive_queue.hpp"
19 #include "../stdexec/__detail/__optional.hpp"
20 #include "../stdexec/execution.hpp"
21 #include "../stdexec/stop_token.hpp"
22 #include "env.hpp"
23 
24 #include <mutex>
25 
26 namespace exec
27 {
28 /////////////////////////////////////////////////////////////////////////////
29 // async_scope
30 namespace __scope
31 {
32 using namespace stdexec;
33 
34 struct __impl;
35 struct async_scope;
36 
37 template <class _A>
38 concept __async_scope = requires(_A& __a) {
39                             {
40                                 __a.nest(stdexec::just())
41                             } -> sender_of<stdexec::set_value_t()>;
42                         };
43 
44 struct __task : __immovable
45 {
46     const __impl* __scope_;
47     void (*__notify_waiter)(__task*) noexcept;
48     __task* __next_ = nullptr;
49 };
50 
51 template <class _BaseEnv>
52 using __env_t =
53     make_env_t<_BaseEnv, prop<get_stop_token_t, inplace_stop_token>>;
54 
55 struct __impl
56 {
57     inplace_stop_source __stop_source_{};
58     mutable std::mutex __lock_{};
59     mutable std::ptrdiff_t __active_ = 0;
60     mutable __intrusive_queue<&__task::__next_> __waiters_{};
61 
~__implexec::__scope::__impl62     ~__impl()
63     {
64         std::unique_lock __guard{__lock_};
65         STDEXEC_ASSERT(__active_ == 0);
66         STDEXEC_ASSERT(__waiters_.empty());
67     }
68 };
69 
70 ////////////////////////////////////////////////////////////////////////////
71 // async_scope::when_empty implementation
72 template <class _ConstrainedId, class _ReceiverId>
73 struct __when_empty_op
74 {
75     using _Constrained = __cvref_t<_ConstrainedId>;
76     using _Receiver = stdexec::__t<_ReceiverId>;
77 
78     struct __t : __task
79     {
80         using __id = __when_empty_op;
81 
__texec::__scope::__when_empty_op::__t82         explicit __t(const __impl* __scope, _Constrained&& __sndr,
83                      _Receiver __rcvr) :
84             __task{{}, __scope, __notify_waiter},
85             __op_(stdexec::connect(static_cast<_Constrained&&>(__sndr),
86                                    static_cast<_Receiver&&>(__rcvr)))
87         {}
88 
startexec::__scope::__when_empty_op::__t89         void start() & noexcept
90         {
91             std::unique_lock __guard{this->__scope_->__lock_};
92             auto& __active = this->__scope_->__active_;
93             auto& __waiters = this->__scope_->__waiters_;
94             if (__active != 0)
95             {
96                 __waiters.push_back(this);
97                 return;
98             }
99             __guard.unlock();
100             stdexec::start(this->__op_);
101         }
102 
103       private:
__notify_waiterexec::__scope::__when_empty_op::__t104         static void __notify_waiter(__task* __self) noexcept
105         {
106             stdexec::start(static_cast<__t*>(__self)->__op_);
107         }
108 
109         STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
110         connect_result_t<_Constrained, _Receiver> __op_;
111     };
112 };
113 
114 template <class _ConstrainedId>
115 struct __when_empty_sender
116 {
117     using _Constrained = stdexec::__t<_ConstrainedId>;
118 
119     struct __t
120     {
121         using __id = __when_empty_sender;
122         using sender_concept = stdexec::sender_t;
123 
124         template <class _Self, class _Receiver>
125         using __when_empty_op_t =
126             stdexec::__t<__when_empty_op<__cvref_id<_Self, _Constrained>,
127                                          stdexec::__id<_Receiver>>>;
128 
129         template <__decays_to<__t> _Self, receiver _Receiver>
130             requires sender_to<__copy_cvref_t<_Self, _Constrained>, _Receiver>
connectexec::__scope::__when_empty_sender::__t131         [[nodiscard]] static auto connect(_Self&& __self, _Receiver __rcvr) //
132             -> __when_empty_op_t<_Self, _Receiver>
133         {
134             return __when_empty_op_t<_Self, _Receiver>{
135                 __self.__scope_, static_cast<_Self&&>(__self).__c_,
136                 static_cast<_Receiver&&>(__rcvr)};
137         }
138 
139         template <__decays_to<__t> _Self, class... _Env>
get_completion_signaturesexec::__scope::__when_empty_sender::__t140         static auto get_completion_signatures(_Self&&, _Env&&...)
141             -> __completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
142                                             __env_t<_Env>...>
143         {
144             return {};
145         }
146 
147         const __impl* __scope_;
148         STDEXEC_ATTRIBUTE((no_unique_address))
149         _Constrained __c_;
150     };
151 };
152 
153 template <class _Constrained>
154 using __when_empty_sender_t =
155     stdexec::__t<__when_empty_sender<__id<__decay_t<_Constrained>>>>;
156 
157 ////////////////////////////////////////////////////////////////////////////
158 // async_scope::nest implementation
159 template <class _ReceiverId>
160 struct __nest_op_base : __immovable
161 {
162     using _Receiver = stdexec::__t<_ReceiverId>;
163     const __impl* __scope_;
164     STDEXEC_ATTRIBUTE((no_unique_address))
165     _Receiver __rcvr_;
166 };
167 
168 template <class _ReceiverId>
169 struct __nest_rcvr
170 {
171     using _Receiver = stdexec::__t<_ReceiverId>;
172 
173     struct __t
174     {
175         using __id = __nest_rcvr;
176         using receiver_concept = stdexec::receiver_t;
177         __nest_op_base<_ReceiverId>* __op_;
178 
__completeexec::__scope::__nest_rcvr::__t179         static void __complete(const __impl* __scope) noexcept
180         {
181             std::unique_lock __guard{__scope->__lock_};
182             auto& __active = __scope->__active_;
183             if (--__active == 0)
184             {
185                 auto __local_waiters = std::move(__scope->__waiters_);
186                 __guard.unlock();
187                 __scope = nullptr;
188                 // do not access __scope
189                 while (!__local_waiters.empty())
190                 {
191                     auto* __next = __local_waiters.pop_front();
192                     __next->__notify_waiter(__next);
193                     // __scope must be considered deleted
194                 }
195             }
196         }
197 
198         template <class... _As>
199             requires __callable<set_value_t, _Receiver, _As...>
set_valueexec::__scope::__nest_rcvr::__t200         void set_value(_As&&... __as) noexcept
201         {
202             auto __scope = __op_->__scope_;
203             stdexec::set_value(std::move(__op_->__rcvr_),
204                                static_cast<_As&&>(__as)...);
205             // do not access __op_
206             // do not access this
207             __complete(__scope);
208         }
209 
210         template <class _Error>
211             requires __callable<set_error_t, _Receiver, _Error>
set_errorexec::__scope::__nest_rcvr::__t212         void set_error(_Error&& __err) noexcept
213         {
214             auto __scope = __op_->__scope_;
215             stdexec::set_error(std::move(__op_->__rcvr_),
216                                static_cast<_Error&&>(__err));
217             // do not access __op_
218             // do not access this
219             __complete(__scope);
220         }
221 
set_stoppedexec::__scope::__nest_rcvr::__t222         void set_stopped() noexcept
223             requires __callable<set_stopped_t, _Receiver>
224         {
225             auto __scope = __op_->__scope_;
226             stdexec::set_stopped(std::move(__op_->__rcvr_));
227             // do not access __op_
228             // do not access this
229             __complete(__scope);
230         }
231 
get_envexec::__scope::__nest_rcvr::__t232         auto get_env() const noexcept -> __env_t<env_of_t<_Receiver>>
233         {
234             return make_env(
235                 stdexec::get_env(__op_->__rcvr_),
236                 stdexec::prop{get_stop_token,
237                               __op_->__scope_->__stop_source_.get_token()});
238         }
239     };
240 };
241 
242 template <class _ConstrainedId, class _ReceiverId>
243 struct __nest_op
244 {
245     using _Constrained = stdexec::__t<_ConstrainedId>;
246     using _Receiver = stdexec::__t<_ReceiverId>;
247 
248     struct __t : __nest_op_base<_ReceiverId>
249     {
250         using __id = __nest_op;
251         using __nest_rcvr_t = stdexec::__t<__nest_rcvr<_ReceiverId>>;
252         STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
253         connect_result_t<_Constrained, __nest_rcvr_t> __op_;
254 
255         template <__decays_to<_Constrained> _Sender,
256                   __decays_to<_Receiver> _Rcvr>
__texec::__scope::__nest_op::__t257         explicit __t(const __impl* __scope, _Sender&& __c, _Rcvr&& __rcvr) :
258             __nest_op_base<_ReceiverId>{
259                 {}, __scope, static_cast<_Rcvr&&>(__rcvr)},
260             __op_(stdexec::connect(static_cast<_Sender&&>(__c),
261                                    __nest_rcvr_t{this}))
262         {}
263 
startexec::__scope::__nest_op::__t264         void start() & noexcept
265         {
266             STDEXEC_ASSERT(this->__scope_);
267             std::unique_lock __guard{this->__scope_->__lock_};
268             auto& __active = this->__scope_->__active_;
269             ++__active;
270             __guard.unlock();
271             stdexec::start(__op_);
272         }
273     };
274 };
275 
276 template <class _ConstrainedId>
277 struct __nest_sender
278 {
279     using _Constrained = stdexec::__t<_ConstrainedId>;
280 
281     struct __t
282     {
283         using __id = __nest_sender;
284         using sender_concept = stdexec::sender_t;
285 
286         const __impl* __scope_;
287         STDEXEC_ATTRIBUTE((no_unique_address))
288         _Constrained __c_;
289 
290         template <class _Receiver>
291         using __nest_operation_t =
292             stdexec::__t<__nest_op<_ConstrainedId, stdexec::__id<_Receiver>>>;
293 
294         template <class _Receiver>
295         using __nest_receiver_t =
296             stdexec::__t<__nest_rcvr<stdexec::__id<_Receiver>>>;
297 
298         template <__decays_to<__t> _Self, receiver _Receiver>
299             requires sender_to<__copy_cvref_t<_Self, _Constrained>,
300                                __nest_receiver_t<_Receiver>>
connectexec::__scope::__nest_sender::__t301         [[nodiscard]] static auto connect(_Self&& __self, _Receiver __rcvr)
302             -> __nest_operation_t<_Receiver>
303         {
304             return __nest_operation_t<_Receiver>{
305                 __self.__scope_, static_cast<_Self&&>(__self).__c_,
306                 static_cast<_Receiver&&>(__rcvr)};
307         }
308 
309         template <__decays_to<__t> _Self, class... _Env>
get_completion_signaturesexec::__scope::__nest_sender::__t310         static auto get_completion_signatures(_Self&&, _Env&&...)
311             -> __completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
312                                             __env_t<_Env>...>
313         {
314             return {};
315         }
316     };
317 };
318 
319 template <class _Constrained>
320 using __nest_sender_t =
321     stdexec::__t<__nest_sender<__id<__decay_t<_Constrained>>>>;
322 
323 ////////////////////////////////////////////////////////////////////////////
324 // async_scope::spawn_future implementation
/// Life-cycle marker of a `spawn_future` shared state; transitions are
/// checked by `__future_state_base::__step_from_to_`.
enum class __future_step
{
    /// Zero value; never assigned by the code in this file.
    __invalid = 0,
    /// Initial value of a freshly constructed state.
    __created = 1,
    /// A `__future` has been constructed around the state.
    __future = 2,
    /// The future / future operation was dropped before a result was stored.
    __no_future = 3,
    /// The state has been (or is being) torn down.
    __deleted = 4
};
333 
334 template <class _Sender, class _Env>
335 struct __future_state;
336 
337 struct __forward_stopped
338 {
339     inplace_stop_source* __stop_source_;
340 
operator ()exec::__scope::__forward_stopped341     void operator()() noexcept
342     {
343         __stop_source_->request_stop();
344     }
345 };
346 
347 struct __subscription : __immovable
348 {
349     void (*__complete_)(__subscription*) noexcept = nullptr;
350 
__completeexec::__scope::__subscription351     void __complete() noexcept
352     {
353         __complete_(this);
354     }
355 
356     __subscription* __next_ = nullptr;
357 };
358 
359 template <class _SenderId, class _EnvId, class _ReceiverId>
360 struct __future_op
361 {
362     using _Sender = stdexec::__t<_SenderId>;
363     using _Env = stdexec::__t<_EnvId>;
364     using _Receiver = stdexec::__t<_ReceiverId>;
365 
366     class __t : __subscription
367     {
368         using __forward_consumer = typename stop_token_of_t<
369             env_of_t<_Receiver>>::template callback_type<__forward_stopped>;
370 
__complete_()371         void __complete_() noexcept
372         {
373             try
374             {
375                 __forward_consumer_.reset();
376                 auto __state = std::move(__state_);
377                 STDEXEC_ASSERT(__state != nullptr);
378                 std::unique_lock __guard{__state->__mutex_};
379                 // either the future is still in use or it has passed ownership
380                 // to __state->__no_future_
381                 if (__state->__no_future_.get() != nullptr ||
382                     __state->__step_ != __future_step::__future)
383                 {
384                     // invalid state - there is a code bug in the state machine
385                     std::terminate();
386                 }
387                 else if (get_stop_token(get_env(__rcvr_)).stop_requested())
388                 {
389                     __guard.unlock();
390                     stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr_));
391                     __guard.lock();
392                 }
393                 else
394                 {
395                     std::visit(
396                         [this, &__guard]<class _Tup>(_Tup& __tup) {
397                             if constexpr (same_as<_Tup, std::monostate>)
398                             {
399                                 std::terminate();
400                             }
401                             else
402                             {
403                                 std::apply(
404                                     [this, &__guard]<class... _As>(
405                                         auto tag, _As&... __as) {
406                                         __guard.unlock();
407                                         tag(static_cast<_Receiver&&>(__rcvr_),
408                                             static_cast<_As&&>(__as)...);
409                                         __guard.lock();
410                                     },
411                                     __tup);
412                             }
413                         },
414                         __state->__data_);
415                 }
416             }
417             catch (...)
418             {
419                 stdexec::set_error(static_cast<_Receiver&&>(__rcvr_),
420                                    std::current_exception());
421             }
422         }
423 
424         STDEXEC_ATTRIBUTE((no_unique_address))
425         _Receiver __rcvr_;
426         std::unique_ptr<__future_state<_Sender, _Env>> __state_;
427         STDEXEC_ATTRIBUTE((no_unique_address))
428         stdexec::__optional<__forward_consumer> __forward_consumer_;
429 
430       public:
431         using __id = __future_op;
432 
~__t()433         ~__t() noexcept
434         {
435             if (__state_ != nullptr)
436             {
437                 auto __raw_state = __state_.get();
438                 std::unique_lock __guard{__raw_state->__mutex_};
439                 if (__raw_state->__data_.index() > 0)
440                 {
441                     // completed given sender
442                     // state is no longer needed
443                     return;
444                 }
445                 __raw_state->__no_future_ = std::move(__state_);
446                 __raw_state->__step_from_to_(__guard, __future_step::__future,
447                                              __future_step::__no_future);
448             }
449         }
450 
451         template <class _Receiver2>
__t(_Receiver2 && __rcvr,std::unique_ptr<__future_state<_Sender,_Env>> __state)452         explicit __t(_Receiver2&& __rcvr,
453                      std::unique_ptr<__future_state<_Sender, _Env>> __state) :
454             __subscription{{},
455                            [](__subscription* __self) noexcept -> void {
456                                static_cast<__t*>(__self)->__complete_();
457                            }},
458             __rcvr_(static_cast<_Receiver2&&>(__rcvr)),
459             __state_(std::move(__state)),
460             __forward_consumer_(std::in_place, get_stop_token(get_env(__rcvr_)),
461                                 __forward_stopped{&__state_->__stop_source_})
462         {}
463 
start()464         void start() & noexcept
465         {
466             try
467             {
468                 if (!!__state_)
469                 {
470                     std::unique_lock __guard{__state_->__mutex_};
471                     if (__state_->__data_.index() != 0)
472                     {
473                         __guard.unlock();
474                         __complete_();
475                     }
476                     else
477                     {
478                         __state_->__subscribers_.push_back(this);
479                     }
480                 }
481             }
482             catch (...)
483             {
484                 stdexec::set_error(static_cast<_Receiver&&>(__rcvr_),
485                                    std::current_exception());
486             }
487         }
488     };
489 };
490 
491 #if STDEXEC_EDG()
492 template <class _Fn>
493 struct __completion_as_tuple2_;
494 
495 template <class _Tag, class... _Ts>
496 struct __completion_as_tuple2_<_Tag(_Ts...)>
497 {
498     using __t = std::tuple<_Tag, _Ts...>;
499 };
500 template <class _Fn>
501 using __completion_as_tuple_t = stdexec::__t<__completion_as_tuple2_<_Fn>>;
502 
503 #else
504 
505 template <class _Tag, class... _Ts>
506 auto __completion_as_tuple_(_Tag (*)(_Ts...)) -> std::tuple<_Tag, _Ts...>;
507 
508 template <class _Fn>
509 using __completion_as_tuple_t =
510     decltype(__scope::__completion_as_tuple_(static_cast<_Fn*>(nullptr)));
511 #endif
512 
513 template <class... _Ts>
514 using __decay_values_t = completion_signatures<set_value_t(__decay_t<_Ts>...)>;
515 
516 template <class _Ty>
517 using __decay_error_t = completion_signatures<set_error_t(__decay_t<_Ty>)>;
518 
519 template <class _Sender, class _Env>
520 using __future_completions_t = //
521     transform_completion_signatures_of<
522         _Sender, __env_t<_Env>,
523         completion_signatures<set_stopped_t(), set_error_t(std::exception_ptr)>,
524         __decay_values_t, __decay_error_t>;
525 
526 template <class _Completions>
527 using __completions_as_variant = //
528     __mapply<__mtransform<__q<__completion_as_tuple_t>,
529                           __mbind_front_q<std::variant, std::monostate>>,
530              _Completions>;
531 
532 template <class _Ty>
533 struct __dynamic_delete
534 {
__anonf62d8bc60202exec::__scope::__dynamic_delete535     __dynamic_delete() : __delete_([](_Ty* __p) { delete __p; }) {}
536 
537     template <class _Uy>
538         requires convertible_to<_Uy*, _Ty*>
__dynamic_deleteexec::__scope::__dynamic_delete539     __dynamic_delete(std::default_delete<_Uy>) :
540         __delete_([](_Ty* __p) { delete static_cast<_Uy*>(__p); })
541     {}
542 
543     template <class _Uy>
544         requires convertible_to<_Uy*, _Ty*>
operator =exec::__scope::__dynamic_delete545     auto operator=(std::default_delete<_Uy> __d) -> __dynamic_delete&
546     {
547         __delete_ = __dynamic_delete{__d}.__delete_;
548         return *this;
549     }
550 
operator ()exec::__scope::__dynamic_delete551     void operator()(_Ty* __p)
552     {
553         __delete_(__p);
554     }
555 
556     void (*__delete_)(_Ty*);
557 };
558 
559 template <class _Completions, class _Env>
560 struct __future_state_base
561 {
__future_state_baseexec::__scope::__future_state_base562     __future_state_base(_Env __env, const __impl* __scope) :
563         __forward_scope_{std::in_place, __scope->__stop_source_.get_token(),
564                          __forward_stopped{&__stop_source_}},
565         __env_(make_env(
566             static_cast<_Env&&>(__env),
567             stdexec::prop{get_stop_token, __scope->__stop_source_.get_token()}))
568     {}
569 
~__future_state_baseexec::__scope::__future_state_base570     ~__future_state_base()
571     {
572         std::unique_lock __guard{__mutex_};
573         if (__step_ == __future_step::__created)
574         {
575             // exception during connect() will end up here
576             __step_from_to_(__guard, __future_step::__created,
577                             __future_step::__deleted);
578         }
579         else if (__step_ != __future_step::__deleted)
580         {
581             // completing the given sender before the future is dropped will end
582             // here
583             __step_from_to_(__guard, __future_step::__future,
584                             __future_step::__deleted);
585         }
586     }
587 
__step_from_to_exec::__scope::__future_state_base588     void __step_from_to_(std::unique_lock<std::mutex>& __guard,
589                          __future_step __from, __future_step __to)
590     {
591         STDEXEC_ASSERT(__guard.owns_lock());
592         auto actual = std::exchange(__step_, __to);
593         STDEXEC_ASSERT(actual == __from);
594     }
595 
596     inplace_stop_source __stop_source_;
597     stdexec::__optional<inplace_stop_callback<__forward_stopped>>
598         __forward_scope_;
599     std::mutex __mutex_;
600     __future_step __step_ = __future_step::__created;
601     std::unique_ptr<__future_state_base, __dynamic_delete<__future_state_base>>
602         __no_future_;
603     __completions_as_variant<_Completions> __data_;
604     __intrusive_queue<&__subscription::__next_> __subscribers_;
605     __env_t<_Env> __env_;
606 };
607 
608 template <class _Completions, class _EnvId>
609 struct __future_rcvr
610 {
611     using _Env = stdexec::__t<_EnvId>;
612 
613     struct __t
614     {
615         using __id = __future_rcvr;
616         using receiver_concept = stdexec::receiver_t;
617         __future_state_base<_Completions, _Env>* __state_;
618         const __impl* __scope_;
619 
__dispatch_result_exec::__scope::__future_rcvr::__t620         void __dispatch_result_(std::unique_lock<std::mutex>& __guard) noexcept
621         {
622             auto& __state = *__state_;
623             auto __local_subscribers = std::move(__state.__subscribers_);
624             __state.__forward_scope_.reset();
625             if (__state.__no_future_.get() != nullptr)
626             {
627                 // nobody is waiting for the results
628                 // delete this and return
629                 __state.__step_from_to_(__guard, __future_step::__no_future,
630                                         __future_step::__deleted);
631                 __guard.unlock();
632                 __state.__no_future_.reset();
633                 return;
634             }
635             __guard.unlock();
636             while (!__local_subscribers.empty())
637             {
638                 auto* __sub = __local_subscribers.pop_front();
639                 __sub->__complete();
640             }
641         }
642 
643         template <class _Tag, class... _As>
__save_completionexec::__scope::__future_rcvr::__t644         void __save_completion(_Tag, _As&&... __as) noexcept
645         {
646             auto& __state = *__state_;
647             try
648             {
649                 using _Tuple = __decayed_std_tuple<_Tag, _As...>;
650                 __state.__data_.template emplace<_Tuple>(
651                     _Tag(), static_cast<_As&&>(__as)...);
652             }
653             catch (...)
654             {
655                 using _Tuple = std::tuple<set_error_t, std::exception_ptr>;
656                 __state.__data_.template emplace<_Tuple>(
657                     set_error_t(), std::current_exception());
658             }
659         }
660 
661         template <__movable_value... _As>
set_valueexec::__scope::__future_rcvr::__t662         void set_value(_As&&... __as) noexcept
663         {
664             auto& __state = *__state_;
665             std::unique_lock __guard{__state.__mutex_};
666             __save_completion(set_value_t(), static_cast<_As&&>(__as)...);
667             __dispatch_result_(__guard);
668         }
669 
670         template <__movable_value _Error>
set_errorexec::__scope::__future_rcvr::__t671         void set_error(_Error&& __err) noexcept
672         {
673             auto& __state = *__state_;
674             std::unique_lock __guard{__state.__mutex_};
675             __save_completion(set_error_t(), static_cast<_Error&&>(__err));
676             __dispatch_result_(__guard);
677         }
678 
set_stoppedexec::__scope::__future_rcvr::__t679         void set_stopped() noexcept
680         {
681             auto& __state = *__state_;
682             std::unique_lock __guard{__state.__mutex_};
683             __save_completion(set_stopped_t());
684             __dispatch_result_(__guard);
685         }
686 
get_envexec::__scope::__future_rcvr::__t687         auto get_env() const noexcept -> const __env_t<_Env>&
688         {
689             return __state_->__env_;
690         }
691     };
692 };
693 
694 template <class _Sender, class _Env>
695 using __future_receiver_t =
696     __t<__future_rcvr<__future_completions_t<_Sender, _Env>, __id<_Env>>>;
697 
698 template <class _Sender, class _Env>
699 struct __future_state :
700     __future_state_base<__future_completions_t<_Sender, _Env>, _Env>
701 {
702     using _Completions = __future_completions_t<_Sender, _Env>;
703 
__future_stateexec::__scope::__future_state704     __future_state(_Sender __sndr, _Env __env, const __impl* __scope) :
705         __future_state_base<_Completions, _Env>(static_cast<_Env&&>(__env),
706                                                 __scope),
707         __op_(
708             stdexec::connect(static_cast<_Sender&&>(__sndr),
709                              __future_receiver_t<_Sender, _Env>{this, __scope}))
710     {}
711 
712     connect_result_t<_Sender, __future_receiver_t<_Sender, _Env>> __op_;
713 };
714 
715 template <class _SenderId, class _EnvId>
716 struct __future
717 {
718     using _Sender = stdexec::__t<_SenderId>;
719     using _Env = stdexec::__t<_EnvId>;
720 
721     class __t
722     {
723         template <class _Self>
724         using __completions_t =
725             __future_completions_t<__mfront<_Sender, _Self>, _Env>;
726 
727         template <class _Receiver>
728         using __future_op_t = stdexec::__t<
729             __future_op<_SenderId, _EnvId, stdexec::__id<_Receiver>>>;
730 
731       public:
732         using __id = __future;
733         using sender_concept = stdexec::sender_t;
734 
735         __t(__t&&) = default;
736         auto operator=(__t&&) -> __t& = default;
737 
~__t()738         ~__t() noexcept
739         {
740             if (__state_ != nullptr)
741             {
742                 auto __raw_state = __state_.get();
743                 std::unique_lock __guard{__raw_state->__mutex_};
744                 if (__raw_state->__data_.index() != 0)
745                 {
746                     // completed given sender
747                     // state is no longer needed
748                     return;
749                 }
750                 __raw_state->__no_future_ = std::move(__state_);
751                 __raw_state->__step_from_to_(__guard, __future_step::__future,
752                                              __future_step::__no_future);
753             }
754         }
755 
756         template <__decays_to<__t> _Self, receiver _Receiver>
757             requires receiver_of<_Receiver, __completions_t<_Self>>
connect(_Self && __self,_Receiver __rcvr)758         static auto connect(_Self&& __self, _Receiver __rcvr)
759             -> __future_op_t<_Receiver>
760         {
761             return __future_op_t<_Receiver>{
762                 static_cast<_Receiver&&>(__rcvr),
763                 static_cast<_Self&&>(__self).__state_};
764         }
765 
766         template <__decays_to<__t> _Self, class... _OtherEnv>
get_completion_signatures(_Self &&,_OtherEnv &&...)767         static auto get_completion_signatures(_Self&&, _OtherEnv&&...)
768             -> __completions_t<_Self>
769         {
770             return {};
771         }
772 
773       private:
774         friend struct async_scope;
775 
__t(std::unique_ptr<__future_state<_Sender,_Env>> __state)776         explicit __t(
777             std::unique_ptr<__future_state<_Sender, _Env>> __state) noexcept :
778             __state_(std::move(__state))
779         {
780             std::unique_lock __guard{__state_->__mutex_};
781             __state_->__step_from_to_(__guard, __future_step::__created,
782                                       __future_step::__future);
783         }
784 
785         std::unique_ptr<__future_state<_Sender, _Env>> __state_;
786     };
787 };
788 
789 template <class _Sender, class _Env>
790 using __future_t = stdexec::__t<
791     __future<__id<__nest_sender_t<_Sender>>, __id<__decay_t<_Env>>>>;
792 
793 ////////////////////////////////////////////////////////////////////////////
794 // async_scope::spawn implementation
795 struct __spawn_env_
796 {
797     inplace_stop_token __token_;
798 
queryexec::__scope::__spawn_env_799     auto query(get_stop_token_t) const noexcept -> inplace_stop_token
800     {
801         return __token_;
802     }
803 
queryexec::__scope::__spawn_env_804     auto query(get_scheduler_t) const noexcept -> __inln::__scheduler
805     {
806         return {};
807     }
808 };
809 
810 template <class _Env>
811 using __spawn_env_t = __env::__join_t<_Env, __spawn_env_>;
812 
813 template <class _EnvId>
814 struct __spawn_op_base
815 {
816     using _Env = stdexec::__t<_EnvId>;
817     __spawn_env_t<_Env> __env_;
818     void (*__delete_)(__spawn_op_base*);
819 };
820 
821 template <class _EnvId>
822 struct __spawn_rcvr
823 {
824     using _Env = stdexec::__t<_EnvId>;
825 
826     struct __t
827     {
828         using __id = __spawn_rcvr;
829         using receiver_concept = stdexec::receiver_t;
830         __spawn_op_base<_EnvId>* __op_;
831 
set_valueexec::__scope::__spawn_rcvr::__t832         void set_value() noexcept
833         {
834             __op_->__delete_(__op_);
835         }
836 
837         // BUGBUG NOT TO SPEC spawn shouldn't accept senders that can fail.
set_errorexec::__scope::__spawn_rcvr::__t838         [[noreturn]] void set_error(std::exception_ptr __eptr) noexcept
839         {
840             std::rethrow_exception(std::move(__eptr));
841         }
842 
set_stoppedexec::__scope::__spawn_rcvr::__t843         void set_stopped() noexcept
844         {
845             __op_->__delete_(__op_);
846         }
847 
get_envexec::__scope::__spawn_rcvr::__t848         auto get_env() const noexcept -> const __spawn_env_t<_Env>&
849         {
850             return __op_->__env_;
851         }
852     };
853 };
854 
855 template <class _Env>
856 using __spawn_receiver_t = stdexec::__t<__spawn_rcvr<__id<_Env>>>;
857 
858 template <class _SenderId, class _EnvId>
859 struct __spawn_op
860 {
861     using _Env = stdexec::__t<_EnvId>;
862     using _Sender = stdexec::__t<_SenderId>;
863 
864     struct __t : __spawn_op_base<_EnvId>
865     {
866         template <__decays_to<_Sender> _Sndr>
__texec::__scope::__spawn_op::__t867         __t(_Sndr&& __sndr, _Env __env, const __impl* __scope) :
868             __spawn_op_base<_EnvId>{
869                 __env::__join(
870                     static_cast<_Env&&>(__env),
871                     __spawn_env_{__scope->__stop_source_.get_token()}),
872                 [](__spawn_op_base<_EnvId>* __op) {
873                     delete static_cast<__t*>(__op);
874                 }},
875             __op_(stdexec::connect(static_cast<_Sndr&&>(__sndr),
876                                    __spawn_receiver_t<_Env>{this}))
877         {}
878 
startexec::__scope::__spawn_op::__t879         void start() & noexcept
880         {
881             stdexec::start(__op_);
882         }
883 
884         connect_result_t<_Sender, __spawn_receiver_t<_Env>> __op_;
885     };
886 };
887 
888 template <class _Sender, class _Env>
889 using __spawn_operation_t = stdexec::__t<__spawn_op<__id<_Sender>, __id<_Env>>>;
890 
891 ////////////////////////////////////////////////////////////////////////////
892 // async_scope
893 struct async_scope : __immovable
894 {
895     async_scope() = default;
896 
897     template <sender _Constrained>
when_emptyexec::__scope::async_scope898     [[nodiscard]] auto when_empty(_Constrained&& __c) const
899         -> __when_empty_sender_t<_Constrained>
900     {
901         return __when_empty_sender_t<_Constrained>{
902             &__impl_, static_cast<_Constrained&&>(__c)};
903     }
904 
on_emptyexec::__scope::async_scope905     [[nodiscard]] auto on_empty() const
906     {
907         return when_empty(just());
908     }
909 
910     template <sender _Constrained>
911     using nest_result_t = __nest_sender_t<_Constrained>;
912 
913     template <sender _Constrained>
nestexec::__scope::async_scope914     [[nodiscard]] auto nest(_Constrained&& __c) -> nest_result_t<_Constrained>
915     {
916         return nest_result_t<_Constrained>{&__impl_,
917                                            static_cast<_Constrained&&>(__c)};
918     }
919 
920     template <__movable_value _Env = empty_env,
921               sender_in<__spawn_env_t<_Env>> _Sender>
922         requires sender_to<nest_result_t<_Sender>, __spawn_receiver_t<_Env>>
spawnexec::__scope::async_scope923     void spawn(_Sender&& __sndr, _Env __env = {})
924     {
925         using __op_t = __spawn_operation_t<nest_result_t<_Sender>, _Env>;
926         // start is noexcept so we can assume that the operation will complete
927         // after this, which means we can rely on its self-ownership to ensure
928         // that it is eventually deleted
929         stdexec::start(*(new __op_t{nest(static_cast<_Sender&&>(__sndr)),
930                                     static_cast<_Env&&>(__env), &__impl_}));
931     }
932 
933     template <__movable_value _Env = empty_env,
934               sender_in<__env_t<_Env>> _Sender>
spawn_futureexec::__scope::async_scope935     auto spawn_future(_Sender&& __sndr, _Env __env = {})
936         -> __future_t<_Sender, _Env>
937     {
938         using __state_t = __future_state<nest_result_t<_Sender>, _Env>;
939         auto __state =
940             std::make_unique<__state_t>(nest(static_cast<_Sender&&>(__sndr)),
941                                         static_cast<_Env&&>(__env), &__impl_);
942         stdexec::start(__state->__op_);
943         return __future_t<_Sender, _Env>{std::move(__state)};
944     }
945 
get_stop_sourceexec::__scope::async_scope946     auto get_stop_source() noexcept -> inplace_stop_source&
947     {
948         return __impl_.__stop_source_;
949     }
950 
get_stop_tokenexec::__scope::async_scope951     auto get_stop_token() const noexcept -> inplace_stop_token
952     {
953         return __impl_.__stop_source_.get_token();
954     }
955 
request_stopexec::__scope::async_scope956     auto request_stop() noexcept -> bool
957     {
958         return __impl_.__stop_source_.request_stop();
959     }
960 
961   private:
962     __impl __impl_;
963 };
964 } // namespace __scope
965 
966 using __scope::async_scope;
967 
968 template <class _AsyncScope, class _Sender>
969 using nest_result_t = decltype(stdexec::__declval<_AsyncScope&>().nest(
970     stdexec::__declval<_Sender&&>()));
971 } // namespace exec
972