xref: /openbmc/sdbusplus/include/sdbusplus/async/stdexec/__detail/__shared.hpp (revision f083bc1a64e1f94c99fc270b7c0856810f4be638)
1 /*
2  * Copyright (c) 2021-2024 NVIDIA Corporation
3  *
4  * Licensed under the Apache License Version 2.0 with LLVM Exceptions
5  * (the "License"); you may not use this file except in compliance with
6  * the License. You may obtain a copy of the License at
7  *
8  *   https://llvm.org/LICENSE.txt
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include "__execution_fwd.hpp"
19 
20 // include these after __execution_fwd.hpp
21 #include "../functional.hpp"
22 #include "../stop_token.hpp"
23 #include "__basic_sender.hpp"
24 #include "__cpo.hpp"
25 #include "__env.hpp"
26 #include "__intrusive_ptr.hpp"
27 #include "__intrusive_slist.hpp"
28 #include "__meta.hpp"
29 #include "__optional.hpp"
30 #include "__transform_completion_signatures.hpp"
31 #include "__tuple.hpp"
32 #include "__variant.hpp"
33 
34 #include <exception>
35 #include <mutex>
36 
37 namespace stdexec
38 {
39 ////////////////////////////////////////////////////////////////////////////
40 // shared components of split and ensure_started
41 //
42 // The split and ensure_started algorithms are very similar in implementation.
43 // The salient differences are:
44 //
45 // split: the input async operation is always connected. It is only
46 //   started when one of the split senders is connected and started.
47 //   split senders are copyable, so there are multiple operation states
//   to be notified on completion. These are stored in an intrusive
49 //   linked list.
50 //
51 // ensure_started: the input async operation is always started, so
52 //   the internal receiver will always be completed. The ensure_started
53 //   sender is move-only and single-shot, so there will only ever be one
54 //   operation state to be notified on completion.
55 //
56 // The shared state should add-ref itself when the input async
57 // operation is started and release itself when its completion
58 // is notified.
59 namespace __shared
60 {
// Environment seen by the wrapped (shared) operation: the base environment
// joined with an inplace_stop_token property, so the shared state can request
// stop (see __shared_state::__detach) when the last watcher goes away.
template <class _BaseEnv>
using __env_t =                //
    __env::__join_t<prop<get_stop_token_t, inplace_stop_token>,
                    _BaseEnv>; // BUGBUG NOT TO SPEC
65 
// Builds a visitor (for __variant_t::visit) that unpacks a stored completion
// tuple <tag, args...> and invokes the tag CPO (set_value/set_error/
// set_stopped) on __rcvr. The arguments are forwarded with the same value
// category as the visited tuple: split visits a const lvalue (args copied),
// ensure_started visits an rvalue (args moved).
template <class _Receiver>
auto __make_notify_visitor(_Receiver& __rcvr) noexcept
{
    return [&]<class _Tuple>(_Tuple&& __tupl) noexcept -> void {
        __tupl.apply(
            [&](auto __tag, auto&&... __args) noexcept -> void {
                __tag(static_cast<_Receiver&&>(__rcvr),
                      __forward_like<_Tuple>(__args)...);
            },
            __tupl);
    };
}
78 
// Type-erased node in the shared state's intrusive waiters list.
// __notify_ is the completion callback invoked when the shared operation
// finishes; __next_ is the intrusive link to the next waiter.
struct __local_state_base : __immovable
{
    using __notify_fn = void(__local_state_base*) noexcept;

    __notify_fn* __notify_{};
    __local_state_base* __next_{};
};
86 
// Forward declaration; defined below.
template <class _CvrefSender, class _Env>
struct __shared_state;
89 
90 // The operation state of ensure_started, and each operation state of split, has
91 // one of these, created when the sender is connected. There are 0 or more of
92 // them for each underlying async operation. It is what ensure_started- and
93 // split-sender's `get_state` fn returns. It holds a ref count to the shared
94 // state.
template <class _CvrefSender, class _Receiver>
struct __local_state :
    __local_state_base,
    __enable_receiver_from_this<_CvrefSender, _Receiver,
                                __local_state<_CvrefSender, _Receiver>>
{
    using __tag_t = tag_of_t<_CvrefSender>;
    using __stok_t = stop_token_of_t<env_of_t<_Receiver>>;
    static_assert(__one_of<__tag_t, __split::__split_t,
                           __ensure_started::__ensure_started_t>);

    // Pulls the ref-counted shared-state pointer out of the sender's data and
    // installs the tag-specific __notify function as this waiter's callback.
    explicit __local_state(_CvrefSender&& __sndr) noexcept :
        __local_state::__local_state_base{{},
                                          &__notify<tag_of_t<_CvrefSender>>},
        __sh_state_(__get_sh_state(__sndr))
    {}

    // Releases this operation state's reference to the shared state.
    ~__local_state()
    {
        __sh_state_t::__detach(__sh_state_);
    }

    // Stop request callback:
    void operator()() noexcept
    {
        // We reach here when a split/ensure_started sender has received a stop
        // request from the receiver to which it is connected.
        //
        // NOTE: the unique_lock is declared in the if condition so that it is
        // released at the end of the if block, before completing the receiver
        // below. The condition itself is always true after construction (a
        // freshly constructed unique_lock owns its mutex).
        if (std::unique_lock __lock{__sh_state_->__mutex_})
        {
            // Remove this operation from the waiters list. Removal can fail if:
            //   1. It was already removed by another thread, or
            //   2. It hasn't been added yet (see `start` below), or
            //   3. The underlying operation has already completed.
            //
            // In each case, the right thing to do is nothing. If (1) then we
            // raced with another thread and lost. In that case, the other
            // thread will take care of it. If (2) then `start` will take care
            // of it. If (3) then this stop request is safe to ignore.
            if (!__sh_state_->__waiters_.remove(this))
                return;
        }

        // The following code and the __notify function cannot both execute.
        // This is because the
        // __notify function is called from the shared state's __notify_waiters
        // function, which first sets __waiters_ to the completed state. As a
        // result, the attempt to remove `this` from the waiters list above will
        // fail and this stop request is ignored.
        __sh_state_t::__detach(__sh_state_);
        stdexec::set_stopped(static_cast<_Receiver&&>(this->__receiver()));
    }

    // This is called from __shared_state::__notify_waiters when the input async
    // operation completes; or, if it has already completed when start is
    // called, it is called from start:
    // __notify cannot race with __on_stop_request. See comment in
    // __on_stop_request.
    template <class _Tag>
    static void __notify(__local_state_base* __base) noexcept
    {
        auto* const __self = static_cast<__local_state*>(__base);

        // The split algorithm sends by T const&. ensure_started sends by T&&.
        constexpr bool __is_split = same_as<__split::__split_t, _Tag>;
        using __variant_t = decltype(__self->__sh_state_->__results_);
        using __cv_variant_t =
            __if_c<__is_split, const __variant_t&, __variant_t>;

        // Deregister the stop callback before delivering the completion.
        __self->__on_stop_.reset();

        auto __visitor = __make_notify_visitor(__self->__receiver());
        __variant_t::visit(__visitor, static_cast<__cv_variant_t&&>(
                                          __self->__sh_state_->__results_));
    }

    // Extracts the shared-state pointer from the sender expression's data
    // (the __box stored by split/ensure_started).
    static auto __get_sh_state(_CvrefSender& __sndr) noexcept
    {
        return __sndr
            .apply(static_cast<_CvrefSender&&>(__sndr), __detail::__get_data())
            .__sh_state_;
    }

    using __sh_state_ptr_t = __result_of<__get_sh_state, _CvrefSender&>;
    using __sh_state_t = typename __sh_state_ptr_t::element_type;

    // Stop callback registered on the outer receiver's stop token; engaged in
    // `start` and disengaged in __notify before completing.
    __optional<stop_callback_for_t<__stok_t, __local_state&>> __on_stop_{};
    // Owning (ref-counted) handle to the shared state.
    __sh_state_ptr_t __sh_state_;
};
183 
// The receiver that __shared_state connects to the wrapped sender. Every
// completion is forwarded to __shared_state::__complete, which records the
// result and notifies all waiting operation states.
template <class _CvrefSenderId, class _EnvId>
struct __receiver
{
    using _CvrefSender = stdexec::__cvref_t<_CvrefSenderId>;
    using _Env = stdexec::__t<_EnvId>;

    struct __t
    {
        using receiver_concept = receiver_t;
        using __id = __receiver;

        template <class... _As>
        STDEXEC_ATTRIBUTE((always_inline))
        void set_value(_As&&... __as) noexcept
        {
            __sh_state_->__complete(set_value_t(), static_cast<_As&&>(__as)...);
        }

        template <class _Error>
        STDEXEC_ATTRIBUTE((always_inline))
        void set_error(_Error&& __err) noexcept
        {
            __sh_state_->__complete(set_error_t(),
                                    static_cast<_Error&&>(__err));
        }

        STDEXEC_ATTRIBUTE((always_inline))
        void set_stopped() noexcept
        {
            __sh_state_->__complete(set_stopped_t());
        }

        // Exposes the shared state's environment (base env + stop token).
        auto get_env() const noexcept -> const __env_t<_Env>&
        {
            return __sh_state_->__env_;
        }

        // The receiver does not hold a reference to the shared state.
        __shared_state<_CvrefSender, _Env>* __sh_state_;
    };
};
225 
__get_tombstone()226 inline __local_state_base* __get_tombstone() noexcept
227 {
228     static __local_state_base __tombstone_{{}, nullptr, nullptr};
229     return &__tombstone_;
230 }
231 
232 //! Heap-allocatable shared state for things like `stdexec::split`.
template <class _CvrefSender, class _Env>
struct __shared_state :
    private __enable_intrusive_from_this<__shared_state<_CvrefSender, _Env>, 2>
{
    using __receiver_t = __t<__receiver<__cvref_id<_CvrefSender>, __id<_Env>>>;
    using __waiters_list_t = __intrusive_slist<&__local_state_base::__next_>;

    // A variant of decayed completion tuples <tag, args...>, with an extra
    // (set_error_t, exception_ptr) alternative for the case where decay-
    // copying a result throws (see __complete below).
    using __variant_t = //
        __transform_completion_signatures<
            __completion_signatures_of_t<_CvrefSender, _Env>,
            __mbind_front_q<__decayed_tuple, set_value_t>::__f,
            __mbind_front_q<__decayed_tuple, set_error_t>::__f,
            __tuple_for<set_error_t, std::exception_ptr>,
            __munique<__mbind_front_q<__variant_for,
                                      __tuple_for<set_stopped_t>>>::__f,
            __tuple_for<set_error_t, std::exception_ptr>>;

    // Flag-bit positions carried in the intrusive ref count; the `2` template
    // argument to __enable_intrusive_from_this above reserves two bits.
    static constexpr std::size_t __started_bit = 0;
    static constexpr std::size_t __completed_bit = 1;

    inplace_stop_source __stop_source_{};
    __env_t<_Env> __env_;
    __variant_t __results_{}; // Defaults to the "set_stopped" state
    std::mutex __mutex_;      // This mutex guards access to __waiters_.
    __waiters_list_t __waiters_{};
    connect_result_t<_CvrefSender, __receiver_t> __shared_op_;

    // Eagerly connects the wrapped sender (started lazily via __try_start).
    explicit __shared_state(_CvrefSender&& __sndr, _Env __env) :
        __env_(__env::__join(prop{get_stop_token, __stop_source_.get_token()},
                             static_cast<_Env&&>(__env))),
        __shared_op_(
            connect(static_cast<_CvrefSender&&>(__sndr), __receiver_t{this}))
    {
        // add one ref count to account for the case where there are no watchers
        // left but the shared op is still running.
        this->__inc_ref();
    }

    // The caller of this wants to release their reference to the shared state.
    // The ref count must be at least 2 at this point: one owned by the caller,
    // and one added in the
    // __shared_state ctor.
    static void __detach(__intrusive_ptr<__shared_state, 2>& __ptr) noexcept
    {
        // Ask the intrusive ptr to stop managing the reference count so we can
        // manage it manually.
        if (auto* __self = __ptr.__release_())
        {
            auto __old = __self->__dec_ref();
            STDEXEC_ASSERT(__count(__old) >= 2);

            if (__count(__old) == 2)
            {
                // The last watcher has released its reference. Ask the shared
                // op to stop.
                static_cast<__shared_state*>(__self)
                    ->__stop_source_.request_stop();

                // Additionally, if the shared op was never started, or if it
                // has already completed, then the shared state is no longer
                // needed. Decrement the ref count to 0 here, which will delete
                // __self.
                if (!__bit<__started_bit>(__old) ||
                    __bit<__completed_bit>(__old))
                {
                    __self->__dec_ref();
                }
            }
        }
    }

    /// @post The started bit is set in the shared state's ref count, OR the
    /// __waiters_ list is set to the known "tombstone" value indicating
    /// completion.
    void __try_start() noexcept
    {
        // With the split algorithm, multiple split senders can be started
        // simultaneously, but only one should start the shared async operation.
        // If the "started" bit is set, then someone else has already started
        // the shared operation. Do nothing.
        if (this->template __is_set<__started_bit>())
        {
            return;
        }
        else if (__bit<__started_bit>(
                     this->template __set_bit<__started_bit>()))
        {
            // The bit was already set when we tried to set it: another thread
            // won the race and will start the shared operation.
            return;
        }
        else if (__stop_source_.stop_requested())
        {
            // Stop has already been requested. Rather than starting the
            // operation, complete with set_stopped immediately.
            // 1. Sets __waiters_ to a known "tombstone" value
            // 2. Notifies all the waiters that the operation has stopped
            // 3. Sets the "completed" bit in the ref count.
            __notify_waiters();
            return;
        }
        else
        {
            stdexec::start(__shared_op_);
        }
    }

    /// @brief Tries to enqueue __waiter for completion notification.
    /// @return true if the waiter was added (or notified immediately because
    /// the operation had already completed); false if __stok had a pending
    /// stop request and the waiter was not added.
    template <class _StopToken>
    bool __try_add_waiter(__local_state_base* __waiter,
                          _StopToken __stok) noexcept
    {
        std::unique_lock __lock{__mutex_};
        if (__waiters_.front() == __get_tombstone())
        {
            // The work has already completed. Notify the waiter immediately.
            __lock.unlock();
            __waiter->__notify_(__waiter);
            return true;
        }
        else if (__stok.stop_requested())
        {
            // Stop has been requested. Do not add the waiter.
            return false;
        }
        else
        {
            // Add the waiter to the list.
            __waiters_.push_front(__waiter);
            return true;
        }
    }

    /// @brief This is called when the shared async operation completes.
    /// @post __waiters_ is set to a known "tombstone" value.
    template <class _Tag, class... _As>
    void __complete(_Tag, _As&&... __as) noexcept
    {
        try
        {
            using __tuple_t = __decayed_tuple<_Tag, _As...>;
            __results_.template emplace<__tuple_t>(_Tag(),
                                                   static_cast<_As&&>(__as)...);
        }
        catch (...)
        {
            // Decay-copying the results threw; record the exception instead.
            using __tuple_t = __decayed_tuple<set_error_t, std::exception_ptr>;
            __results_.template emplace<__tuple_t>(set_error,
                                                   std::current_exception());
        }

        __notify_waiters();
    }

    /// @brief This is called when the shared async operation completes.
    /// @post __waiters_ is set to a known "tombstone" value.
    void __notify_waiters() noexcept
    {
        __waiters_list_t __waiters_copy{__get_tombstone()};

        // Set the waiters list to a known "tombstone" value that we can check
        // later.
        {
            std::lock_guard __lock{__mutex_};
            __waiters_.swap(__waiters_copy);
        }

        STDEXEC_ASSERT(__waiters_copy.front() != __get_tombstone());
        for (auto __itr = __waiters_copy.begin();
             __itr != __waiters_copy.end();)
        {
            __local_state_base* __item = *__itr;

            // We must increment the iterator before calling notify, since
            // notify may end up triggering *__item to be destructed on another
            // thread, and the intrusive slist's iterator increment relies on
            // __item.
            ++__itr;

            __item->__notify_(__item);
        }

        // Set the "completed" bit in the ref count. If the ref count is 1, then
        // there are no more waiters. Release the final reference.
        if (__count(this->template __set_bit<__completed_bit>()) == 1)
        {
            this->__dec_ref(); // release the extra ref count, deletes this
        }
    }
};
420 
// Completion signatures of a split/ensure_started sender: the wrapped
// sender's completions with _Cvref applied to each value/error type, plus
// set_error_t(exception_ptr) (decay-copying a result may throw) and
// set_stopped_t() (stop may be requested before the shared op starts).
template <class _Cvref, class _CvrefSender, class _Env>
using __make_completions = //
    __try_make_completion_signatures<
        // NOT TO SPEC:
        // See https://github.com/cplusplus/sender-receiver/issues/23
        _CvrefSender, __env_t<_Env>,
        completion_signatures<set_error_t(
                                  __minvoke<_Cvref, std::exception_ptr>),
                              set_stopped_t()>, // NOT TO SPEC
        __mtransform<_Cvref,
                     __mcompose<__q<completion_signatures>, __qf<set_value_t>>>,
        __mtransform<
            _Cvref, __mcompose<__q<completion_signatures>, __qf<set_error_t>>>>;
434 
// split completes with const T&. ensure_started completes with T&&.
// (__cpclr adds `const&` to the decayed type; __cp adds `&&`.)
template <class _Tag>
using __cvref_results_t = //
    __mcompose<__if_c<same_as<_Tag, __split::__split_t>, __cpclr, __cp>,
               __q<__decay_t>>;
440 
// NOTE: the use of __mapply in the return type below takes advantage of the
// fact that _ShState denotes an instance of the __shared_state template, which
// is parameterized on the cvref-qualified sender and the environment.
template <class _Tag, class _ShState>
using __completions = //
    __mapply<__mbind_front_q<__make_completions, __cvref_results_t<_Tag>>,
             _ShState>;
448 
// The "data" of a split/ensure_started sender expression: a ref-counted
// handle to the shared state. Copyable for split, move-only for
// ensure_started.
template <class _CvrefSender, class _Env, bool _Copyable = true>
struct __box
{
    using __tag_t = __if_c<_Copyable, __split::__split_t,
                           __ensure_started::__ensure_started_t>;
    using __sh_state_t = __shared_state<_CvrefSender, _Env>;

    __box(__tag_t, __intrusive_ptr<__sh_state_t, 2> __sh_state) noexcept :
        __sh_state_(std::move(__sh_state))
    {}

    __box(__box&&) noexcept = default;
    __box(const __box&) noexcept
        requires _Copyable
    = default;

    // Releases this box's reference to the shared state.
    ~__box()
    {
        __sh_state_t::__detach(__sh_state_);
    }

    __intrusive_ptr<__sh_state_t, 2> __sh_state_;
};
472 
// Deduction guides: split yields a copyable box, ensure_started a move-only
// one.
template <class _CvrefSender, class _Env>
__box(__split::__split_t,
      __intrusive_ptr<__shared_state<_CvrefSender, _Env>, 2>) //
    ->__box<_CvrefSender, _Env, true>;

template <class _CvrefSender, class _Env>
__box(__ensure_started::__ensure_started_t,
      __intrusive_ptr<__shared_state<_CvrefSender, _Env>, 2>)
    -> __box<_CvrefSender, _Env, false>;
482 
// Sender-expression implementation shared by split and ensure_started.
template <class _Tag>
struct __shared_impl : __sexpr_defaults
{
    // connect: create the per-operation __local_state, which takes its own
    // reference to the shared state held in the sender's data.
    static constexpr auto get_state = //
        []<class _CvrefSender, class _Receiver>(
            _CvrefSender&& __sndr,
            _Receiver&) noexcept -> __local_state<_CvrefSender, _Receiver> {
        static_assert(sender_expr_for<_CvrefSender, _Tag>);
        return __local_state<_CvrefSender, _Receiver>{
            static_cast<_CvrefSender&&>(__sndr)};
    };

    // Completions are computed from the shared state's type (see
    // __completions above).
    static constexpr auto get_completion_signatures = //
        []<class _Self>(const _Self&, auto&&...) noexcept
        -> __completions<_Tag, typename __data_of<_Self>::__sh_state_t> {
        static_assert(sender_expr_for<_Self, _Tag>);
        return {};
    };

    static constexpr auto start = //
        []<class _Sender, class _Receiver>(
            __local_state<_Sender, _Receiver>& __self,
            _Receiver& __rcvr) noexcept -> void {
        using __sh_state_t =
            typename __local_state<_Sender, _Receiver>::__sh_state_t;
        // Scenario: there are no more split senders, this is the only operation
        // state, the underlying operation has not yet been started, and the
        // receiver's stop token is already in the "stop requested" state. Then
        // registering the stop callback will call
        // __on_stop_request on __self synchronously. It may also be called
        // asynchronously at any point after the callback is registered. Beware.
        // We are guaranteed, however, that
        // __on_stop_request will not complete the operation or decrement the
        // shared state's ref count until after __self has been added to the
        // waiters list.
        const auto __stok = stdexec::get_stop_token(stdexec::get_env(__rcvr));
        __self.__on_stop_.emplace(__stok, __self);

        // We haven't put __self in the waiters list yet and we are holding a
        // ref count to
        // __sh_state_, so nothing can happen to the __sh_state_ here.

        // Start the shared op. As an optimization, skip it if the receiver's
        // stop token has already been signaled.
        if (!__stok.stop_requested())
        {
            __self.__sh_state_->__try_start();
            if (__self.__sh_state_->__try_add_waiter(&__self, __stok))
            {
                // successfully added the waiter
                return;
            }
        }

        // Otherwise, failed to add the waiter because of a stop-request.
        // Complete synchronously with set_stopped().
        __self.__on_stop_.reset();
        __sh_state_t::__detach(__self.__sh_state_);
        stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr));
    };
};
544 } // namespace __shared
545 } // namespace stdexec
546