/*
 * Copyright (c) 2021-2024 NVIDIA Corporation
 *
 * Licensed under the Apache License Version 2.0 with LLVM Exceptions
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *   https://llvm.org/LICENSE.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once

#include "__execution_fwd.hpp"

// include these after __execution_fwd.hpp
#include "../functional.hpp"
#include "../stop_token.hpp"
#include "__basic_sender.hpp"
#include "__cpo.hpp"
#include "__env.hpp"
#include "__intrusive_ptr.hpp"
#include "__intrusive_slist.hpp"
#include "__meta.hpp"
#include "__optional.hpp"
#include "__transform_completion_signatures.hpp"
#include "__tuple.hpp"
#include "__variant.hpp"

#include <exception>
#include <mutex>

namespace stdexec
{
////////////////////////////////////////////////////////////////////////////
// shared components of split and ensure_started
//
// The split and ensure_started algorithms are very similar in implementation.
// The salient differences are:
//
// split: the input async operation is always connected. It is only
// started when one of the split senders is connected and started.
// split senders are copyable, so there are multiple operation states
// to be notified on completion. These are stored in an intrusive
// linked list.
//
// ensure_started: the input async operation is always started, so
// the internal receiver will always be completed. The ensure_started
// sender is move-only and single-shot, so there will only ever be one
// operation state to be notified on completion.
//
// The shared state should add-ref itself when the input async
// operation is started and release itself when its completion
// is notified.
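//
// For orientation, a minimal usage sketch (names such as some_sender() are
// placeholders, not part of this header):
//
//   auto __multi = stdexec::split(some_sender());            // copyable, lazy
//   auto __a = stdexec::then(__multi, [](auto&&...) {});     // each consumer
//   auto __b = stdexec::then(__multi, [](auto&&...) {});     //   shares one op
//   auto __eager = stdexec::ensure_started(some_sender());   // starts now,
//                                                            // move-only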
namespace __shared
{
template <class _BaseEnv>
using __env_t = //
    __env::__join_t<prop<get_stop_token_t, inplace_stop_token>,
                    _BaseEnv>; // BUGBUG NOT TO SPEC

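// Builds the visitor used to deliver a stored completion to a waiting
// receiver. For example (illustrative only): if the stored result tuple is
// (set_value_t, int), the visitor effectively calls
// stdexec::set_value(std::move(__rcvr), <the int, forwarded according to
// _Tuple's value category>).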
template <class _Receiver>
auto __make_notify_visitor(_Receiver& __rcvr) noexcept
{
    return [&]<class _Tuple>(_Tuple&& __tupl) noexcept -> void {
        __tupl.apply(
            [&](auto __tag, auto&&... __args) noexcept -> void {
                __tag(static_cast<_Receiver&&>(__rcvr),
                      __forward_like<_Tuple>(__args)...);
            },
            __tupl);
    };
}

struct __local_state_base : __immovable
{
    using __notify_fn = void(__local_state_base*) noexcept;

    __notify_fn* __notify_{};
    __local_state_base* __next_{};
};

template <class _CvrefSender, class _Env>
struct __shared_state;

// The operation state of ensure_started, and each operation state of split,
// contains one of these, created when the sender is connected. There are 0 or
// more of them for each underlying async operation. It is what the
// ensure_started and split senders' `get_state` function returns. It holds a
// counted reference to the shared state.
template <class _CvrefSender, class _Receiver>
struct __local_state :
    __local_state_base,
    __enable_receiver_from_this<_CvrefSender, _Receiver,
                                __local_state<_CvrefSender, _Receiver>>
{
    using __tag_t = tag_of_t<_CvrefSender>;
    using __stok_t = stop_token_of_t<env_of_t<_Receiver>>;
    static_assert(__one_of<__tag_t, __split::__split_t,
                           __ensure_started::__ensure_started_t>);

    explicit __local_state(_CvrefSender&& __sndr) noexcept :
        __local_state::__local_state_base{{},
                                          &__notify<tag_of_t<_CvrefSender>>},
        __sh_state_(__get_sh_state(__sndr))
    {}

    ~__local_state()
    {
        __sh_state_t::__detach(__sh_state_);
    }

    // Stop request callback:
    void operator()() noexcept
    {
        // We reach here when a split/ensure_started sender has received a stop
        // request from the receiver to which it is connected.
        if (std::unique_lock __lock{__sh_state_->__mutex_})
        {
            // Remove this operation from the waiters list. Removal can fail if:
            // 1. It was already removed by another thread, or
            // 2. It hasn't been added yet (see `start` below), or
            // 3. The underlying operation has already completed.
            //
            // In each case, the right thing to do is nothing. If (1) then we
            // raced with another thread and lost. In that case, the other
            // thread will take care of it. If (2) then `start` will take care
            // of it. If (3) then this stop request is safe to ignore.
            if (!__sh_state_->__waiters_.remove(this))
                return;
        }

        // The following code and the __notify function cannot both execute.
        // This is because the __notify function is called from the shared
        // state's __notify_waiters function, which first sets __waiters_ to
        // the completed state. As a result, the attempt to remove `this` from
        // the waiters list above will fail and this stop request is ignored.
        __sh_state_t::__detach(__sh_state_);
        stdexec::set_stopped(static_cast<_Receiver&&>(this->__receiver()));
    }

    // This is called from __shared_state::__notify_waiters when the input
    // async operation completes; or, if it has already completed when start
    // is called, it is called from start.
    // __notify cannot race with __on_stop_request. See comment in
    // __on_stop_request.
    template <class _Tag>
    static void __notify(__local_state_base* __base) noexcept
    {
        auto* const __self = static_cast<__local_state*>(__base);

        // The split algorithm sends by T const&. ensure_started sends by T&&.
        constexpr bool __is_split = same_as<__split::__split_t, _Tag>;
        using __variant_t = decltype(__self->__sh_state_->__results_);
        using __cv_variant_t =
            __if_c<__is_split, const __variant_t&, __variant_t>;

        __self->__on_stop_.reset();

        auto __visitor = __make_notify_visitor(__self->__receiver());
        __variant_t::visit(__visitor, static_cast<__cv_variant_t&&>(
                                          __self->__sh_state_->__results_));
    }

    static auto __get_sh_state(_CvrefSender& __sndr) noexcept
    {
        return __sndr
            .apply(static_cast<_CvrefSender&&>(__sndr), __detail::__get_data())
            .__sh_state_;
    }

    using __sh_state_ptr_t = __result_of<__get_sh_state, _CvrefSender&>;
    using __sh_state_t = typename __sh_state_ptr_t::element_type;

    __optional<stop_callback_for_t<__stok_t, __local_state&>> __on_stop_{};
    __sh_state_ptr_t __sh_state_;
};

template <class _CvrefSenderId, class _EnvId>
struct __receiver
{
    using _CvrefSender = stdexec::__cvref_t<_CvrefSenderId>;
    using _Env = stdexec::__t<_EnvId>;

    struct __t
    {
        using receiver_concept = receiver_t;
        using __id = __receiver;

        template <class... _As>
        STDEXEC_ATTRIBUTE((always_inline))
        void set_value(_As&&... __as) noexcept
        {
            __sh_state_->__complete(set_value_t(), static_cast<_As&&>(__as)...);
        }

        template <class _Error>
        STDEXEC_ATTRIBUTE((always_inline))
        void set_error(_Error&& __err) noexcept
        {
            __sh_state_->__complete(set_error_t(),
                                    static_cast<_Error&&>(__err));
        }

        STDEXEC_ATTRIBUTE((always_inline))
        void set_stopped() noexcept
        {
            __sh_state_->__complete(set_stopped_t());
        }

        auto get_env() const noexcept -> const __env_t<_Env>&
        {
            return __sh_state_->__env_;
        }

        // The receiver does not hold a reference to the shared state.
        __shared_state<_CvrefSender, _Env>* __sh_state_;
    };
};

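// Returns a unique sentinel node. When the front of a shared state's
// __waiters_ list is this tombstone, the shared operation has already
// completed (see __notify_waiters and __try_add_waiter below).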
inline __local_state_base* __get_tombstone() noexcept
{
    static __local_state_base __tombstone_{{}, nullptr, nullptr};
    return &__tombstone_;
}

//! Heap-allocatable shared state for things like `stdexec::split`.
template <class _CvrefSender, class _Env>
struct __shared_state :
    private __enable_intrusive_from_this<__shared_state<_CvrefSender, _Env>, 2>
{
    using __receiver_t = __t<__receiver<__cvref_id<_CvrefSender>, __id<_Env>>>;
    using __waiters_list_t = __intrusive_slist<&__local_state_base::__next_>;

    using __variant_t = //
        __transform_completion_signatures<
            __completion_signatures_of_t<_CvrefSender, _Env>,
            __mbind_front_q<__decayed_tuple, set_value_t>::__f,
            __mbind_front_q<__decayed_tuple, set_error_t>::__f,
            __tuple_for<set_error_t, std::exception_ptr>,
            __munique<__mbind_front_q<__variant_for,
                                      __tuple_for<set_stopped_t>>>::__f,
            __tuple_for<set_error_t, std::exception_ptr>>;
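
    // Illustrative example (not exhaustive): for an input sender that
    // completes with set_value_t(int) or set_stopped_t(), __variant_t is
    // roughly
    //   __variant_for<__tuple_for<set_stopped_t>,
    //                 __tuple_for<set_value_t, int>,
    //                 __tuple_for<set_error_t, std::exception_ptr>>;
    // the first alternative is also the default ("set_stopped") state, and the
    // exception_ptr alternative captures anything thrown while storing the
    // results in __complete().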

    static constexpr std::size_t __started_bit = 0;
    static constexpr std::size_t __completed_bit = 1;
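    // The base class __enable_intrusive_from_this<..., 2> reserves two flag
    // bits alongside the reference count: __started_bit records that the
    // shared operation has been started, and __completed_bit records that it
    // has completed. __count() and __bit<I>() (used below) read the count and
    // the flags out of that packed value.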

    inplace_stop_source __stop_source_{};
    __env_t<_Env> __env_;
    __variant_t __results_{}; // Defaults to the "set_stopped" state
    std::mutex __mutex_;      // This mutex guards access to __waiters_.
    __waiters_list_t __waiters_{};
    connect_result_t<_CvrefSender, __receiver_t> __shared_op_;

    explicit __shared_state(_CvrefSender&& __sndr, _Env __env) :
        __env_(__env::__join(prop{get_stop_token, __stop_source_.get_token()},
                             static_cast<_Env&&>(__env))),
        __shared_op_(
            connect(static_cast<_CvrefSender&&>(__sndr), __receiver_t{this}))
    {
        // add one ref count to account for the case where there are no
        // watchers left but the shared op is still running.
        this->__inc_ref();
    }

    // The caller of this wants to release their reference to the shared state.
    // The ref count must be at least 2 at this point: one owned by the caller,
    // and one added in the __shared_state ctor.
    static void __detach(__intrusive_ptr<__shared_state, 2>& __ptr) noexcept
    {
        // Ask the intrusive ptr to stop managing the reference count so we can
        // manage it manually.
        if (auto* __self = __ptr.__release_())
        {
            auto __old = __self->__dec_ref();
            STDEXEC_ASSERT(__count(__old) >= 2);

            if (__count(__old) == 2)
            {
                // The last watcher has released its reference. Ask the shared
                // op to stop.
                static_cast<__shared_state*>(__self)
                    ->__stop_source_.request_stop();

                // Additionally, if the shared op was never started, or if it
                // has already completed, then the shared state is no longer
                // needed. Decrement the ref count to 0 here, which will delete
                // __self.
                if (!__bit<__started_bit>(__old) ||
                    __bit<__completed_bit>(__old))
                {
                    __self->__dec_ref();
                }
            }
        }
    }

    /// @post The started bit is set in the shared state's ref count, OR the
    ///       __waiters_ list is set to the known "tombstone" value indicating
    ///       completion.
    void __try_start() noexcept
    {
        // With the split algorithm, multiple split senders can be started
        // simultaneously, but only one should start the shared async operation.
        // If the "started" bit is set, then someone else has already started
        // the shared operation. Do nothing.
        if (this->template __is_set<__started_bit>())
        {
            return;
        }
        else if (__bit<__started_bit>(
                     this->template __set_bit<__started_bit>()))
        {
            return;
        }
        else if (__stop_source_.stop_requested())
        {
            // Stop has already been requested. Rather than starting the
            // operation, complete with set_stopped immediately.
            // 1. Sets __waiters_ to a known "tombstone" value
            // 2. Notifies all the waiters that the operation has stopped
            // 3. Sets the "completed" bit in the ref count.
            __notify_waiters();
            return;
        }
        else
        {
            stdexec::start(__shared_op_);
        }
    }

    template <class _StopToken>
    bool __try_add_waiter(__local_state_base* __waiter,
                          _StopToken __stok) noexcept
    {
        std::unique_lock __lock{__mutex_};
        if (__waiters_.front() == __get_tombstone())
        {
            // The work has already completed. Notify the waiter immediately.
            __lock.unlock();
            __waiter->__notify_(__waiter);
            return true;
        }
        else if (__stok.stop_requested())
        {
            // Stop has been requested. Do not add the waiter.
            return false;
        }
        else
        {
            // Add the waiter to the list.
            __waiters_.push_front(__waiter);
            return true;
        }
    }

    /// @brief This is called when the shared async operation completes.
    /// @post __waiters_ is set to a known "tombstone" value.
    template <class _Tag, class... _As>
    void __complete(_Tag, _As&&... __as) noexcept
    {
        try
        {
            using __tuple_t = __decayed_tuple<_Tag, _As...>;
            __results_.template emplace<__tuple_t>(_Tag(),
                                                   static_cast<_As&&>(__as)...);
        }
        catch (...)
        {
            using __tuple_t = __decayed_tuple<set_error_t, std::exception_ptr>;
            __results_.template emplace<__tuple_t>(set_error,
                                                   std::current_exception());
        }

        __notify_waiters();
    }

    /// @brief Notifies the waiters that the shared async operation has
    ///        finished. Called from __complete, and from __try_start when a
    ///        stop request arrives before the operation is started.
    /// @post __waiters_ is set to a known "tombstone" value.
    void __notify_waiters() noexcept
    {
        __waiters_list_t __waiters_copy{__get_tombstone()};

        // Set the waiters list to a known "tombstone" value that we can check
        // later.
        {
            std::lock_guard __lock{__mutex_};
            __waiters_.swap(__waiters_copy);
        }

        STDEXEC_ASSERT(__waiters_copy.front() != __get_tombstone());
        for (auto __itr = __waiters_copy.begin();
             __itr != __waiters_copy.end();)
        {
            __local_state_base* __item = *__itr;

            // We must increment the iterator before calling notify, since
            // notify may end up triggering *__item to be destructed on another
            // thread, and the intrusive slist's iterator increment relies on
            // __item.
            ++__itr;

            __item->__notify_(__item);
        }

        // Set the "completed" bit in the ref count. If the ref count is 1,
        // then there are no more waiters. Release the final reference.
        if (__count(this->template __set_bit<__completed_bit>()) == 1)
        {
            this->__dec_ref(); // release the extra ref count, deletes this
        }
    }
};

template <class _Cvref, class _CvrefSender, class _Env>
using __make_completions = //
    __try_make_completion_signatures<
        // NOT TO SPEC:
        // See https://github.com/cplusplus/sender-receiver/issues/23
        _CvrefSender, __env_t<_Env>,
        completion_signatures<set_error_t(
                                  __minvoke<_Cvref, std::exception_ptr>),
                              set_stopped_t()>, // NOT TO SPEC
        __mtransform<_Cvref,
                     __mcompose<__q<completion_signatures>, __qf<set_value_t>>>,
        __mtransform<
            _Cvref, __mcompose<__q<completion_signatures>, __qf<set_error_t>>>>;

// split completes with const T&. ensure_started completes with T&&.
template <class _Tag>
using __cvref_results_t = //
    __mcompose<__if_c<same_as<_Tag, __split::__split_t>, __cpclr, __cp>,
               __q<__decay_t>>;
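
// For example (illustrative only): an input completion of
// set_value_t(std::string) is re-advertised by a split sender as
// set_value_t(const std::string&), and by an ensure_started sender as
// set_value_t(std::string) (the decayed result, which __notify then moves out
// of the shared state).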

// NOTE: the use of __mapply in the return type below takes advantage of the
// fact that _ShState denotes an instance of the __shared_state template, which
// is parameterized on the cvref-qualified sender and the environment.
template <class _Tag, class _ShState>
using __completions = //
    __mapply<__mbind_front_q<__make_completions, __cvref_results_t<_Tag>>,
             _ShState>;

template <class _CvrefSender, class _Env, bool _Copyable = true>
struct __box
{
    using __tag_t = __if_c<_Copyable, __split::__split_t,
                           __ensure_started::__ensure_started_t>;
    using __sh_state_t = __shared_state<_CvrefSender, _Env>;

    __box(__tag_t, __intrusive_ptr<__sh_state_t, 2> __sh_state) noexcept :
        __sh_state_(std::move(__sh_state))
    {}

    __box(__box&&) noexcept = default;
    __box(const __box&) noexcept
        requires _Copyable
    = default;

    ~__box()
    {
        __sh_state_t::__detach(__sh_state_);
    }

    __intrusive_ptr<__sh_state_t, 2> __sh_state_;
};

template <class _CvrefSender, class _Env>
__box(__split::__split_t,
      __intrusive_ptr<__shared_state<_CvrefSender, _Env>, 2>)
    -> __box<_CvrefSender, _Env, true>;

template <class _CvrefSender, class _Env>
__box(__ensure_started::__ensure_started_t,
      __intrusive_ptr<__shared_state<_CvrefSender, _Env>, 2>)
    -> __box<_CvrefSender, _Env, false>;

template <class _Tag>
struct __shared_impl : __sexpr_defaults
{
    static constexpr auto get_state = //
        []<class _CvrefSender, class _Receiver>(
            _CvrefSender&& __sndr,
            _Receiver&) noexcept -> __local_state<_CvrefSender, _Receiver> {
        static_assert(sender_expr_for<_CvrefSender, _Tag>);
        return __local_state<_CvrefSender, _Receiver>{
            static_cast<_CvrefSender&&>(__sndr)};
    };

    static constexpr auto get_completion_signatures = //
        []<class _Self>(const _Self&, auto&&...) noexcept
        -> __completions<_Tag, typename __data_of<_Self>::__sh_state_t> {
        static_assert(sender_expr_for<_Self, _Tag>);
        return {};
    };

    static constexpr auto start = //
        []<class _Sender, class _Receiver>(
            __local_state<_Sender, _Receiver>& __self,
            _Receiver& __rcvr) noexcept -> void {
        using __sh_state_t =
            typename __local_state<_Sender, _Receiver>::__sh_state_t;
        // Scenario: there are no more split senders, this is the only
        // operation state, the underlying operation has not yet been started,
        // and the receiver's stop token is already in the "stop requested"
        // state. Then registering the stop callback will call
        // __on_stop_request on __self synchronously. It may also be called
        // asynchronously at any point after the callback is registered.
        // Beware. We are guaranteed, however, that __on_stop_request will not
        // complete the operation or decrement the shared state's ref count
        // until after __self has been added to the waiters list.
        const auto __stok = stdexec::get_stop_token(stdexec::get_env(__rcvr));
        __self.__on_stop_.emplace(__stok, __self);

        // We haven't put __self in the waiters list yet and we are holding a
        // ref count to __sh_state_, so nothing can happen to the __sh_state_
        // here.

        // Start the shared op. As an optimization, skip it if the receiver's
        // stop token has already been signaled.
        if (!__stok.stop_requested())
        {
            __self.__sh_state_->__try_start();
            if (__self.__sh_state_->__try_add_waiter(&__self, __stok))
            {
                // successfully added the waiter
                return;
            }
        }

        // Otherwise, failed to add the waiter because of a stop request.
        // Complete synchronously with set_stopped().
        __self.__on_stop_.reset();
        __sh_state_t::__detach(__self.__sh_state_);
        stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr));
    };
};
} // namespace __shared
} // namespace stdexec