/*
 * Copyright (c) 2021-2024 NVIDIA Corporation
 *
 * Licensed under the Apache License Version 2.0 with LLVM Exceptions
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *   https://llvm.org/LICENSE.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once

#include "__execution_fwd.hpp"

// include these after __execution_fwd.hpp
#include "__basic_sender.hpp"
#include "__env.hpp"
#include "__intrusive_slist.hpp"
#include "__optional.hpp"
#include "__meta.hpp"
#include "__receivers.hpp"
#include "__transform_completion_signatures.hpp"
#include "__tuple.hpp"
#include "__variant.hpp" // IWYU pragma: keep

#include "../stop_token.hpp"

#include <atomic>
#include <exception>
#include <mutex>
#include <type_traits>
#include <utility>

////////////////////////////////////////////////////////////////////////////
// shared components of split and ensure_started
//
// The split and ensure_started algorithms are very similar in implementation.
// The salient differences are:
//
// split: the input async operation is always connected. It is only
// started when one of the split senders is connected and started.
// split senders are copyable, so there are multiple operation states
// to be notified on completion. These are stored in an intrusive
// linked list.
//
// ensure_started: the input async operation is always started, so
// the internal receiver will always be completed. The ensure_started
// sender is move-only and single-shot, so there will only ever be one
// operation state to be notified on completion.
//
// The shared state should add-ref itself when the input async
// operation is started and release itself when its completion
// is notified.
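//
// A minimal usage sketch (hypothetical user code, not part of this header;
// `some_sender` stands in for an arbitrary input sender):
//
//   auto __multi = stdexec::split(some_sender);   // copyable; not yet started
//   auto __copy  = __multi;                       // a second "consumer"
//   stdexec::sync_wait(std::move(__copy));        // first start runs the work
//   stdexec::sync_wait(std::move(__multi));       // replays the cached result
//
//   auto __eager = stdexec::ensure_started(some_sender); // starts immediately
//   stdexec::sync_wait(std::move(__eager));              // move-only, single-shot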
namespace stdexec::__shared {
  template <class _BaseEnv>
  using __env_t = __join_env_t<
    prop<get_stop_token_t, inplace_stop_token>,
    _BaseEnv
  >; // BUGBUG NOT TO SPEC

  template <class _Receiver>
  struct __notify_fn {
    template <class _Tag, class... _Args>
    void operator()(_Tag __tag, _Args&&... __args) const noexcept {
      __tag(static_cast<_Receiver&&>(__rcvr_), static_cast<_Args&&>(__args)...);
    }

    _Receiver& __rcvr_;
  };

  template <class _Receiver>
  auto __make_notify_visitor(_Receiver& __rcvr) noexcept {
    return [&]<class _Tuple>(_Tuple&& __tupl) noexcept -> void {
      __tupl.apply(__notify_fn<_Receiver>{__rcvr}, static_cast<_Tuple&&>(__tupl));
    };
  }
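  // For example (a sketch; `__rcvr` names a hypothetical receiver): if the
  // shared state's results variant holds a `__tuple_for<set_value_t, int>`
  // containing `{set_value_t(), 42}`, then visiting it with
  // `__make_notify_visitor(__rcvr)` applies `__notify_fn<_Receiver>{__rcvr}`
  // to the tuple's elements, invoking `set_value_t{}(std::move(__rcvr), 42)`;
  // i.e., it replays the cached completion on the waiting receiver.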
  struct __local_state_base : __immovable {
    using __notify_fn = void(__local_state_base*) noexcept;

    void __notify() noexcept {
      __notify_(this);
    }

    __notify_fn* __notify_{};
    __local_state_base* __next_{};
  };

  template <class _CvrefSender, class _Env>
  struct __shared_state;

  // The operation state of ensure_started, and each operation state of split, has one of these,
  // created when the sender is connected. There are 0 or more of them for each underlying async
  // operation. It is what the `get_state` function of the ensure_started and split senders
  // returns. It holds a reference count to the shared state.
  template <class _CvrefSender, class _Receiver>
  struct __local_state
    : __local_state_base
    , __enable_receiver_from_this<_CvrefSender, _Receiver, __local_state<_CvrefSender, _Receiver>> {
    using __tag_t = tag_of_t<_CvrefSender>;
    using __stok_t = stop_token_of_t<env_of_t<_Receiver>>;
    static_assert(__one_of<__tag_t, __split::__split_t, __ensure_started::__ensure_started_t>);

    explicit __local_state(_CvrefSender&& __sndr) noexcept
      : __local_state::__local_state_base{{}, &__notify<tag_of_t<_CvrefSender>>}
      , __sh_state_(__get_sh_state(__sndr)) {
    }

    ~__local_state() {
      if (__sh_state_) {
        __sh_state_->__detach();
      }
    }

    // Stop request callback:
    void operator()() noexcept {
      // We reach here when a split/ensure_started sender has received a stop request from the
      // receiver to which it is connected.
      if (std::unique_lock __lock{__sh_state_->__mutex_}) {
        // Remove this operation from the waiters list. Removal can fail if:
        // 1. It was already removed by another thread, or
        // 2. It hasn't been added yet (see `start` below), or
        // 3. The underlying operation has already completed.
        //
        // In each case, the right thing to do is nothing. If (1) then we raced with another
        // thread and lost. In that case, the other thread will take care of it. If (2) then
        // `start` will take care of it. If (3) then this stop request is safe to ignore.
        if (!__sh_state_->__waiters_.remove(this))
          return;
      }

      // The following code and the __notify function cannot both execute. This is because the
      // __notify function is called from the shared state's __notify_waiters function, which
      // first sets __waiters_ to the completed state. As a result, the attempt to remove `this`
      // from the waiters list above will fail and this stop request is ignored.
      std::exchange(__sh_state_, nullptr)->__detach();
      stdexec::set_stopped(static_cast<_Receiver&&>(this->__receiver()));
    }

    // This is called from __shared_state::__notify_waiters when the input async operation
    // completes; or, if it has already completed when start is called, it is called from start:
    // __notify cannot race with __local_state::operator(). See comment in
    // __local_state::operator().
    template <class _Tag>
    static void __notify(__local_state_base* __base) noexcept {
      auto* const __self = static_cast<__local_state*>(__base);

      // The split algorithm sends by T const&. ensure_started sends by T&&.
      constexpr bool __is_split = same_as<__split::__split_t, _Tag>;
      using __variant_t = decltype(__self->__sh_state_->__results_);
      using __cv_variant_t = __if_c<__is_split, const __variant_t&, __variant_t>;

      __self->__on_stop_.reset();

      auto __visitor = __make_notify_visitor(__self->__receiver());
      __variant_t::visit(
        __visitor, static_cast<__cv_variant_t&&>(__self->__sh_state_->__results_));
    }

    static auto __get_sh_state(_CvrefSender& __sndr) noexcept {
      auto __box = __sndr.apply(static_cast<_CvrefSender&&>(__sndr), __detail::__get_data());
      return std::exchange(__box.__sh_state_, nullptr);
    }

    using __sh_state_ptr_t = __result_of<__get_sh_state, _CvrefSender&>;
    using __sh_state_t = std::remove_pointer_t<__sh_state_ptr_t>;

    __optional<stop_callback_for_t<__stok_t, __local_state&>> __on_stop_{};
    __sh_state_ptr_t __sh_state_;
  };

  template <class _CvrefSenderId, class _EnvId>
  struct __receiver {
    using _CvrefSender = stdexec::__cvref_t<_CvrefSenderId>;
    using _Env = stdexec::__t<_EnvId>;

    struct __t {
      using receiver_concept = receiver_t;
      using __id = __receiver;

      template <class... _As>
      STDEXEC_ATTRIBUTE(always_inline)
      void set_value(_As&&... __as) noexcept {
        __sh_state_->__complete(set_value_t(), static_cast<_As&&>(__as)...);
      }

      template <class _Error>
      STDEXEC_ATTRIBUTE(always_inline)
      void set_error(_Error&& __err) noexcept {
        __sh_state_->__complete(set_error_t(), static_cast<_Error&&>(__err));
      }

      STDEXEC_ATTRIBUTE(always_inline) void set_stopped() noexcept {
        __sh_state_->__complete(set_stopped_t());
      }

      auto get_env() const noexcept -> const __env_t<_Env>& {
        return __sh_state_->__env_;
      }

      // The receiver does not hold a reference to the shared state.
      __shared_state<_CvrefSender, _Env>* __sh_state_;
    };
  };

  //! Heap-allocatable shared state for things like `stdexec::split`.
  template <class _CvrefSender, class _Env>
  struct __shared_state {
    using __receiver_t = __t<__receiver<__cvref_id<_CvrefSender>, __id<_Env>>>;
    using __waiters_list_t = __intrusive_slist<&__local_state_base::__next_>;

    using __variant_t = __transform_completion_signatures<
      __completion_signatures_of_t<_CvrefSender, _Env>,
      __mbind_front_q<__decayed_tuple, set_value_t>::__f,
      __mbind_front_q<__decayed_tuple, set_error_t>::__f,
      __tuple_for<set_error_t, std::exception_ptr>,
      __munique<__mbind_front_q<__variant_for, __tuple_for<set_stopped_t>>>::__f,
      __tuple_for<set_error_t, std::exception_ptr>
    >;

    inplace_stop_source __stop_source_{};
    __env_t<_Env> __env_;
    __variant_t __results_{}; // Defaults to the "set_stopped" state
    std::mutex __mutex_;      // This mutex guards access to __waiters_.
    __waiters_list_t __waiters_{};
    connect_result_t<_CvrefSender, __receiver_t> __shared_op_;
    std::atomic_flag __started_{};
    std::atomic<std::size_t> __ref_count_{2};
    __local_state_base __tombstone_{};

    // Let a "consumer" be either a split/ensure_started sender, or an operation
    // state created by connecting a split/ensure_started sender to a receiver.
    // Let is_running be 1 if the shared operation is currently executing (after
    // start has been called but before the receiver's completion functions have
    // executed), and 0 otherwise. Then __ref_count_ is equal to:
    //
    //   (2 * (nbr of consumers)) + is_running
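    //
    // For example: if two split senders exist and the shared operation is
    // running, __ref_count_ == 2 * 2 + 1 == 5. Destroying one sender drops it
    // to 3; the operation completing drops it to 2; and destroying the last
    // consumer drops it to 0, at which point the shared state deletes itself.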
    explicit __shared_state(_CvrefSender&& __sndr, _Env __env)
      : __env_(
          __env::__join(
            prop{get_stop_token, __stop_source_.get_token()},
            static_cast<_Env&&>(__env)))
      , __shared_op_(connect(static_cast<_CvrefSender&&>(__sndr), __receiver_t{this})) {
    }

    void __inc_ref() noexcept {
      __ref_count_.fetch_add(2ul, std::memory_order_relaxed);
    }

    void __dec_ref() noexcept {
      if (2ul == __ref_count_.fetch_sub(2ul, std::memory_order_acq_rel)) {
        delete this;
      }
    }

    auto __set_started() noexcept -> bool {
      if (__started_.test_and_set(std::memory_order_acq_rel)) {
        return false; // already started
      }
      __ref_count_.fetch_add(1ul, std::memory_order_relaxed);
      return true;
    }

    void __set_completed() noexcept {
      if (1ul == __ref_count_.fetch_sub(1ul, std::memory_order_acq_rel)) {
        delete this;
      }
    }
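    // A note on the `< 4ul` test in __detach below: with two or more consumers
    // alive the count is at least 4 (2 * 2 + is_running). A value below 4 can
    // only be 2 or 3 -- exactly one consumer, with the low bit recording
    // whether the operation is still running -- so the detaching consumer is
    // the last one and may safely request early stop.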
    void __detach() noexcept {
      if (__ref_count_.load() < 4ul) {
        // We are the final "consumer", and we are about to release our reference
        // to the shared state. Ask the operation to stop early.
        __stop_source_.request_stop();
      }
      __dec_ref();
    }

    /// @post The "is running" bit is set in the shared state's ref count, OR the __waiters_ list
    /// is set to the known "tombstone" value indicating completion.
    void __try_start() noexcept {
      // With the split algorithm, multiple split senders can be started simultaneously, but
      // only one should start the shared async operation. If the low bit is set, then
      // someone else has already started the shared operation. Do nothing.
      if (__set_started()) {
        // we are the first to start the underlying operation
        if (__stop_source_.stop_requested()) {
          // Stop has already been requested. Rather than starting the operation, complete with
          // set_stopped immediately. __notify_waiters:
          // 1. Sets __waiters_ to a known "tombstone" value.
          // 2. Notifies all the waiters that the operation has stopped.
          // 3. Sets the "is running" bit in the ref count to 0.
          __notify_waiters();
        } else {
          stdexec::start(__shared_op_);
        }
      }
    }

    template <class _StopToken>
    auto __try_add_waiter(__local_state_base* __waiter, _StopToken __stok) noexcept -> bool {
      std::unique_lock __lock{__mutex_};
      if (__waiters_.front() == &__tombstone_) {
        // The work has already completed. Notify the waiter immediately.
        __lock.unlock();
        __waiter->__notify();
        return true;
      } else if (__stok.stop_requested()) {
        // Stop has been requested. Do not add the waiter.
        return false;
      } else {
        // Add the waiter to the list.
        __waiters_.push_front(__waiter);
        return true;
      }
    }

    /// @brief This is called when the shared async operation completes.
    /// @post __waiters_ is set to a known "tombstone" value.
    template <class _Tag, class... _As>
    void __complete(_Tag, _As&&... __as) noexcept {
      STDEXEC_TRY {
        using __tuple_t = __decayed_tuple<_Tag, _As...>;
        __results_.template emplace<__tuple_t>(_Tag(), static_cast<_As&&>(__as)...);
      }
      STDEXEC_CATCH_ALL {
        using __tuple_t = __decayed_tuple<set_error_t, std::exception_ptr>;
        __results_.template emplace<__tuple_t>(set_error_t(), std::current_exception());
      }

      __notify_waiters();
    }

    /// @brief This is called when the shared async operation completes.
    /// @post __waiters_ is set to a known "tombstone" value.
    void __notify_waiters() noexcept {
      __waiters_list_t __waiters_copy{&__tombstone_};

      // Set the waiters list to a known "tombstone" value that we can check later.
      {
        std::lock_guard __lock{__mutex_};
        __waiters_.swap(__waiters_copy);
      }

      STDEXEC_ASSERT(__waiters_copy.front() != &__tombstone_);
      for (auto __itr = __waiters_copy.begin(); __itr != __waiters_copy.end();) {
        __local_state_base* __item = *__itr;

        // We must increment the iterator before calling notify, since notify may end up
        // triggering *__item to be destructed on another thread, and the intrusive slist's
        // iterator increment relies on __item.
        ++__itr;
        __item->__notify();
      }

      // Set the "is running" bit in the ref count to zero. Delete the shared state if the
      // ref-count is now zero.
      __set_completed();
    }
  };

  template <class _CvrefSender, class _Env>
  __shared_state(_CvrefSender&&, _Env) -> __shared_state<_CvrefSender, _Env>;

  template <class _Cvref, class _CvrefSender, class _Env>
  using __make_completions = __try_make_completion_signatures<
    // NOT TO SPEC:
    // See https://github.com/cplusplus/sender-receiver/issues/23
    _CvrefSender,
    __env_t<_Env>,
    completion_signatures<
      set_error_t(__minvoke<_Cvref, std::exception_ptr>),
      set_stopped_t()
    >, // NOT TO SPEC
    __mtransform<_Cvref, __mcompose<__q<completion_signatures>, __qf<set_value_t>>>,
    __mtransform<_Cvref, __mcompose<__q<completion_signatures>, __qf<set_error_t>>>
  >;

  // split completes with const T&. ensure_started completes with T&&.
  template <class _Tag>
  using __cvref_results_t =
    __mcompose<__if_c<same_as<_Tag, __split::__split_t>, __cpclr, __cp>, __q<__decay_t>>;

  // NOTE: the use of __mapply in the return type below takes advantage of the fact that _ShState
  // denotes an instance of the __shared_state template, which is parameterized on the
  // cvref-qualified sender and the environment.
  template <class _Tag, class _ShState>
  using __completions =
    __mapply<__mbind_front_q<__make_completions, __cvref_results_t<_Tag>>, _ShState>;
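  // For example (a sketch, not normative): for an underlying sender that
  // completes with `set_value_t(int&&)` and `set_error_t(std::error_code)`,
  // the split sender's completions are roughly
  //   set_value_t(const int&), set_error_t(const std::error_code&),
  //   set_error_t(const std::exception_ptr&), set_stopped_t()
  // while ensure_started's completions (per __cp above) carry the decayed
  // result types by value:
  //   set_value_t(int), set_error_t(std::error_code),
  //   set_error_t(std::exception_ptr), set_stopped_t()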
  template <class _CvrefSender, class _Env, bool _Copyable = true>
  struct __box {
    using __tag_t = __if_c<_Copyable, __split::__split_t, __ensure_started::__ensure_started_t>;
    using __sh_state_t = __shared_state<_CvrefSender, _Env>;

    __box(__tag_t, __sh_state_t* __sh_state) noexcept
      : __sh_state_(__sh_state) {
    }

    __box(__box&& __other) noexcept
      : __sh_state_(std::exchange(__other.__sh_state_, nullptr)) {
    }

    __box(const __box& __other) noexcept
      requires _Copyable
      : __sh_state_(__other.__sh_state_) {
      __sh_state_->__inc_ref();
    }

    ~__box() {
      if (__sh_state_) {
        __sh_state_->__detach();
      }
    }

    __sh_state_t* __sh_state_;
  };

  template <class _CvrefSender, class _Env>
  __box(__split::__split_t, __shared_state<_CvrefSender, _Env>*)
    -> __box<_CvrefSender, _Env, true>;

  template <class _CvrefSender, class _Env>
  __box(__ensure_started::__ensure_started_t, __shared_state<_CvrefSender, _Env>*)
    -> __box<_CvrefSender, _Env, false>;
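  // Ownership sketch for __box above: copying a __box (split only) registers a
  // new consumer via __inc_ref(), adding 2 to the shared ref count; moving
  // transfers ownership and leaves the moved-from box with a null __sh_state_,
  // so its destructor skips __detach(). Exactly one __detach() is thus issued
  // per consumer, balancing the +2 that each consumer contributes to
  // __ref_count_.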
  template <class _Tag>
  struct __shared_impl : __sexpr_defaults {
    static constexpr auto get_state =
      []<class _CvrefSender, class _Receiver>(_CvrefSender&& __sndr, _Receiver&) noexcept
      -> __local_state<_CvrefSender, _Receiver> {
      static_assert(sender_expr_for<_CvrefSender, _Tag>);
      return __local_state<_CvrefSender, _Receiver>{static_cast<_CvrefSender&&>(__sndr)};
    };

    static constexpr auto get_completion_signatures =
      []<class _Self>(const _Self&, auto&&...) noexcept
      -> __completions<_Tag, typename __data_of<_Self>::__sh_state_t> {
      static_assert(sender_expr_for<_Self, _Tag>);
      return {};
    };

    static constexpr auto start = []<class _Sender, class _Receiver>(
                                    __local_state<_Sender, _Receiver>& __self,
                                    _Receiver& __rcvr) noexcept -> void {
      // Scenario: there are no more split senders, this is the only operation state, the
      // underlying operation has not yet been started, and the receiver's stop token is already
      // in the "stop requested" state. Then registering the stop callback will call
      // __local_state::operator() on __self synchronously. It may also be called asynchronously
      // at any point after the callback is registered. Beware. We are guaranteed, however, that
      // __local_state::operator() will not complete the operation or decrement the shared state's
      // ref count until after __self has been added to the waiters list.
      const auto __stok = stdexec::get_stop_token(stdexec::get_env(__rcvr));
      __self.__on_stop_.emplace(__stok, __self);

      // We haven't put __self in the waiters list yet and we are holding a ref count to
      // __sh_state_, so nothing can happen to the __sh_state_ here.

      // Start the shared op. As an optimization, skip it if the receiver's stop token has
      // already been signaled.
      if (!__stok.stop_requested()) {
        __self.__sh_state_->__try_start();
        if (__self.__sh_state_->__try_add_waiter(&__self, __stok)) {
          // successfully added the waiter
          return;
        }
      }

      // Otherwise, we failed to add the waiter because of a stop request.
      // Complete synchronously with set_stopped().
      __self.__on_stop_.reset();
      std::exchange(__self.__sh_state_, nullptr)->__detach();
      stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr));
    };
  };
} // namespace stdexec::__shared