xref: /openbmc/sdbusplus/include/sdbusplus/async/stdexec/__detail/__shared.hpp (revision 10d0b4b7d1498cfd5c3d37edea271a54d1984e41)
1 /*
2  * Copyright (c) 2021-2024 NVIDIA Corporation
3  *
4  * Licensed under the Apache License Version 2.0 with LLVM Exceptions
5  * (the "License"); you may not use this file except in compliance with
6  * the License. You may obtain a copy of the License at
7  *
8  *   https://llvm.org/LICENSE.txt
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include "__execution_fwd.hpp"
19 
20 // include these after __execution_fwd.hpp
21 #include "__basic_sender.hpp"
22 #include "__env.hpp"
23 #include "__intrusive_slist.hpp"
24 #include "__optional.hpp"
25 #include "__meta.hpp"
26 #include "__receivers.hpp"
27 #include "__transform_completion_signatures.hpp"
28 #include "__tuple.hpp"
29 #include "__variant.hpp" // IWYU pragma: keep
30 
31 #include "../stop_token.hpp"
32 
33 #include <atomic>
34 #include <exception>
35 #include <mutex>
36 #include <type_traits>
37 #include <utility>
38 
39 ////////////////////////////////////////////////////////////////////////////
40 // shared components of split and ensure_started
41 //
42 // The split and ensure_started algorithms are very similar in implementation.
43 // The salient differences are:
44 //
45 // split: the input async operation is always connected. It is only
46 //   started when one of the split senders is connected and started.
47 //   split senders are copyable, so there are multiple operation states
//   to be notified on completion. These are stored in an intrusive
49 //   linked list.
50 //
51 // ensure_started: the input async operation is always started, so
52 //   the internal receiver will always be completed. The ensure_started
53 //   sender is move-only and single-shot, so there will only ever be one
54 //   operation state to be notified on completion.
55 //
56 // The shared state should add-ref itself when the input async
57 // operation is started and release itself when its completion
58 // is notified.
59 namespace stdexec::__shared {
  // The environment seen by the wrapped (shared) sender: the outer
  // environment _BaseEnv with the shared state's inplace_stop_token layered
  // on top, so the shared operation can be cancelled when the last consumer
  // detaches.
  template <class _BaseEnv>
  using __env_t = __join_env_t<
    prop<get_stop_token_t, inplace_stop_token>,
    _BaseEnv
  >; // BUGBUG NOT TO SPEC
65 
66   template <class _Receiver>
67   struct __notify_fn {
68     template <class _Tag, class... _Args>
operator ()stdexec::__shared::__notify_fn69     void operator()(_Tag __tag, _Args&&... __args) const noexcept {
70       __tag(static_cast<_Receiver&&>(__rcvr_), static_cast<_Args&&>(__args)...);
71     }
72 
73     _Receiver& __rcvr_;
74   };
75 
76   template <class _Receiver>
__make_notify_visitor(_Receiver & __rcvr)77   auto __make_notify_visitor(_Receiver& __rcvr) noexcept {
78     return [&]<class _Tuple>(_Tuple&& __tupl) noexcept -> void {
79       __tupl.apply(__notify_fn<_Receiver>{__rcvr}, static_cast<_Tuple&&>(__tupl));
80     };
81   }
82 
  // Base class for the per-operation waiter nodes that are linked into the
  // shared state's intrusive waiters list. __notify_ is a type-erased
  // callback that delivers the stored result to the waiting operation
  // state's receiver; __next_ is the intrusive-list hook.
  //
  // NOTE: member order matters -- __local_state brace-initializes this
  // aggregate as {{}, &__notify<...>}.
  struct __local_state_base : __immovable {
    using __notify_fn = void(__local_state_base*) noexcept;

    // Invoke the type-erased notification callback for this waiter.
    void __notify() noexcept {
      __notify_(this);
    }

    __notify_fn* __notify_{};
    __local_state_base* __next_{};
  };
93 
94   template <class _CvrefSender, class _Env>
95   struct __shared_state;
96 
97   // The operation state of ensure_started, and each operation state of split, has one of these,
98   // created when the sender is connected. There are 0 or more of them for each underlying async
99   // operation. It is what ensure_started- and split-sender's `get_state` fn returns. It holds a
100   // ref count to the shared state.
  template <class _CvrefSender, class _Receiver>
  struct __local_state
    : __local_state_base
    , __enable_receiver_from_this<_CvrefSender, _Receiver, __local_state<_CvrefSender, _Receiver>> {
    using __tag_t = tag_of_t<_CvrefSender>;
    using __stok_t = stop_token_of_t<env_of_t<_Receiver>>;
    static_assert(__one_of<__tag_t, __split::__split_t, __ensure_started::__ensure_started_t>);

    // Takes over the shared-state pointer stored in the sender's data (the
    // __box), leaving the box empty, and installs the type-erased __notify
    // callback in the __local_state_base subobject.
    explicit __local_state(_CvrefSender&& __sndr) noexcept
      : __local_state::__local_state_base{{}, &__notify<tag_of_t<_CvrefSender>>}
      , __sh_state_(__get_sh_state(__sndr)) {
    }

    // If we still hold a counted reference to the shared state (i.e., we
    // were neither notified nor stopped), release it here.
    ~__local_state() {
      if (__sh_state_) {
        __sh_state_->__detach();
      }
    }

    // Stop request callback:
    void operator()() noexcept {
      // We reach here when a split/ensure_started sender has received a stop request from the
      // receiver to which it is connected.
      if (std::unique_lock __lock{__sh_state_->__mutex_}) {
        // Remove this operation from the waiters list. Removal can fail if:
        //   1. It was already removed by another thread, or
        //   2. It hasn't been added yet (see `start` below), or
        //   3. The underlying operation has already completed.

        // In each case, the right thing to do is nothing. If (1) then we raced with another
        // thread and lost. In that case, the other thread will take care of it. If (2) then
        // `start` will take care of it. If (3) then this stop request is safe to ignore.
        if (!__sh_state_->__waiters_.remove(this))
          return;
      }

      // The following code and the __notify function cannot both execute. This is because the
      // __notify function is called from the shared state's __notify_waiters function, which
      // first sets __waiters_ to the completed state. As a result, the attempt to remove `this`
      // from the waiters list above will fail and this stop request is ignored.
      std::exchange(__sh_state_, nullptr)->__detach();
      stdexec::set_stopped(static_cast<_Receiver&&>(this->__receiver()));
    }

    // This is called from __shared_state::__notify_waiters when the input async operation
    // completes; or, if it has already completed when start is called, it is called from start:
    // __notify cannot race with __local_state::operator(). See comment in
    // __local_state::operator().
    template <class _Tag>
    static void __notify(__local_state_base* __base) noexcept {
      auto* const __self = static_cast<__local_state*>(__base);

      // The split algorithm sends by T const&. ensure_started sends by T&&.
      constexpr bool __is_split = same_as<__split::__split_t, _Tag>;
      using __variant_t = decltype(__self->__sh_state_->__results_);
      using __cv_variant_t = __if_c<__is_split, const __variant_t&, __variant_t>;

      // Deregister the stop callback before completing the downstream receiver.
      __self->__on_stop_.reset();

      auto __visitor = __make_notify_visitor(__self->__receiver());
      __variant_t::visit(__visitor, static_cast<__cv_variant_t&&>(__self->__sh_state_->__results_));
    }

    // Extracts the shared-state pointer from the sender's data, transferring
    // the sender's counted reference to this operation state.
    static auto __get_sh_state(_CvrefSender& __sndr) noexcept {
      auto __box = __sndr.apply(static_cast<_CvrefSender&&>(__sndr), __detail::__get_data());
      return std::exchange(__box.__sh_state_, nullptr);
    }

    using __sh_state_ptr_t = __result_of<__get_sh_state, _CvrefSender&>;
    using __sh_state_t = std::remove_pointer_t<__sh_state_ptr_t>;

    // Stop callback registered on the receiver's stop token while waiting.
    __optional<stop_callback_for_t<__stok_t, __local_state&>> __on_stop_{};
    // Counted reference to the shared state; null once detached.
    __sh_state_ptr_t __sh_state_;
  };
175 
176   template <class _CvrefSenderId, class _EnvId>
177   struct __receiver {
178     using _CvrefSender = stdexec::__cvref_t<_CvrefSenderId>;
179     using _Env = stdexec::__t<_EnvId>;
180 
181     struct __t {
182       using receiver_concept = receiver_t;
183       using __id = __receiver;
184 
185       template <class... _As>
STDEXEC_ATTRIBUTEstdexec::__shared::__receiver::__t186       STDEXEC_ATTRIBUTE(always_inline)
187       void set_value(_As&&... __as) noexcept {
188         __sh_state_->__complete(set_value_t(), static_cast<_As&&>(__as)...);
189       }
190 
191       template <class _Error>
STDEXEC_ATTRIBUTEstdexec::__shared::__receiver::__t192       STDEXEC_ATTRIBUTE(always_inline)
193       void set_error(_Error&& __err) noexcept {
194         __sh_state_->__complete(set_error_t(), static_cast<_Error&&>(__err));
195       }
196 
STDEXEC_ATTRIBUTEstdexec::__shared::__receiver::__t197       STDEXEC_ATTRIBUTE(always_inline) void set_stopped() noexcept {
198         __sh_state_->__complete(set_stopped_t());
199       }
200 
get_envstdexec::__shared::__receiver::__t201       auto get_env() const noexcept -> const __env_t<_Env>& {
202         return __sh_state_->__env_;
203       }
204 
205       // The receiver does not hold a reference to the shared state.
206       __shared_state<_CvrefSender, _Env>* __sh_state_;
207     };
208   };
209 
  //! Heap-allocatable shared state for things like `stdexec::split`.
  template <class _CvrefSender, class _Env>
  struct __shared_state {
    using __receiver_t = __t<__receiver<__cvref_id<_CvrefSender>, __id<_Env>>>;
    using __waiters_list_t = __intrusive_slist<&__local_state_base::__next_>;

    // A variant of decayed result tuples: one alternative per completion
    // signature of the wrapped sender, plus a (set_stopped_t) alternative
    // (the default) and a (set_error_t, exception_ptr) alternative for
    // exceptions thrown while decay-copying the results.
    using __variant_t = __transform_completion_signatures<
      __completion_signatures_of_t<_CvrefSender, _Env>,
      __mbind_front_q<__decayed_tuple, set_value_t>::__f,
      __mbind_front_q<__decayed_tuple, set_error_t>::__f,
      __tuple_for<set_error_t, std::exception_ptr>,
      __munique<__mbind_front_q<__variant_for, __tuple_for<set_stopped_t>>>::__f,
      __tuple_for<set_error_t, std::exception_ptr>
    >;

    inplace_stop_source __stop_source_{};
    __env_t<_Env> __env_;
    __variant_t __results_{}; // Defaults to the "set_stopped" state
    std::mutex __mutex_;      // This mutex guards access to __waiters_.
    __waiters_list_t __waiters_{};
    connect_result_t<_CvrefSender, __receiver_t> __shared_op_;
    std::atomic_flag __started_{};
    std::atomic<std::size_t> __ref_count_{2};
    __local_state_base __tombstone_{};

    // Let a "consumer" be either a split/ensure_started sender, or an operation
    // state created by connecting a split/ensure_started sender to a receiver.
    // Let is_running be 1 if the shared operation is currently executing (after
    // start has been called but before the receiver's completion functions have
    // executed), and 0 otherwise. Then __ref_count_ is equal to:

    // (2 * (nbr of consumers)) + is_running

    // Connects the wrapped sender eagerly (even for split, where it is not
    // started until a consumer starts). The initial ref count of 2 accounts
    // for the single initial consumer: the split/ensure_started sender.
    explicit __shared_state(_CvrefSender&& __sndr, _Env __env)
      : __env_(
          __env::__join(
            prop{get_stop_token, __stop_source_.get_token()},
            static_cast<_Env&&>(__env)))
      , __shared_op_(connect(static_cast<_CvrefSender&&>(__sndr), __receiver_t{this})) {
    }

    // Add a consumer reference (e.g., a copied split sender).
    void __inc_ref() noexcept {
      __ref_count_.fetch_add(2ul, std::memory_order_relaxed);
    }

    // Drop a consumer reference; delete the shared state when it was the last
    // reference (count transitions 2 -> 0).
    void __dec_ref() noexcept {
      if (2ul == __ref_count_.fetch_sub(2ul, std::memory_order_acq_rel)) {
        delete this;
      }
    }

    // Atomically claim the right to start the shared operation. Returns true
    // for exactly one caller, which also gains the "is running" bit in the
    // ref count.
    auto __set_started() noexcept -> bool {
      if (__started_.test_and_set(std::memory_order_acq_rel)) {
        return false; // already started
      }
      __ref_count_.fetch_add(1ul, std::memory_order_relaxed);
      return true;
    }

    // Clear the "is running" bit (count transitions odd -> even); delete the
    // shared state if the ref count is now zero.
    void __set_completed() noexcept {
      if (1ul == __ref_count_.fetch_sub(1ul, std::memory_order_acq_rel)) {
        delete this;
      }
    }

    // Drop a consumer reference; if we are the last consumer, first ask the
    // still-running shared operation (if any) to stop early.
    void __detach() noexcept {
      if (__ref_count_.load() < 4ul) {
        // We are the final "consumer", and we are about to release our reference
        // to the shared state. Ask the operation to stop early.
        __stop_source_.request_stop();
      }
      __dec_ref();
    }

    /// @post The "is running" bit is set in the shared state's ref count, OR the __waiters_ list
    /// is set to the known "tombstone" value indicating completion.
    void __try_start() noexcept {
      // With the split algorithm, multiple split senders can be started simultaneously, but
      // only one should start the shared async operation. If the low bit is set, then
      // someone else has already started the shared operation. Do nothing.
      if (__set_started()) {
        // we are the first to start the underlying operation
        if (__stop_source_.stop_requested()) {
          // Stop has already been requested. Rather than starting the operation, complete with
          // set_stopped immediately.
          // 1. Sets __waiters_ to a known "tombstone" value.
          // 2. Notifies all the waiters that the operation has stopped.
          // 3. Sets the "is running" bit in the ref count to 0.
          __notify_waiters();
        } else {
          stdexec::start(__shared_op_);
        }
      }
    }

    /// @brief Try to register __waiter for completion notification.
    /// @return true if the waiter was added or notified immediately; false if
    ///   a stop request on __stok means the caller should complete with
    ///   set_stopped instead.
    template <class _StopToken>
    auto __try_add_waiter(__local_state_base* __waiter, _StopToken __stok) noexcept -> bool {
      std::unique_lock __lock{__mutex_};
      if (__waiters_.front() == &__tombstone_) {
        // The work has already completed. Notify the waiter immediately.
        __lock.unlock();
        __waiter->__notify();
        return true;
      } else if (__stok.stop_requested()) {
        // Stop has been requested. Do not add the waiter.
        return false;
      } else {
        // Add the waiter to the list.
        __waiters_.push_front(__waiter);
        return true;
      }
    }

    /// @brief This is called when the shared async operation completes.
    /// @post __waiters_ is set to a known "tombstone" value.
    template <class _Tag, class... _As>
    void __complete(_Tag, _As&&... __as) noexcept {
      STDEXEC_TRY {
        using __tuple_t = __decayed_tuple<_Tag, _As...>;
        __results_.template emplace<__tuple_t>(_Tag(), static_cast<_As&&>(__as)...);
      }
      STDEXEC_CATCH_ALL {
        // Decay-copying the results threw; record the exception instead.
        using __tuple_t = __decayed_tuple<set_error_t, std::exception_ptr>;
        __results_.template emplace<__tuple_t>(set_error, std::current_exception());
      }

      __notify_waiters();
    }

    /// @brief This is called when the shared async operation completes.
    /// @post __waiters_ is set to a known "tombstone" value.
    void __notify_waiters() noexcept {
      __waiters_list_t __waiters_copy{&__tombstone_};

      // Set the waiters list to a known "tombstone" value that we can check later.
      {
        std::lock_guard __lock{__mutex_};
        __waiters_.swap(__waiters_copy);
      }

      STDEXEC_ASSERT(__waiters_copy.front() != &__tombstone_);
      for (auto __itr = __waiters_copy.begin(); __itr != __waiters_copy.end();) {
        __local_state_base* __item = *__itr;

        // We must increment the iterator before calling notify, since notify may end up
        // triggering *__item to be destructed on another thread, and the intrusive slist's
        // iterator increment relies on __item.
        ++__itr;
        __item->__notify();
      }

      // Set the "is running" bit in the ref count to zero. Delete the shared state if the
      // ref-count is now zero.
      __set_completed();
    }
  };

  template <class _CvrefSender, class _Env>
  __shared_state(_CvrefSender&&, _Env) -> __shared_state<_CvrefSender, _Env>;
369 
  // Computes the completion signatures of a split/ensure_started sender from
  // those of the wrapped sender: value and error results are transformed by
  // _Cvref (which applies the cvref qualification chosen below), and
  // set_error(exception_ptr) / set_stopped() are always possible.
  template <class _Cvref, class _CvrefSender, class _Env>
  using __make_completions = __try_make_completion_signatures<
    // NOT TO SPEC:
    // See https://github.com/cplusplus/sender-receiver/issues/23
    _CvrefSender,
    __env_t<_Env>,
    completion_signatures<
      set_error_t(__minvoke<_Cvref, std::exception_ptr>),
      set_stopped_t()
    >, // NOT TO SPEC
    __mtransform<_Cvref, __mcompose<__q<completion_signatures>, __qf<set_value_t>>>,
    __mtransform<_Cvref, __mcompose<__q<completion_signatures>, __qf<set_error_t>>>
  >;

  // split completes with const T&. ensure_started completes with T&&.
  template <class _Tag>
  using __cvref_results_t =
    __mcompose<__if_c<same_as<_Tag, __split::__split_t>, __cpclr, __cp>, __q<__decay_t>>;

  // NOTE: the use of __mapply in the return type below takes advantage of the fact that _ShState
  // denotes an instance of the __shared_state template, which is parameterized on the
  // cvref-qualified sender and the environment.
  template <class _Tag, class _ShState>
  using __completions =
    __mapply<__mbind_front_q<__make_completions, __cvref_results_t<_Tag>>, _ShState>;
395 
396   template <class _CvrefSender, class _Env, bool _Copyable = true>
397   struct __box {
398     using __tag_t = __if_c<_Copyable, __split::__split_t, __ensure_started::__ensure_started_t>;
399     using __sh_state_t = __shared_state<_CvrefSender, _Env>;
400 
__boxstdexec::__shared::__box401     __box(__tag_t, __sh_state_t* __sh_state) noexcept
402       : __sh_state_(__sh_state) {
403     }
404 
__boxstdexec::__shared::__box405     __box(__box&& __other) noexcept
406       : __sh_state_(std::exchange(__other.__sh_state_, nullptr)) {
407     }
408 
__boxstdexec::__shared::__box409     __box(const __box& __other) noexcept
410       requires _Copyable
411       : __sh_state_(__other.__sh_state_) {
412       __sh_state_->__inc_ref();
413     }
414 
~__boxstdexec::__shared::__box415     ~__box() {
416       if (__sh_state_) {
417         __sh_state_->__detach();
418       }
419     }
420 
421     __sh_state_t* __sh_state_;
422   };
423 
424   template <class _CvrefSender, class _Env>
425   __box(__split::__split_t, __shared_state<_CvrefSender, _Env>*) -> __box<_CvrefSender, _Env, true>;
426 
427   template <class _CvrefSender, class _Env>
428   __box(__ensure_started::__ensure_started_t, __shared_state<_CvrefSender, _Env>*)
429     -> __box<_CvrefSender, _Env, false>;
430 
  // Basic-sender implementation shared by the split and ensure_started
  // algorithms (_Tag selects which one).
  template <class _Tag>
  struct __shared_impl : __sexpr_defaults {
    // Creates the per-operation __local_state, transferring the sender's
    // counted shared-state reference into it.
    static constexpr auto get_state =
      []<class _CvrefSender, class _Receiver>(_CvrefSender&& __sndr, _Receiver&) noexcept
      -> __local_state<_CvrefSender, _Receiver> {
      static_assert(sender_expr_for<_CvrefSender, _Tag>);
      return __local_state<_CvrefSender, _Receiver>{static_cast<_CvrefSender&&>(__sndr)};
    };

    // Completion signatures are derived from the __shared_state instance
    // recorded in the sender's data; see __completions above.
    static constexpr auto get_completion_signatures =
      []<class _Self>(const _Self&, auto&&...) noexcept
      -> __completions<_Tag, typename __data_of<_Self>::__sh_state_t> {
      static_assert(sender_expr_for<_Self, _Tag>);
      return {};
    };

    // Registers the stop callback, starts the shared operation if necessary,
    // and enqueues __self as a waiter -- or completes with set_stopped if a
    // stop request preempts all of that.
    static constexpr auto start = []<class _Sender, class _Receiver>(
                                    __local_state<_Sender, _Receiver>& __self,
                                    _Receiver& __rcvr) noexcept -> void {
      // Scenario: there are no more split senders, this is the only operation state, the
      // underlying operation has not yet been started, and the receiver's stop token is already
      // in the "stop requested" state. Then registering the stop callback will call
      // __local_state::operator() on __self synchronously. It may also be called asynchronously
      // at any point after the callback is registered. Beware. We are guaranteed, however, that
      // __local_state::operator() will not complete the operation or decrement the shared state's
      // ref count until after __self has been added to the waiters list.
      const auto __stok = stdexec::get_stop_token(stdexec::get_env(__rcvr));
      __self.__on_stop_.emplace(__stok, __self);

      // We haven't put __self in the waiters list yet and we are holding a ref count to
      // __sh_state_, so nothing can happen to the __sh_state_ here.

      // Start the shared op. As an optimization, skip it if the receiver's stop token has already
      // been signaled.
      if (!__stok.stop_requested()) {
        __self.__sh_state_->__try_start();
        if (__self.__sh_state_->__try_add_waiter(&__self, __stok)) {
          // successfully added the waiter
          return;
        }
      }

      // Otherwise, failed to add the waiter because of a stop-request.
      // Complete synchronously with set_stopped().
      __self.__on_stop_.reset();
      std::exchange(__self.__sh_state_, nullptr)->__detach();
      stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr));
    };
  };
480 } // namespace stdexec::__shared
481