1 /* 2 * Copyright (c) 2023 Maikel Nadolski 3 * Copyright (c) 2023 NVIDIA Corporation 4 * 5 * Licensed under the Apache License Version 2.0 with LLVM Exceptions 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * https://llvm.org/LICENSE.txt 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 #pragma once 18 19 #include "../stdexec/execution.hpp" 20 21 namespace exec 22 { 23 struct sequence_sender_t : stdexec::sender_t 24 {}; 25 26 using sequence_tag [[deprecated("Renamed to exec::sequence_sender_t")]] = 27 exec::sequence_sender_t; 28 29 namespace __sequence_sndr 30 { 31 using namespace stdexec; 32 33 template <class _Haystack> 34 struct __mall_contained_in_impl 35 { 36 template <class... _Needles> 37 using __f = __mand<__mapply<__mcontains<_Needles>, _Haystack>...>; 38 }; 39 40 template <class _Needles, class _Haystack> 41 using __mall_contained_in = 42 __mapply<__mall_contained_in_impl<_Haystack>, _Needles>; 43 44 template <class _Needles, class _Haystack> 45 concept __all_contained_in = __v<__mall_contained_in<_Needles, _Haystack>>; 46 47 // This concept checks if a given sender satisfies the requirements to be 48 // returned from `set_next`. 49 template <class _Sender, class _Env = empty_env> 50 concept next_sender = // 51 sender_in<_Sender, _Env> // 52 && 53 __all_contained_in<completion_signatures_of_t<_Sender, _Env>, 54 completion_signatures<set_value_t(), set_stopped_t()>>; 55 56 // This is a sequence-receiver CPO that is used to apply algorithms on an input 57 // sender and it returns a next-sender. `set_next` is usually called in a 58 // context where a sender will be connected to a receiver. Since calling 59 // `set_next` usually involves constructing senders it is allowed to throw an 60 // excpetion, which needs to be handled by a calling sequence-operation. The 61 // returned object is a sender that can complete with `set_value_t()` or 62 // `set_stopped_t()`. 63 struct set_next_t 64 { 65 template <receiver _Receiver, sender _Item> 66 requires tag_invocable<set_next_t, _Receiver&, _Item> 67 auto operator()(_Receiver& __rcvr, _Item&& __item) const 68 noexcept(nothrow_tag_invocable<set_next_t, _Receiver&, _Item>) 69 -> tag_invoke_result_t<set_next_t, _Receiver&, _Item> 70 { 71 static_assert( 72 next_sender<tag_invoke_result_t<set_next_t, _Receiver&, _Item>>, 73 "The sender returned from set_next is required to complete with set_value_t() or " 74 "set_stopped_t()"); 75 return tag_invoke(*this, __rcvr, static_cast<_Item&&>(__item)); 76 } 77 }; 78 } // namespace __sequence_sndr 79 80 using __sequence_sndr::set_next_t; 81 inline constexpr set_next_t set_next; 82 83 template <class _Receiver, class _Sender> 84 using next_sender_of_t = decltype(exec::set_next( 85 stdexec::__declval<stdexec::__decay_t<_Receiver>&>(), 86 stdexec::__declval<_Sender>())); 87 88 namespace __sequence_sndr 89 { 90 91 template <class _ReceiverId> 92 struct __stopped_means_break 93 { 94 struct __t 95 { 96 using receiver_concept = stdexec::receiver_t; 97 using __id = __stopped_means_break; 98 using _Receiver = stdexec::__t<_ReceiverId>; 99 using _Token = stop_token_of_t<env_of_t<_Receiver>>; 100 STDEXEC_ATTRIBUTE((no_unique_address)) 101 _Receiver __rcvr_; 102 103 auto get_env() const noexcept -> env_of_t<_Receiver> 104 { 105 return stdexec::get_env(__rcvr_); 106 } 107 108 void set_value() noexcept 109 requires __callable<set_value_t, _Receiver> 110 { 111 return stdexec::set_value(static_cast<_Receiver&&>(__rcvr_)); 112 } 113 114 void set_stopped() noexcept 115 requires __callable<set_value_t, _Receiver> && 116 (unstoppable_token<_Token> || 117 __callable<set_stopped_t, _Receiver>) 118 { 119 if constexpr (unstoppable_token<_Token>) 120 { 121 stdexec::set_value(static_cast<_Receiver&&>(__rcvr_)); 122 } 123 else 124 { 125 auto __token = 126 stdexec::get_stop_token(stdexec::get_env(__rcvr_)); 127 if (__token.stop_requested()) 128 { 129 stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr_)); 130 } 131 else 132 { 133 stdexec::set_value(static_cast<_Receiver&&>(__rcvr_)); 134 } 135 } 136 } 137 }; 138 }; 139 140 template <class _Rcvr> 141 using __stopped_means_break_t = 142 __t<__stopped_means_break<__id<__decay_t<_Rcvr>>>>; 143 } // namespace __sequence_sndr 144 145 template <class _Sender> 146 concept __enable_sequence_sender = // 147 requires { typename _Sender::sender_concept; } && // 148 stdexec::same_as<typename _Sender::sender_concept, sequence_sender_t>; 149 150 template <class _Sender> 151 inline constexpr bool enable_sequence_sender = 152 __enable_sequence_sender<_Sender>; 153 154 template <class... _Senders> 155 struct item_types 156 {}; 157 158 template <class _Tp> 159 concept __has_item_typedef = requires { typename _Tp::item_types; }; 160 161 ///////////////////////////////////////////////////////////////////////////// 162 // [execution.sndtraits] 163 namespace __sequence_sndr 164 { 165 struct get_item_types_t; 166 template <class _Sender, class _Env> 167 using __tfx_sender = 168 transform_sender_result_t<__late_domain_of_t<_Sender, _Env>, _Sender, _Env>; 169 170 template <class _Sender, class _Env> 171 concept __with_tag_invoke = // 172 tag_invocable<get_item_types_t, __tfx_sender<_Sender, _Env>, _Env>; 173 template <class _Sender, class _Env> 174 using __member_alias_t = // 175 typename __decay_t<__tfx_sender<_Sender, _Env>>::item_types; 176 177 template <class _Sender, class _Env> 178 concept __with_member_alias = __mvalid<__member_alias_t, _Sender, _Env>; 179 180 struct get_item_types_t 181 { 182 template <class _Sender, class _Env> 183 static auto __impl() 184 { 185 static_assert(sizeof(_Sender), 186 "Incomplete type used with get_item_types"); 187 static_assert(sizeof(_Env), "Incomplete type used with get_item_types"); 188 using _TfxSender = __tfx_sender<_Sender, _Env>; 189 if constexpr (__with_tag_invoke<_Sender, _Env>) 190 { 191 using _Result = 192 tag_invoke_result_t<get_item_types_t, _TfxSender, _Env>; 193 return static_cast<_Result (*)()>(nullptr); 194 } 195 else if constexpr (__with_member_alias<_TfxSender, _Env>) 196 { 197 using _Result = __member_alias_t<_TfxSender, _Env>; 198 return static_cast<_Result (*)()>(nullptr); 199 } 200 else if constexpr (sender_in<_TfxSender, _Env> && 201 !enable_sequence_sender< 202 stdexec::__decay_t<_TfxSender>>) 203 { 204 using _Result = item_types<stdexec::__decay_t<_TfxSender>>; 205 return static_cast<_Result (*)()>(nullptr); 206 } 207 else if constexpr (__is_debug_env<_Env>) 208 { 209 using __tag_invoke::tag_invoke; 210 // This ought to cause a hard error that indicates where the problem 211 // is. 212 using _Completions [[maybe_unused]] = 213 tag_invoke_result_t<get_item_types_t, 214 __tfx_sender<_Sender, _Env>, _Env>; 215 return static_cast<__debug::__completion_signatures (*)()>(nullptr); 216 } 217 else 218 { 219 using _Result = 220 __mexception<_UNRECOGNIZED_SENDER_TYPE_<>, 221 _WITH_SENDER_<_Sender>, _WITH_ENVIRONMENT_<_Env>>; 222 return static_cast<_Result (*)()>(nullptr); 223 } 224 } 225 226 template <class _Sender, class _Env = empty_env> 227 constexpr auto operator()(_Sender&&, _Env&& = {}) const noexcept 228 -> decltype(__impl<_Sender, _Env>()()) 229 { 230 return {}; 231 } 232 }; 233 } // namespace __sequence_sndr 234 235 using __sequence_sndr::get_item_types_t; 236 inline constexpr get_item_types_t get_item_types{}; 237 238 template <class _Sender, class... _Env> 239 using item_types_of_t = decltype(get_item_types(stdexec::__declval<_Sender>(), 240 stdexec::__declval<_Env>()...)); 241 242 template <class _Sender, class... _Env> 243 concept sequence_sender = // 244 stdexec::sender_in<_Sender, _Env...> && // 245 enable_sequence_sender<stdexec::__decay_t<_Sender>>; 246 247 template <class _Sender, class... _Env> 248 concept has_sequence_item_types = 249 requires(_Sender&& __sndr, _Env&&... __env) { 250 get_item_types(static_cast<_Sender&&>(__sndr), 251 static_cast<_Env&&>(__env)...); 252 }; 253 254 template <class _Sender, class... _Env> 255 concept sequence_sender_in = // 256 stdexec::sender_in<_Sender, _Env...> && // 257 has_sequence_item_types<_Sender, _Env...> && // 258 sequence_sender<_Sender, _Env...>; 259 260 template <class _Receiver> 261 struct _WITH_RECEIVER_ 262 {}; 263 264 template <class _Item> 265 struct _MISSING_SET_NEXT_OVERLOAD_FOR_ITEM_ 266 {}; 267 268 template <class _Receiver, class _Item> 269 auto __try_item(_Item*) // 270 -> stdexec::__mexception<_MISSING_SET_NEXT_OVERLOAD_FOR_ITEM_<_Item>, 271 _WITH_RECEIVER_<_Receiver>>; 272 273 template <class _Receiver, class _Item> 274 requires stdexec::__callable<set_next_t, _Receiver&, _Item> 275 auto __try_item(_Item*) -> stdexec::__msuccess; 276 277 template <class _Receiver, class... _Items> 278 auto __try_items(exec::item_types<_Items...>*) // 279 -> decltype((stdexec::__msuccess(), ..., 280 exec::__try_item<_Receiver>(static_cast<_Items*>(nullptr)))); 281 282 template <class _Receiver, class _Items> 283 concept __sequence_receiver_of = 284 requires(_Items* __items) { 285 { 286 exec::__try_items<stdexec::__decay_t<_Receiver>>(__items) 287 } -> stdexec::__ok; 288 }; 289 290 template <class _Receiver, class _SequenceItems> 291 concept sequence_receiver_of = // 292 stdexec::receiver<_Receiver> && // 293 __sequence_receiver_of<_Receiver, _SequenceItems>; 294 295 template <class _Completions> 296 using __to_sequence_completions_t = // 297 stdexec::__transform_completion_signatures< 298 _Completions, 299 stdexec::__mconst< 300 stdexec::completion_signatures<stdexec::set_value_t()>>::__f, 301 stdexec::__sigs::__default_set_error, 302 stdexec::completion_signatures<stdexec::set_stopped_t()>, 303 stdexec::__concat_completion_signatures>; 304 305 template <class _Sender, class... _Env> 306 using __item_completion_signatures = // 307 stdexec::transform_completion_signatures< 308 stdexec::__completion_signatures_of_t<_Sender, _Env...>, 309 stdexec::completion_signatures<stdexec::set_value_t()>, 310 stdexec::__mconst<stdexec::completion_signatures<>>::__f>; 311 312 template <class _Sequence, class... _Env> 313 using __sequence_completion_signatures = // 314 stdexec::transform_completion_signatures< 315 stdexec::__completion_signatures_of_t<_Sequence, _Env...>, 316 stdexec::completion_signatures<stdexec::set_value_t()>, 317 stdexec::__mconst<stdexec::completion_signatures<>>::__f>; 318 319 template <class _Sequence, class... _Env> 320 using __sequence_completion_signatures_of_t = // 321 stdexec::__mapply< 322 stdexec::__mtransform< 323 stdexec::__mbind_back_q<__item_completion_signatures, _Env...>, 324 stdexec::__mbind_back< 325 stdexec::__mtry_q<stdexec::__concat_completion_signatures>, 326 __sequence_completion_signatures<_Sequence, _Env...>>>, 327 item_types_of_t<_Sequence, _Env...>>; 328 329 template <class _Receiver, class _Sender> 330 concept sequence_receiver_from = // 331 stdexec::receiver<_Receiver> && // 332 stdexec::sender_in<_Sender, stdexec::env_of_t<_Receiver>> && // 333 sequence_receiver_of< 334 _Receiver, item_types_of_t<_Sender, stdexec::env_of_t<_Receiver>>> && // 335 ((sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> && 336 stdexec::receiver_of<_Receiver, 337 stdexec::completion_signatures_of_t< 338 _Sender, stdexec::env_of_t<_Receiver>>>) || // 339 (!sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> && 340 stdexec::__receiver_from< 341 __sequence_sndr::__stopped_means_break_t<_Receiver>, 342 next_sender_of_t<_Receiver, _Sender>>)); 343 344 namespace __sequence_sndr 345 { 346 struct subscribe_t; 347 348 template <class _Env> 349 using __single_sender_completion_sigs = 350 __if_c<unstoppable_token<stop_token_of_t<_Env>>, 351 completion_signatures<set_value_t()>, 352 completion_signatures<set_value_t(), set_stopped_t()>>; 353 354 template <class _Sender, class _Receiver> 355 concept __next_connectable = 356 receiver<_Receiver> && // 357 sender_in<_Sender, env_of_t<_Receiver>> && // 358 !sequence_sender_in<_Sender, env_of_t<_Receiver>> && // 359 sequence_receiver_of<_Receiver, 360 item_types<stdexec::__decay_t<_Sender>>> && // 361 sender_to<next_sender_of_t<_Receiver, _Sender>, 362 __stopped_means_break_t<_Receiver>>; 363 364 template <class _Sender, class _Receiver> 365 concept __subscribeable_with_tag_invoke = 366 receiver<_Receiver> && // 367 sequence_sender_in<_Sender, env_of_t<_Receiver>> && // 368 sequence_receiver_from<_Receiver, _Sender> && // 369 tag_invocable<subscribe_t, _Sender, _Receiver>; 370 371 struct subscribe_t 372 { 373 template <class _Sender, class _Receiver> 374 using __tfx_sndr = __tfx_sender<_Sender, env_of_t<_Receiver>>; 375 376 template <class _Sender, class _Receiver> 377 static constexpr auto __select_impl() noexcept 378 { 379 using _Domain = __late_domain_of_t<_Sender, env_of_t<_Receiver&>>; 380 constexpr bool _NothrowTfxSender = 381 __nothrow_callable<transform_sender_t, _Domain, _Sender, 382 env_of_t<_Receiver&>>; 383 using _TfxSender = __tfx_sndr<_Sender, _Receiver>; 384 if constexpr (__next_connectable<_TfxSender, _Receiver>) 385 { 386 using _Result = 387 connect_result_t<next_sender_of_t<_Receiver, _TfxSender>, 388 __stopped_means_break_t<_Receiver>>; 389 constexpr bool _Nothrow = 390 __nothrow_connectable<next_sender_of_t<_Receiver, _TfxSender>, 391 __stopped_means_break_t<_Receiver>>; 392 return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr); 393 } 394 else if constexpr (__subscribeable_with_tag_invoke<_TfxSender, 395 _Receiver>) 396 { 397 using _Result = 398 tag_invoke_result_t<subscribe_t, _TfxSender, _Receiver>; 399 constexpr bool _Nothrow = // 400 _NothrowTfxSender && 401 nothrow_tag_invocable<subscribe_t, _TfxSender, _Receiver>; 402 return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr); 403 } 404 else 405 { 406 return static_cast<__debug::__debug_operation (*)() noexcept>( 407 nullptr); 408 } 409 } 410 411 template <class _Sender, class _Receiver> 412 using __select_impl_t = decltype(__select_impl<_Sender, _Receiver>()); 413 414 template <sender _Sender, receiver _Receiver> 415 requires __next_connectable<__tfx_sndr<_Sender, _Receiver>, 416 _Receiver> || 417 __subscribeable_with_tag_invoke<__tfx_sndr<_Sender, _Receiver>, 418 _Receiver> || 419 __is_debug_env<env_of_t<_Receiver>> 420 auto operator()(_Sender&& __sndr, _Receiver&& __rcvr) const 421 noexcept(__nothrow_callable<__select_impl_t<_Sender, _Receiver>>) 422 -> __call_result_t<__select_impl_t<_Sender, _Receiver>> 423 { 424 using _TfxSender = __tfx_sndr<_Sender, _Receiver>; 425 auto&& __env = get_env(__rcvr); 426 auto __domain = __get_late_domain(__sndr, __env); 427 if constexpr (__next_connectable<_TfxSender, _Receiver>) 428 { 429 static_assert( 430 operation_state< 431 connect_result_t<next_sender_of_t<_Receiver, _TfxSender>, 432 __stopped_means_break_t<_Receiver>>>, 433 "stdexec::connect(sender, receiver) must return a type that " 434 "satisfies the operation_state concept"); 435 next_sender_of_t<_Receiver, _TfxSender> __next = set_next( 436 __rcvr, transform_sender( 437 __domain, static_cast<_Sender&&>(__sndr), __env)); 438 return stdexec::connect( 439 static_cast<next_sender_of_t<_Receiver, _TfxSender>&&>(__next), 440 __stopped_means_break_t<_Receiver>{ 441 static_cast<_Receiver&&>(__rcvr)}); 442 } 443 else if constexpr (__subscribeable_with_tag_invoke<_TfxSender, 444 _Receiver>) 445 { 446 static_assert( 447 operation_state< 448 tag_invoke_result_t<subscribe_t, _TfxSender, _Receiver>>, 449 "exec::subscribe(sender, receiver) must return a type that " 450 "satisfies the operation_state concept"); 451 return tag_invoke( 452 subscribe_t{}, 453 transform_sender(__domain, static_cast<_Sender&&>(__sndr), 454 __env), 455 static_cast<_Receiver&&>(__rcvr)); 456 } 457 else if constexpr (enable_sequence_sender< 458 stdexec::__decay_t<_TfxSender>>) 459 { 460 // This should generate an instantiate backtrace that contains 461 // useful debugging information. 462 using __tag_invoke::tag_invoke; 463 tag_invoke(*this, 464 transform_sender(__domain, 465 static_cast<_Sender&&>(__sndr), __env), 466 static_cast<_Receiver&&>(__rcvr)); 467 } 468 else 469 { 470 next_sender_of_t<_Receiver, _TfxSender> __next = set_next( 471 __rcvr, transform_sender( 472 __domain, static_cast<_Sender&&>(__sndr), __env)); 473 return tag_invoke( 474 connect_t{}, 475 static_cast<next_sender_of_t<_Receiver, _TfxSender>&&>(__next), 476 __stopped_means_break_t<_Receiver>{ 477 static_cast<_Receiver&&>(__rcvr)}); 478 } 479 } 480 481 static constexpr auto query(stdexec::forwarding_query_t) noexcept -> bool 482 { 483 return false; 484 } 485 }; 486 487 template <class _Sender, class _Receiver> 488 using subscribe_result_t = __call_result_t<subscribe_t, _Sender, _Receiver>; 489 } // namespace __sequence_sndr 490 491 using __sequence_sndr::__single_sender_completion_sigs; 492 493 using __sequence_sndr::subscribe_t; 494 inline constexpr subscribe_t subscribe; 495 496 using __sequence_sndr::subscribe_result_t; 497 498 template <class _Sender, class _Receiver> 499 concept sequence_sender_to = 500 sequence_receiver_from<_Receiver, _Sender> && // 501 requires(_Sender&& __sndr, _Receiver&& __rcvr) { 502 subscribe(static_cast<_Sender&&>(__sndr), 503 static_cast<_Receiver&&>(__rcvr)); 504 }; 505 506 template <class _Receiver> 507 concept __stoppable_receiver = // 508 stdexec::__callable<stdexec::set_value_t, _Receiver> && // 509 (stdexec::unstoppable_token< 510 stdexec::stop_token_of_t<stdexec::env_of_t<_Receiver>>> || 511 stdexec::__callable<stdexec::set_stopped_t, _Receiver>); 512 513 template <class _Receiver> 514 requires __stoppable_receiver<_Receiver> 515 void __set_value_unless_stopped(_Receiver&& __rcvr) 516 { 517 using token_type = stdexec::stop_token_of_t<stdexec::env_of_t<_Receiver>>; 518 if constexpr (stdexec::unstoppable_token<token_type>) 519 { 520 stdexec::set_value(static_cast<_Receiver&&>(__rcvr)); 521 } 522 else 523 { 524 auto token = stdexec::get_stop_token(stdexec::get_env(__rcvr)); 525 if (!token.stop_requested()) 526 { 527 stdexec::set_value(static_cast<_Receiver&&>(__rcvr)); 528 } 529 else 530 { 531 stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr)); 532 } 533 } 534 } 535 } // namespace exec 536