1 /* 2 * Copyright (c) 2023 Maikel Nadolski 3 * Copyright (c) 2023 NVIDIA Corporation 4 * 5 * Licensed under the Apache License Version 2.0 with LLVM Exceptions 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * https://llvm.org/LICENSE.txt 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 #pragma once 18 19 #include "../stdexec/execution.hpp" 20 21 namespace exec 22 { 23 struct sequence_tag 24 {}; 25 26 namespace __sequence_sndr 27 { 28 using namespace stdexec; 29 30 template <class _Haystack> 31 struct __mall_contained_in_impl 32 { 33 template <class... _Needles> 34 using __f = __mand<__mapply<__contains<_Needles>, _Haystack>...>; 35 }; 36 37 template <class _Needles, class _Haystack> 38 using __mall_contained_in = 39 __mapply<__mall_contained_in_impl<_Haystack>, _Needles>; 40 41 template <class _Needles, class _Haystack> 42 concept __all_contained_in = __mall_contained_in<_Needles, _Haystack>::value; 43 44 // This concept checks if a given sender satisfies the requirements to be 45 // returned from `set_next`. 46 template <class _Sender, class _Env = empty_env> 47 concept next_sender = // 48 sender_in<_Sender, _Env> // 49 && 50 __all_contained_in<completion_signatures_of_t<_Sender, _Env>, 51 completion_signatures<set_value_t(), set_stopped_t()>>; 52 53 // This is a sequence-receiver CPO that is used to apply algorithms on an input 54 // sender and it returns a next-sender. `set_next` is usually called in a 55 // context where a sender will be connected to a receiver. Since calling 56 // `set_next` usually involves constructing senders it is allowed to throw an 57 // excpetion, which needs to be handled by a calling sequence-operation. The 58 // returned object is a sender that can complete with `set_value_t()` or 59 // `set_stopped_t()`. 60 struct set_next_t 61 { 62 template <receiver _Receiver, sender _Item> 63 requires tag_invocable<set_next_t, _Receiver&, _Item> 64 auto operator()(_Receiver& __rcvr, _Item&& __item) const 65 noexcept(nothrow_tag_invocable<set_next_t, _Receiver&, _Item>) 66 -> tag_invoke_result_t<set_next_t, _Receiver&, _Item> 67 { 68 static_assert( 69 next_sender<tag_invoke_result_t<set_next_t, _Receiver&, _Item>>, 70 "The sender returned from set_next is required to complete with set_value_t() or " 71 "set_stopped_t()"); 72 return tag_invoke(*this, __rcvr, (_Item&&)__item); 73 } 74 }; 75 } // namespace __sequence_sndr 76 77 using __sequence_sndr::set_next_t; 78 inline constexpr set_next_t set_next; 79 80 template <class _Receiver, class _Sender> 81 using next_sender_of_t = decltype(exec::set_next( 82 stdexec::__declval<stdexec::__decay_t<_Receiver>&>(), 83 stdexec::__declval<_Sender>())); 84 85 namespace __sequence_sndr 86 { 87 88 template <class _ReceiverId> 89 struct __stopped_means_break 90 { 91 struct __t 92 { 93 using is_receiver = void; 94 using __id = __stopped_means_break; 95 using _Receiver = stdexec::__t<_ReceiverId>; 96 using _Token = stop_token_of_t<env_of_t<_Receiver>>; 97 STDEXEC_ATTRIBUTE((no_unique_address)) _Receiver __rcvr_; 98 99 template <same_as<get_env_t> _GetEnv, same_as<__t> _Self> 100 friend env_of_t<_Receiver> tag_invoke(_GetEnv, 101 const _Self& __self) noexcept 102 { 103 return stdexec::get_env(__self.__rcvr_); 104 } 105 106 template <same_as<set_value_t> _SetValue, same_as<__t> _Self> 107 requires __callable<set_value_t, _Receiver&&> 108 friend void tag_invoke(_SetValue, _Self&& __self) noexcept 109 { 110 return stdexec::set_value(static_cast<_Receiver&&>(__self.__rcvr_)); 111 } 112 113 template <same_as<set_stopped_t> _SetStopped, same_as<__t> _Self> 114 requires __callable<set_value_t, _Receiver&&> && 115 (unstoppable_token<_Token> || 116 __callable<set_stopped_t, _Receiver &&>) 117 friend void tag_invoke(_SetStopped, _Self&& __self) noexcept 118 { 119 if constexpr (unstoppable_token<_Token>) 120 { 121 stdexec::set_value(static_cast<_Receiver&&>(__self.__rcvr_)); 122 } 123 else 124 { 125 auto __token = 126 stdexec::get_stop_token(stdexec::get_env(__self.__rcvr_)); 127 if (__token.stop_requested()) 128 { 129 stdexec::set_stopped( 130 static_cast<_Receiver&&>(__self.__rcvr_)); 131 } 132 else 133 { 134 stdexec::set_value( 135 static_cast<_Receiver&&>(__self.__rcvr_)); 136 } 137 } 138 } 139 }; 140 }; 141 142 template <class _Rcvr> 143 using __stopped_means_break_t = 144 __t<__stopped_means_break<__id<__decay_t<_Rcvr>>>>; 145 } // namespace __sequence_sndr 146 147 template <class _Sender> 148 concept __enable_sequence_sender = // 149 requires { typename _Sender::is_sender; } && // 150 stdexec::same_as<typename _Sender::is_sender, sequence_tag>; 151 152 template <class _Sender> 153 inline constexpr bool enable_sequence_sender = 154 __enable_sequence_sender<_Sender>; 155 156 template <class... _Senders> 157 struct item_types 158 {}; 159 160 template <class _Tp> 161 concept __has_item_typedef = requires { typename _Tp::item_types; }; 162 163 ///////////////////////////////////////////////////////////////////////////// 164 // [execution.sndtraits] 165 namespace __sequence_sndr 166 { 167 struct get_item_types_t; 168 template <class _Sender, class _Env> 169 using __tfx_sender = 170 transform_sender_result_t<__late_domain_of_t<_Sender, _Env>, _Sender, _Env>; 171 172 template <class _Sender, class _Env> 173 concept __with_tag_invoke = // 174 tag_invocable<get_item_types_t, __tfx_sender<_Sender, _Env>, _Env>; 175 template <class _Sender, class _Env> 176 using __member_alias_t = // 177 typename __decay_t<__tfx_sender<_Sender, _Env>>::item_types; 178 179 template <class _Sender, class _Env> 180 concept __with_member_alias = __mvalid<__member_alias_t, _Sender, _Env>; 181 182 struct get_item_types_t 183 { 184 template <class _Sender, class _Env> 185 static auto __impl() 186 { 187 static_assert(sizeof(_Sender), 188 "Incomplete type used with get_item_types"); 189 static_assert(sizeof(_Env), "Incomplete type used with get_item_types"); 190 using _TfxSender = __tfx_sender<_Sender, _Env>; 191 if constexpr (__with_tag_invoke<_Sender, _Env>) 192 { 193 using _Result = 194 tag_invoke_result_t<get_item_types_t, _TfxSender, _Env>; 195 return (_Result(*)()) nullptr; 196 } 197 else if constexpr (__with_member_alias<_TfxSender, _Env>) 198 { 199 using _Result = __member_alias_t<_TfxSender, _Env>; 200 return (_Result(*)()) nullptr; 201 } 202 else if constexpr (sender_in<_TfxSender, _Env> && 203 !enable_sequence_sender< 204 stdexec::__decay_t<_TfxSender>>) 205 { 206 using _Result = item_types<stdexec::__decay_t<_TfxSender>>; 207 return (_Result(*)()) nullptr; 208 } 209 else if constexpr (__is_debug_env<_Env>) 210 { 211 using __tag_invoke::tag_invoke; 212 // This ought to cause a hard error that indicates where the problem 213 // is. 214 using _Completions [[maybe_unused]] = 215 tag_invoke_result_t<get_item_types_t, 216 __tfx_sender<_Sender, _Env>, _Env>; 217 return (__debug::__completion_signatures(*)()) nullptr; 218 } 219 else 220 { 221 using _Result = 222 __mexception<_UNRECOGNIZED_SENDER_TYPE_<>, 223 _WITH_SENDER_<_Sender>, _WITH_ENVIRONMENT_<_Env>>; 224 return (_Result(*)()) nullptr; 225 } 226 } 227 228 template <class _Sender, class _Env = __default_env> 229 constexpr auto operator()(_Sender&&, const _Env&) const noexcept 230 -> decltype(__impl<_Sender, _Env>()()) 231 { 232 return {}; 233 } 234 }; 235 } // namespace __sequence_sndr 236 237 using __sequence_sndr::get_item_types_t; 238 inline constexpr get_item_types_t get_item_types{}; 239 240 template <class _Sender, class _Env> 241 using item_types_of_t = decltype(get_item_types(stdexec::__declval<_Sender>(), 242 stdexec::__declval<_Env>())); 243 244 template <class _Sender, class _Env> 245 concept sequence_sender = // 246 stdexec::sender_in<_Sender, _Env> && // 247 enable_sequence_sender<stdexec::__decay_t<_Sender>>; 248 249 template <class _Sender, class _Env> 250 concept has_sequence_item_types = 251 requires(_Sender&& __sndr, _Env&& __env) { 252 get_item_types((_Sender&&)__sndr, (_Env&&)__env); 253 }; 254 255 template <class _Sender, class _Env> 256 concept sequence_sender_in = // 257 stdexec::sender_in<_Sender, _Env> && // 258 has_sequence_item_types<_Sender, _Env> && // 259 sequence_sender<_Sender, _Env>; 260 261 template <class _Receiver> 262 struct _WITH_RECEIVER_ 263 {}; 264 265 template <class _Item> 266 struct _MISSING_SET_NEXT_OVERLOAD_FOR_ITEM_ 267 {}; 268 269 template <class _Receiver, class _Item> 270 auto __try_item(_Item*) 271 -> stdexec::__mexception<_MISSING_SET_NEXT_OVERLOAD_FOR_ITEM_<_Item>, 272 _WITH_RECEIVER_<_Receiver>>; 273 274 template <class _Receiver, class _Item> 275 requires stdexec::__callable<set_next_t, _Receiver&, _Item> 276 stdexec::__msuccess __try_item(_Item*); 277 278 template <class _Receiver, class... _Items> 279 auto __try_items(exec::item_types<_Items...>*) 280 -> decltype((stdexec::__msuccess(), ..., 281 exec::__try_item<_Receiver>((_Items*)nullptr))); 282 283 template <class _Receiver, class _Items> 284 concept __sequence_receiver_of = 285 requires(_Items* __items) { 286 { 287 exec::__try_items<stdexec::__decay_t<_Receiver>>(__items) 288 } -> stdexec::__ok; 289 }; 290 291 template <class _Receiver, class _SequenceItems> 292 concept sequence_receiver_of = // 293 stdexec::receiver<_Receiver> && // 294 __sequence_receiver_of<_Receiver, _SequenceItems>; 295 296 template <class _Items, class _Env> 297 using __concat_item_signatures_t = stdexec::__mapply< 298 stdexec::__q<stdexec::__concat_completion_signatures_t>, 299 stdexec::__mapply<stdexec::__transform<stdexec::__mbind_back_q< 300 stdexec::completion_signatures_of_t, _Env>>, 301 _Items>>; 302 303 template <class _Completions> 304 using __gather_error_signals = 305 stdexec::__only_gather_signal<stdexec::set_error_t, _Completions>; 306 307 template <class _Completions> 308 using __gather_stopped_signals = 309 stdexec::__only_gather_signal<stdexec::set_stopped_t, _Completions>; 310 311 template <class _Completions> 312 using __to_sequence_completions_t = stdexec::__concat_completion_signatures_t< 313 stdexec::completion_signatures<stdexec::set_value_t()>, 314 __gather_error_signals<_Completions>, 315 __gather_stopped_signals<_Completions>>; 316 317 template <class _Sender, class _Env> 318 using __to_sequence_completion_signatures = stdexec::make_completion_signatures< 319 _Sender, _Env, stdexec::completion_signatures<stdexec::set_value_t()>, 320 stdexec::__mconst<stdexec::completion_signatures<>>::__f>; 321 322 template <class _Sequence, class _Env> 323 using __sequence_completion_signatures_of_t = 324 stdexec::__concat_completion_signatures_t< 325 stdexec::__try_make_completion_signatures< 326 _Sequence, _Env, 327 stdexec::completion_signatures<stdexec::set_value_t()>, 328 stdexec::__mconst<stdexec::completion_signatures<>>>, 329 stdexec::__mapply< 330 stdexec::__q<stdexec::__concat_completion_signatures_t>, 331 stdexec::__mapply<stdexec::__transform<stdexec::__mbind_back_q< 332 __to_sequence_completion_signatures, _Env>>, 333 item_types_of_t<_Sequence, _Env>>>>; 334 335 template <class _Receiver, class _Sender> 336 concept sequence_receiver_from = // 337 stdexec::receiver<_Receiver> && // 338 stdexec::sender_in<_Sender, stdexec::env_of_t<_Receiver>> && // 339 sequence_receiver_of< 340 _Receiver, item_types_of_t<_Sender, stdexec::env_of_t<_Receiver>>> && // 341 ((sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> && 342 stdexec::receiver_of<_Receiver, 343 stdexec::completion_signatures_of_t< 344 _Sender, stdexec::env_of_t<_Receiver>>>) || // 345 (!sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> && 346 stdexec::__receiver_from< 347 __sequence_sndr::__stopped_means_break_t<_Receiver>, 348 next_sender_of_t<_Receiver, _Sender>>)); 349 350 namespace __sequence_sndr 351 { 352 struct subscribe_t; 353 354 template <class _Env> 355 using __single_sender_completion_sigs = 356 __if_c<unstoppable_token<stop_token_of_t<_Env>>, 357 completion_signatures<set_value_t()>, 358 completion_signatures<set_value_t(), set_stopped_t()>>; 359 360 template <class _Sender, class _Receiver> 361 concept __next_connectable_with_tag_invoke = 362 receiver<_Receiver> && // 363 sender_in<_Sender, env_of_t<_Receiver>> && // 364 !sequence_sender_in<_Sender, env_of_t<_Receiver>> && // 365 sequence_receiver_of<_Receiver, 366 item_types<stdexec::__decay_t<_Sender>>> && // 367 __receiver_from<__stopped_means_break_t<_Receiver>, 368 next_sender_of_t<_Receiver, _Sender>> && // 369 __connect::__connectable_with_tag_invoke< 370 next_sender_of_t<_Receiver, _Sender>&&, 371 __stopped_means_break_t<_Receiver>>; 372 373 template <class _Sender, class _Receiver> 374 concept __subscribeable_with_tag_invoke = 375 receiver<_Receiver> && // 376 sequence_sender_in<_Sender, env_of_t<_Receiver>> && // 377 sequence_receiver_from<_Receiver, _Sender> && // 378 tag_invocable<subscribe_t, _Sender, _Receiver>; 379 380 struct subscribe_t 381 { 382 template <class _Sender, class _Receiver> 383 using __tfx_sndr = __tfx_sender<_Sender, env_of_t<_Receiver>>; 384 385 template <class _Sender, class _Receiver> 386 static constexpr auto __select_impl() noexcept 387 { 388 using _Domain = __late_domain_of_t<_Sender, env_of_t<_Receiver&>>; 389 constexpr bool _NothrowTfxSender = 390 __nothrow_callable<get_env_t, _Receiver&> && 391 __nothrow_callable<transform_sender_t, _Domain, _Sender, 392 env_of_t<_Receiver&>>; 393 using _TfxSender = __tfx_sndr<_Sender, _Receiver>; 394 if constexpr (__next_connectable_with_tag_invoke<_TfxSender, _Receiver>) 395 { 396 using _Result = 397 tag_invoke_result_t<connect_t, 398 next_sender_of_t<_Receiver, _TfxSender>, 399 __stopped_means_break_t<_Receiver>>; 400 constexpr bool _Nothrow = 401 nothrow_tag_invocable<connect_t, 402 next_sender_of_t<_Receiver, _TfxSender>, 403 __stopped_means_break_t<_Receiver>>; 404 return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr); 405 } 406 else if constexpr (__subscribeable_with_tag_invoke<_TfxSender, 407 _Receiver>) 408 { 409 using _Result = 410 tag_invoke_result_t<subscribe_t, _TfxSender, _Receiver>; 411 constexpr bool _Nothrow = // 412 _NothrowTfxSender && 413 nothrow_tag_invocable<subscribe_t, _TfxSender, _Receiver>; 414 return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr); 415 } 416 else 417 { 418 return static_cast<__debug::__debug_operation (*)() noexcept>( 419 nullptr); 420 } 421 } 422 423 template <class _Sender, class _Receiver> 424 using __select_impl_t = decltype(__select_impl<_Sender, _Receiver>()); 425 426 template <sender _Sender, receiver _Receiver> 427 requires __next_connectable_with_tag_invoke< 428 __tfx_sndr<_Sender, _Receiver>, _Receiver> || 429 __subscribeable_with_tag_invoke<__tfx_sndr<_Sender, _Receiver>, 430 _Receiver> || 431 __is_debug_env<env_of_t<_Receiver>> 432 auto operator()(_Sender&& __sndr, _Receiver&& __rcvr) const 433 noexcept(__nothrow_callable<__select_impl_t<_Sender, _Receiver>>) 434 -> __call_result_t<__select_impl_t<_Sender, _Receiver>> 435 { 436 using _TfxSender = __tfx_sndr<_Sender, _Receiver>; 437 auto&& __env = get_env(__rcvr); 438 auto __domain = __get_late_domain(__sndr, __env); 439 if constexpr (__next_connectable_with_tag_invoke<_TfxSender, _Receiver>) 440 { 441 static_assert( 442 operation_state<tag_invoke_result_t< 443 connect_t, next_sender_of_t<_Receiver, _TfxSender>, 444 __stopped_means_break_t<_Receiver>>>, 445 "stdexec::connect(sender, receiver) must return a type that " 446 "satisfies the operation_state concept"); 447 next_sender_of_t<_Receiver, _TfxSender> __next = set_next( 448 __rcvr, transform_sender(__domain, (_Sender&&)__sndr, __env)); 449 return tag_invoke( 450 connect_t{}, 451 static_cast<next_sender_of_t<_Receiver, _TfxSender>&&>(__next), 452 __stopped_means_break_t<_Receiver>{(_Receiver&&)__rcvr}); 453 } 454 else if constexpr (__subscribeable_with_tag_invoke<_TfxSender, 455 _Receiver>) 456 { 457 static_assert( 458 operation_state< 459 tag_invoke_result_t<subscribe_t, _TfxSender, _Receiver>>, 460 "exec::subscribe(sender, receiver) must return a type that " 461 "satisfies the operation_state concept"); 462 return tag_invoke( 463 subscribe_t{}, 464 transform_sender(__domain, (_Sender&&)__sndr, __env), 465 (_Receiver&&)__rcvr); 466 } 467 else if constexpr (enable_sequence_sender< 468 stdexec::__decay_t<_TfxSender>>) 469 { 470 // This should generate an instantiate backtrace that contains 471 // useful debugging information. 472 using __tag_invoke::tag_invoke; 473 tag_invoke(*this, 474 transform_sender(__domain, (_Sender&&)__sndr, __env), 475 (_Receiver&&)__rcvr); 476 } 477 else 478 { 479 next_sender_of_t<_Receiver, _TfxSender> __next = set_next( 480 __rcvr, transform_sender(__domain, (_Sender&&)__sndr, __env)); 481 return tag_invoke( 482 connect_t{}, 483 static_cast<next_sender_of_t<_Receiver, _TfxSender>&&>(__next), 484 __stopped_means_break_t<_Receiver>{(_Receiver&&)__rcvr}); 485 } 486 } 487 488 friend constexpr bool tag_invoke(forwarding_query_t, subscribe_t) noexcept 489 { 490 return false; 491 } 492 }; 493 494 template <class _Sender, class _Receiver> 495 using subscribe_result_t = __call_result_t<subscribe_t, _Sender, _Receiver>; 496 } // namespace __sequence_sndr 497 498 using __sequence_sndr::__single_sender_completion_sigs; 499 500 using __sequence_sndr::subscribe_t; 501 inline constexpr subscribe_t subscribe; 502 503 using __sequence_sndr::subscribe_result_t; 504 505 template <class _Sender, class _Receiver> 506 concept sequence_sender_to = 507 sequence_receiver_from<_Receiver, _Sender> && // 508 requires(_Sender&& __sndr, _Receiver&& __rcvr) { 509 { 510 subscribe((_Sender&&)__sndr, (_Receiver&&)__rcvr) 511 }; 512 }; 513 514 template <class _Receiver> 515 concept __stoppable_receiver = // 516 stdexec::__callable<stdexec::set_value_t, _Receiver> && // 517 (stdexec::unstoppable_token< 518 stdexec::stop_token_of_t<stdexec::env_of_t<_Receiver>>> || 519 stdexec::__callable<stdexec::set_stopped_t, _Receiver>); 520 521 template <class _Receiver> 522 requires __stoppable_receiver<_Receiver> 523 void __set_value_unless_stopped(_Receiver&& __rcvr) 524 { 525 using token_type = stdexec::stop_token_of_t<stdexec::env_of_t<_Receiver>>; 526 if constexpr (stdexec::unstoppable_token<token_type>) 527 { 528 stdexec::set_value(static_cast<_Receiver&&>(__rcvr)); 529 } 530 else 531 { 532 auto token = stdexec::get_stop_token(stdexec::get_env(__rcvr)); 533 if (!token.stop_requested()) 534 { 535 stdexec::set_value(static_cast<_Receiver&&>(__rcvr)); 536 } 537 else 538 { 539 stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr)); 540 } 541 } 542 } 543 } // namespace exec 544