1 /* 2 * Copyright (c) 2023 Maikel Nadolski 3 * Copyright (c) 2023 NVIDIA Corporation 4 * 5 * Licensed under the Apache License Version 2.0 with LLVM Exceptions 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * https://llvm.org/LICENSE.txt 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 #pragma once 18 19 #include "../stdexec/execution.hpp" 20 21 namespace exec 22 { 23 struct sequence_sender_t : stdexec::sender_t 24 {}; 25 26 using sequence_tag [[deprecated("Renamed to exec::sequence_sender_t")]] = 27 exec::sequence_sender_t; 28 29 namespace __sequence_sndr 30 { 31 using namespace stdexec; 32 33 template <class _Haystack> 34 struct __mall_contained_in_impl 35 { 36 template <class... _Needles> 37 using __f = __mand<__mapply<__contains<_Needles>, _Haystack>...>; 38 }; 39 40 template <class _Needles, class _Haystack> 41 using __mall_contained_in = 42 __mapply<__mall_contained_in_impl<_Haystack>, _Needles>; 43 44 template <class _Needles, class _Haystack> 45 concept __all_contained_in = __mall_contained_in<_Needles, _Haystack>::value; 46 47 // This concept checks if a given sender satisfies the requirements to be 48 // returned from `set_next`. 49 template <class _Sender, class _Env = empty_env> 50 concept next_sender = // 51 sender_in<_Sender, _Env> // 52 && 53 __all_contained_in<completion_signatures_of_t<_Sender, _Env>, 54 completion_signatures<set_value_t(), set_stopped_t()>>; 55 56 // This is a sequence-receiver CPO that is used to apply algorithms on an input 57 // sender and it returns a next-sender. `set_next` is usually called in a 58 // context where a sender will be connected to a receiver. Since calling 59 // `set_next` usually involves constructing senders it is allowed to throw an 60 // excpetion, which needs to be handled by a calling sequence-operation. The 61 // returned object is a sender that can complete with `set_value_t()` or 62 // `set_stopped_t()`. 63 struct set_next_t 64 { 65 template <receiver _Receiver, sender _Item> 66 requires tag_invocable<set_next_t, _Receiver&, _Item> 67 auto operator()(_Receiver& __rcvr, _Item&& __item) const 68 noexcept(nothrow_tag_invocable<set_next_t, _Receiver&, _Item>) 69 -> tag_invoke_result_t<set_next_t, _Receiver&, _Item> 70 { 71 static_assert( 72 next_sender<tag_invoke_result_t<set_next_t, _Receiver&, _Item>>, 73 "The sender returned from set_next is required to complete with set_value_t() or " 74 "set_stopped_t()"); 75 return tag_invoke(*this, __rcvr, (_Item&&)__item); 76 } 77 }; 78 } // namespace __sequence_sndr 79 80 using __sequence_sndr::set_next_t; 81 inline constexpr set_next_t set_next; 82 83 template <class _Receiver, class _Sender> 84 using next_sender_of_t = decltype(exec::set_next( 85 stdexec::__declval<stdexec::__decay_t<_Receiver>&>(), 86 stdexec::__declval<_Sender>())); 87 88 namespace __sequence_sndr 89 { 90 91 template <class _ReceiverId> 92 struct __stopped_means_break 93 { 94 struct __t 95 { 96 using receiver_concept = stdexec::receiver_t; 97 using __id = __stopped_means_break; 98 using _Receiver = stdexec::__t<_ReceiverId>; 99 using _Token = stop_token_of_t<env_of_t<_Receiver>>; 100 STDEXEC_ATTRIBUTE((no_unique_address)) _Receiver __rcvr_; 101 102 template <same_as<get_env_t> _GetEnv, same_as<__t> _Self> 103 friend env_of_t<_Receiver> tag_invoke(_GetEnv, 104 const _Self& __self) noexcept 105 { 106 return stdexec::get_env(__self.__rcvr_); 107 } 108 109 template <same_as<set_value_t> _SetValue, same_as<__t> _Self> 110 requires __callable<set_value_t, _Receiver&&> 111 friend void tag_invoke(_SetValue, _Self&& __self) noexcept 112 { 113 return stdexec::set_value(static_cast<_Receiver&&>(__self.__rcvr_)); 114 } 115 116 template <same_as<set_stopped_t> _SetStopped, same_as<__t> _Self> 117 requires __callable<set_value_t, _Receiver&&> && 118 (unstoppable_token<_Token> || 119 __callable<set_stopped_t, _Receiver &&>) 120 friend void tag_invoke(_SetStopped, _Self&& __self) noexcept 121 { 122 if constexpr (unstoppable_token<_Token>) 123 { 124 stdexec::set_value(static_cast<_Receiver&&>(__self.__rcvr_)); 125 } 126 else 127 { 128 auto __token = 129 stdexec::get_stop_token(stdexec::get_env(__self.__rcvr_)); 130 if (__token.stop_requested()) 131 { 132 stdexec::set_stopped( 133 static_cast<_Receiver&&>(__self.__rcvr_)); 134 } 135 else 136 { 137 stdexec::set_value( 138 static_cast<_Receiver&&>(__self.__rcvr_)); 139 } 140 } 141 } 142 }; 143 }; 144 145 template <class _Rcvr> 146 using __stopped_means_break_t = 147 __t<__stopped_means_break<__id<__decay_t<_Rcvr>>>>; 148 } // namespace __sequence_sndr 149 150 template <class _Sender> 151 concept __enable_sequence_sender = // 152 requires { typename _Sender::sender_concept; } && // 153 stdexec::same_as<typename _Sender::sender_concept, sequence_sender_t>; 154 155 template <class _Sender> 156 inline constexpr bool enable_sequence_sender = 157 __enable_sequence_sender<_Sender>; 158 159 template <class... _Senders> 160 struct item_types 161 {}; 162 163 template <class _Tp> 164 concept __has_item_typedef = requires { typename _Tp::item_types; }; 165 166 ///////////////////////////////////////////////////////////////////////////// 167 // [execution.sndtraits] 168 namespace __sequence_sndr 169 { 170 struct get_item_types_t; 171 template <class _Sender, class _Env> 172 using __tfx_sender = 173 transform_sender_result_t<__late_domain_of_t<_Sender, _Env>, _Sender, _Env>; 174 175 template <class _Sender, class _Env> 176 concept __with_tag_invoke = // 177 tag_invocable<get_item_types_t, __tfx_sender<_Sender, _Env>, _Env>; 178 template <class _Sender, class _Env> 179 using __member_alias_t = // 180 typename __decay_t<__tfx_sender<_Sender, _Env>>::item_types; 181 182 template <class _Sender, class _Env> 183 concept __with_member_alias = __mvalid<__member_alias_t, _Sender, _Env>; 184 185 struct get_item_types_t 186 { 187 template <class _Sender, class _Env> 188 static auto __impl() 189 { 190 static_assert(sizeof(_Sender), 191 "Incomplete type used with get_item_types"); 192 static_assert(sizeof(_Env), "Incomplete type used with get_item_types"); 193 using _TfxSender = __tfx_sender<_Sender, _Env>; 194 if constexpr (__with_tag_invoke<_Sender, _Env>) 195 { 196 using _Result = 197 tag_invoke_result_t<get_item_types_t, _TfxSender, _Env>; 198 return (_Result(*)()) nullptr; 199 } 200 else if constexpr (__with_member_alias<_TfxSender, _Env>) 201 { 202 using _Result = __member_alias_t<_TfxSender, _Env>; 203 return (_Result(*)()) nullptr; 204 } 205 else if constexpr (sender_in<_TfxSender, _Env> && 206 !enable_sequence_sender< 207 stdexec::__decay_t<_TfxSender>>) 208 { 209 using _Result = item_types<stdexec::__decay_t<_TfxSender>>; 210 return (_Result(*)()) nullptr; 211 } 212 else if constexpr (__is_debug_env<_Env>) 213 { 214 using __tag_invoke::tag_invoke; 215 // This ought to cause a hard error that indicates where the problem 216 // is. 217 using _Completions [[maybe_unused]] = 218 tag_invoke_result_t<get_item_types_t, 219 __tfx_sender<_Sender, _Env>, _Env>; 220 return (__debug::__completion_signatures(*)()) nullptr; 221 } 222 else 223 { 224 using _Result = 225 __mexception<_UNRECOGNIZED_SENDER_TYPE_<>, 226 _WITH_SENDER_<_Sender>, _WITH_ENVIRONMENT_<_Env>>; 227 return (_Result(*)()) nullptr; 228 } 229 } 230 231 template <class _Sender, class _Env = __default_env> 232 constexpr auto operator()(_Sender&&, const _Env&) const noexcept 233 -> decltype(__impl<_Sender, _Env>()()) 234 { 235 return {}; 236 } 237 }; 238 } // namespace __sequence_sndr 239 240 using __sequence_sndr::get_item_types_t; 241 inline constexpr get_item_types_t get_item_types{}; 242 243 template <class _Sender, class _Env> 244 using item_types_of_t = decltype(get_item_types(stdexec::__declval<_Sender>(), 245 stdexec::__declval<_Env>())); 246 247 template <class _Sender, class _Env> 248 concept sequence_sender = // 249 stdexec::sender_in<_Sender, _Env> && // 250 enable_sequence_sender<stdexec::__decay_t<_Sender>>; 251 252 template <class _Sender, class _Env> 253 concept has_sequence_item_types = 254 requires(_Sender&& __sndr, _Env&& __env) { 255 get_item_types((_Sender&&)__sndr, (_Env&&)__env); 256 }; 257 258 template <class _Sender, class _Env> 259 concept sequence_sender_in = // 260 stdexec::sender_in<_Sender, _Env> && // 261 has_sequence_item_types<_Sender, _Env> && // 262 sequence_sender<_Sender, _Env>; 263 264 template <class _Receiver> 265 struct _WITH_RECEIVER_ 266 {}; 267 268 template <class _Item> 269 struct _MISSING_SET_NEXT_OVERLOAD_FOR_ITEM_ 270 {}; 271 272 template <class _Receiver, class _Item> 273 auto __try_item(_Item*) 274 -> stdexec::__mexception<_MISSING_SET_NEXT_OVERLOAD_FOR_ITEM_<_Item>, 275 _WITH_RECEIVER_<_Receiver>>; 276 277 template <class _Receiver, class _Item> 278 requires stdexec::__callable<set_next_t, _Receiver&, _Item> 279 stdexec::__msuccess __try_item(_Item*); 280 281 template <class _Receiver, class... _Items> 282 auto __try_items(exec::item_types<_Items...>*) 283 -> decltype((stdexec::__msuccess(), ..., 284 exec::__try_item<_Receiver>((_Items*)nullptr))); 285 286 template <class _Receiver, class _Items> 287 concept __sequence_receiver_of = 288 requires(_Items* __items) { 289 { 290 exec::__try_items<stdexec::__decay_t<_Receiver>>(__items) 291 } -> stdexec::__ok; 292 }; 293 294 template <class _Receiver, class _SequenceItems> 295 concept sequence_receiver_of = // 296 stdexec::receiver<_Receiver> && // 297 __sequence_receiver_of<_Receiver, _SequenceItems>; 298 299 template <class _Items, class _Env> 300 using __concat_item_signatures_t = stdexec::__mapply< 301 stdexec::__q<stdexec::__concat_completion_signatures_t>, 302 stdexec::__mapply<stdexec::__transform<stdexec::__mbind_back_q< 303 stdexec::completion_signatures_of_t, _Env>>, 304 _Items>>; 305 306 template <class _Completions> 307 using __gather_error_signals = 308 stdexec::__only_gather_signal<stdexec::set_error_t, _Completions>; 309 310 template <class _Completions> 311 using __gather_stopped_signals = 312 stdexec::__only_gather_signal<stdexec::set_stopped_t, _Completions>; 313 314 template <class _Completions> 315 using __to_sequence_completions_t = stdexec::__concat_completion_signatures_t< 316 stdexec::completion_signatures<stdexec::set_value_t()>, 317 __gather_error_signals<_Completions>, 318 __gather_stopped_signals<_Completions>>; 319 320 template <class _Sender, class _Env> 321 using __to_sequence_completion_signatures = stdexec::make_completion_signatures< 322 _Sender, _Env, stdexec::completion_signatures<stdexec::set_value_t()>, 323 stdexec::__mconst<stdexec::completion_signatures<>>::__f>; 324 325 template <class _Sequence, class _Env> 326 using __sequence_completion_signatures_of_t = 327 stdexec::__concat_completion_signatures_t< 328 stdexec::__try_make_completion_signatures< 329 _Sequence, _Env, 330 stdexec::completion_signatures<stdexec::set_value_t()>, 331 stdexec::__mconst<stdexec::completion_signatures<>>>, 332 stdexec::__mapply< 333 stdexec::__q<stdexec::__concat_completion_signatures_t>, 334 stdexec::__mapply<stdexec::__transform<stdexec::__mbind_back_q< 335 __to_sequence_completion_signatures, _Env>>, 336 item_types_of_t<_Sequence, _Env>>>>; 337 338 template <class _Receiver, class _Sender> 339 concept sequence_receiver_from = // 340 stdexec::receiver<_Receiver> && // 341 stdexec::sender_in<_Sender, stdexec::env_of_t<_Receiver>> && // 342 sequence_receiver_of< 343 _Receiver, item_types_of_t<_Sender, stdexec::env_of_t<_Receiver>>> && // 344 ((sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> && 345 stdexec::receiver_of<_Receiver, 346 stdexec::completion_signatures_of_t< 347 _Sender, stdexec::env_of_t<_Receiver>>>) || // 348 (!sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> && 349 stdexec::__receiver_from< 350 __sequence_sndr::__stopped_means_break_t<_Receiver>, 351 next_sender_of_t<_Receiver, _Sender>>)); 352 353 namespace __sequence_sndr 354 { 355 struct subscribe_t; 356 357 template <class _Env> 358 using __single_sender_completion_sigs = 359 __if_c<unstoppable_token<stop_token_of_t<_Env>>, 360 completion_signatures<set_value_t()>, 361 completion_signatures<set_value_t(), set_stopped_t()>>; 362 363 template <class _Sender, class _Receiver> 364 concept __next_connectable_with_tag_invoke = 365 receiver<_Receiver> && // 366 sender_in<_Sender, env_of_t<_Receiver>> && // 367 !sequence_sender_in<_Sender, env_of_t<_Receiver>> && // 368 sequence_receiver_of<_Receiver, 369 item_types<stdexec::__decay_t<_Sender>>> && // 370 __receiver_from<__stopped_means_break_t<_Receiver>, 371 next_sender_of_t<_Receiver, _Sender>> && // 372 __connect::__connectable_with_tag_invoke< 373 next_sender_of_t<_Receiver, _Sender>&&, 374 __stopped_means_break_t<_Receiver>>; 375 376 template <class _Sender, class _Receiver> 377 concept __subscribeable_with_tag_invoke = 378 receiver<_Receiver> && // 379 sequence_sender_in<_Sender, env_of_t<_Receiver>> && // 380 sequence_receiver_from<_Receiver, _Sender> && // 381 tag_invocable<subscribe_t, _Sender, _Receiver>; 382 383 struct subscribe_t 384 { 385 template <class _Sender, class _Receiver> 386 using __tfx_sndr = __tfx_sender<_Sender, env_of_t<_Receiver>>; 387 388 template <class _Sender, class _Receiver> 389 static constexpr auto __select_impl() noexcept 390 { 391 using _Domain = __late_domain_of_t<_Sender, env_of_t<_Receiver&>>; 392 constexpr bool _NothrowTfxSender = 393 __nothrow_callable<get_env_t, _Receiver&> && 394 __nothrow_callable<transform_sender_t, _Domain, _Sender, 395 env_of_t<_Receiver&>>; 396 using _TfxSender = __tfx_sndr<_Sender, _Receiver>; 397 if constexpr (__next_connectable_with_tag_invoke<_TfxSender, _Receiver>) 398 { 399 using _Result = 400 tag_invoke_result_t<connect_t, 401 next_sender_of_t<_Receiver, _TfxSender>, 402 __stopped_means_break_t<_Receiver>>; 403 constexpr bool _Nothrow = 404 nothrow_tag_invocable<connect_t, 405 next_sender_of_t<_Receiver, _TfxSender>, 406 __stopped_means_break_t<_Receiver>>; 407 return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr); 408 } 409 else if constexpr (__subscribeable_with_tag_invoke<_TfxSender, 410 _Receiver>) 411 { 412 using _Result = 413 tag_invoke_result_t<subscribe_t, _TfxSender, _Receiver>; 414 constexpr bool _Nothrow = // 415 _NothrowTfxSender && 416 nothrow_tag_invocable<subscribe_t, _TfxSender, _Receiver>; 417 return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr); 418 } 419 else 420 { 421 return static_cast<__debug::__debug_operation (*)() noexcept>( 422 nullptr); 423 } 424 } 425 426 template <class _Sender, class _Receiver> 427 using __select_impl_t = decltype(__select_impl<_Sender, _Receiver>()); 428 429 template <sender _Sender, receiver _Receiver> 430 requires __next_connectable_with_tag_invoke< 431 __tfx_sndr<_Sender, _Receiver>, _Receiver> || 432 __subscribeable_with_tag_invoke<__tfx_sndr<_Sender, _Receiver>, 433 _Receiver> || 434 __is_debug_env<env_of_t<_Receiver>> 435 auto operator()(_Sender&& __sndr, _Receiver&& __rcvr) const 436 noexcept(__nothrow_callable<__select_impl_t<_Sender, _Receiver>>) 437 -> __call_result_t<__select_impl_t<_Sender, _Receiver>> 438 { 439 using _TfxSender = __tfx_sndr<_Sender, _Receiver>; 440 auto&& __env = get_env(__rcvr); 441 auto __domain = __get_late_domain(__sndr, __env); 442 if constexpr (__next_connectable_with_tag_invoke<_TfxSender, _Receiver>) 443 { 444 static_assert( 445 operation_state<tag_invoke_result_t< 446 connect_t, next_sender_of_t<_Receiver, _TfxSender>, 447 __stopped_means_break_t<_Receiver>>>, 448 "stdexec::connect(sender, receiver) must return a type that " 449 "satisfies the operation_state concept"); 450 next_sender_of_t<_Receiver, _TfxSender> __next = set_next( 451 __rcvr, transform_sender(__domain, (_Sender&&)__sndr, __env)); 452 return tag_invoke( 453 connect_t{}, 454 static_cast<next_sender_of_t<_Receiver, _TfxSender>&&>(__next), 455 __stopped_means_break_t<_Receiver>{(_Receiver&&)__rcvr}); 456 } 457 else if constexpr (__subscribeable_with_tag_invoke<_TfxSender, 458 _Receiver>) 459 { 460 static_assert( 461 operation_state< 462 tag_invoke_result_t<subscribe_t, _TfxSender, _Receiver>>, 463 "exec::subscribe(sender, receiver) must return a type that " 464 "satisfies the operation_state concept"); 465 return tag_invoke( 466 subscribe_t{}, 467 transform_sender(__domain, (_Sender&&)__sndr, __env), 468 (_Receiver&&)__rcvr); 469 } 470 else if constexpr (enable_sequence_sender< 471 stdexec::__decay_t<_TfxSender>>) 472 { 473 // This should generate an instantiate backtrace that contains 474 // useful debugging information. 475 using __tag_invoke::tag_invoke; 476 tag_invoke(*this, 477 transform_sender(__domain, (_Sender&&)__sndr, __env), 478 (_Receiver&&)__rcvr); 479 } 480 else 481 { 482 next_sender_of_t<_Receiver, _TfxSender> __next = set_next( 483 __rcvr, transform_sender(__domain, (_Sender&&)__sndr, __env)); 484 return tag_invoke( 485 connect_t{}, 486 static_cast<next_sender_of_t<_Receiver, _TfxSender>&&>(__next), 487 __stopped_means_break_t<_Receiver>{(_Receiver&&)__rcvr}); 488 } 489 } 490 491 friend constexpr bool tag_invoke(forwarding_query_t, subscribe_t) noexcept 492 { 493 return false; 494 } 495 }; 496 497 template <class _Sender, class _Receiver> 498 using subscribe_result_t = __call_result_t<subscribe_t, _Sender, _Receiver>; 499 } // namespace __sequence_sndr 500 501 using __sequence_sndr::__single_sender_completion_sigs; 502 503 using __sequence_sndr::subscribe_t; 504 inline constexpr subscribe_t subscribe; 505 506 using __sequence_sndr::subscribe_result_t; 507 508 template <class _Sender, class _Receiver> 509 concept sequence_sender_to = 510 sequence_receiver_from<_Receiver, _Sender> && // 511 requires(_Sender&& __sndr, _Receiver&& __rcvr) { 512 { 513 subscribe((_Sender&&)__sndr, (_Receiver&&)__rcvr) 514 }; 515 }; 516 517 template <class _Receiver> 518 concept __stoppable_receiver = // 519 stdexec::__callable<stdexec::set_value_t, _Receiver> && // 520 (stdexec::unstoppable_token< 521 stdexec::stop_token_of_t<stdexec::env_of_t<_Receiver>>> || 522 stdexec::__callable<stdexec::set_stopped_t, _Receiver>); 523 524 template <class _Receiver> 525 requires __stoppable_receiver<_Receiver> 526 void __set_value_unless_stopped(_Receiver&& __rcvr) 527 { 528 using token_type = stdexec::stop_token_of_t<stdexec::env_of_t<_Receiver>>; 529 if constexpr (stdexec::unstoppable_token<token_type>) 530 { 531 stdexec::set_value(static_cast<_Receiver&&>(__rcvr)); 532 } 533 else 534 { 535 auto token = stdexec::get_stop_token(stdexec::get_env(__rcvr)); 536 if (!token.stop_requested()) 537 { 538 stdexec::set_value(static_cast<_Receiver&&>(__rcvr)); 539 } 540 else 541 { 542 stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr)); 543 } 544 } 545 } 546 } // namespace exec 547