1 /*
2 * Copyright (c) 2023 Maikel Nadolski
3 * Copyright (c) 2023 NVIDIA Corporation
4 *
5 * Licensed under the Apache License Version 2.0 with LLVM Exceptions
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://llvm.org/LICENSE.txt
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 #pragma once
18
19 #include "../stdexec/execution.hpp"
20
21 namespace exec
22 {
23 struct sequence_sender_t : stdexec::sender_t
24 {};
25
26 using sequence_tag [[deprecated("Renamed to exec::sequence_sender_t")]] =
27 exec::sequence_sender_t;
28
29 namespace __sequence_sndr
30 {
31 using namespace stdexec;
32
33 template <class _Haystack>
34 struct __mall_contained_in_impl
35 {
36 template <class... _Needles>
37 using __f = __mand<__mapply<__contains<_Needles>, _Haystack>...>;
38 };
39
40 template <class _Needles, class _Haystack>
41 using __mall_contained_in =
42 __mapply<__mall_contained_in_impl<_Haystack>, _Needles>;
43
44 template <class _Needles, class _Haystack>
45 concept __all_contained_in = __mall_contained_in<_Needles, _Haystack>::value;
46
47 // This concept checks if a given sender satisfies the requirements to be
48 // returned from `set_next`.
49 template <class _Sender, class _Env = empty_env>
50 concept next_sender = //
51 sender_in<_Sender, _Env> //
52 &&
53 __all_contained_in<completion_signatures_of_t<_Sender, _Env>,
54 completion_signatures<set_value_t(), set_stopped_t()>>;
55
56 // This is a sequence-receiver CPO that is used to apply algorithms on an input
57 // sender and it returns a next-sender. `set_next` is usually called in a
58 // context where a sender will be connected to a receiver. Since calling
59 // `set_next` usually involves constructing senders it is allowed to throw an
60 // exception, which needs to be handled by a calling sequence-operation. The
61 // returned object is a sender that can complete with `set_value_t()` or
62 // `set_stopped_t()`.
63 struct set_next_t
64 {
65 template <receiver _Receiver, sender _Item>
66 requires tag_invocable<set_next_t, _Receiver&, _Item>
operator ()exec::__sequence_sndr::set_next_t67 auto operator()(_Receiver& __rcvr, _Item&& __item) const
68 noexcept(nothrow_tag_invocable<set_next_t, _Receiver&, _Item>)
69 -> tag_invoke_result_t<set_next_t, _Receiver&, _Item>
70 {
71 static_assert(
72 next_sender<tag_invoke_result_t<set_next_t, _Receiver&, _Item>>,
73 "The sender returned from set_next is required to complete with set_value_t() or "
74 "set_stopped_t()");
75 return tag_invoke(*this, __rcvr, static_cast<_Item&&>(__item));
76 }
77 };
78 } // namespace __sequence_sndr
79
80 using __sequence_sndr::set_next_t;
81 inline constexpr set_next_t set_next;
82
83 template <class _Receiver, class _Sender>
84 using next_sender_of_t = decltype(exec::set_next(
85 stdexec::__declval<stdexec::__decay_t<_Receiver>&>(),
86 stdexec::__declval<_Sender>()));
87
88 namespace __sequence_sndr
89 {
90
91 template <class _ReceiverId>
92 struct __stopped_means_break
93 {
94 struct __t
95 {
96 using receiver_concept = stdexec::receiver_t;
97 using __id = __stopped_means_break;
98 using _Receiver = stdexec::__t<_ReceiverId>;
99 using _Token = stop_token_of_t<env_of_t<_Receiver>>;
100 STDEXEC_ATTRIBUTE((no_unique_address))
101 _Receiver __rcvr_;
102
103 template <same_as<get_env_t> _GetEnv, same_as<__t> _Self>
tag_invokeexec::__sequence_sndr::__stopped_means_break104 friend auto tag_invoke(_GetEnv, const _Self& __self) noexcept
105 -> env_of_t<_Receiver>
106 {
107 return stdexec::get_env(__self.__rcvr_);
108 }
109
110 template <same_as<set_value_t> _SetValue, same_as<__t> _Self>
111 requires __callable<set_value_t, _Receiver&&>
tag_invokeexec::__sequence_sndr::__stopped_means_break112 friend void tag_invoke(_SetValue, _Self&& __self) noexcept
113 {
114 return stdexec::set_value(static_cast<_Receiver&&>(__self.__rcvr_));
115 }
116
117 template <same_as<set_stopped_t> _SetStopped, same_as<__t> _Self>
118 requires __callable<set_value_t, _Receiver&&> &&
119 (unstoppable_token<_Token> ||
120 __callable<set_stopped_t, _Receiver &&>)
tag_invokeexec::__sequence_sndr::__stopped_means_break121 friend void tag_invoke(_SetStopped, _Self&& __self) noexcept
122 {
123 if constexpr (unstoppable_token<_Token>)
124 {
125 stdexec::set_value(static_cast<_Receiver&&>(__self.__rcvr_));
126 }
127 else
128 {
129 auto __token =
130 stdexec::get_stop_token(stdexec::get_env(__self.__rcvr_));
131 if (__token.stop_requested())
132 {
133 stdexec::set_stopped(
134 static_cast<_Receiver&&>(__self.__rcvr_));
135 }
136 else
137 {
138 stdexec::set_value(
139 static_cast<_Receiver&&>(__self.__rcvr_));
140 }
141 }
142 }
143 };
144 };
145
146 template <class _Rcvr>
147 using __stopped_means_break_t =
148 __t<__stopped_means_break<__id<__decay_t<_Rcvr>>>>;
149 } // namespace __sequence_sndr
150
151 template <class _Sender>
152 concept __enable_sequence_sender = //
153 requires { typename _Sender::sender_concept; } && //
154 stdexec::same_as<typename _Sender::sender_concept, sequence_sender_t>;
155
156 template <class _Sender>
157 inline constexpr bool enable_sequence_sender =
158 __enable_sequence_sender<_Sender>;
159
160 template <class... _Senders>
161 struct item_types
162 {};
163
164 template <class _Tp>
165 concept __has_item_typedef = requires { typename _Tp::item_types; };
166
167 /////////////////////////////////////////////////////////////////////////////
168 // [execution.sndtraits]
169 namespace __sequence_sndr
170 {
171 struct get_item_types_t;
172 template <class _Sender, class _Env>
173 using __tfx_sender =
174 transform_sender_result_t<__late_domain_of_t<_Sender, _Env>, _Sender, _Env>;
175
176 template <class _Sender, class _Env>
177 concept __with_tag_invoke = //
178 tag_invocable<get_item_types_t, __tfx_sender<_Sender, _Env>, _Env>;
179 template <class _Sender, class _Env>
180 using __member_alias_t = //
181 typename __decay_t<__tfx_sender<_Sender, _Env>>::item_types;
182
183 template <class _Sender, class _Env>
184 concept __with_member_alias = __mvalid<__member_alias_t, _Sender, _Env>;
185
186 struct get_item_types_t
187 {
188 template <class _Sender, class _Env>
__implexec::__sequence_sndr::get_item_types_t189 static auto __impl()
190 {
191 static_assert(sizeof(_Sender),
192 "Incomplete type used with get_item_types");
193 static_assert(sizeof(_Env), "Incomplete type used with get_item_types");
194 using _TfxSender = __tfx_sender<_Sender, _Env>;
195 if constexpr (__with_tag_invoke<_Sender, _Env>)
196 {
197 using _Result =
198 tag_invoke_result_t<get_item_types_t, _TfxSender, _Env>;
199 return static_cast<_Result (*)()>(nullptr);
200 }
201 else if constexpr (__with_member_alias<_TfxSender, _Env>)
202 {
203 using _Result = __member_alias_t<_TfxSender, _Env>;
204 return static_cast<_Result (*)()>(nullptr);
205 }
206 else if constexpr (sender_in<_TfxSender, _Env> &&
207 !enable_sequence_sender<
208 stdexec::__decay_t<_TfxSender>>)
209 {
210 using _Result = item_types<stdexec::__decay_t<_TfxSender>>;
211 return static_cast<_Result (*)()>(nullptr);
212 }
213 else if constexpr (__is_debug_env<_Env>)
214 {
215 using __tag_invoke::tag_invoke;
216 // This ought to cause a hard error that indicates where the problem
217 // is.
218 using _Completions [[maybe_unused]] =
219 tag_invoke_result_t<get_item_types_t,
220 __tfx_sender<_Sender, _Env>, _Env>;
221 return static_cast<__debug::__completion_signatures (*)()>(nullptr);
222 }
223 else
224 {
225 using _Result =
226 __mexception<_UNRECOGNIZED_SENDER_TYPE_<>,
227 _WITH_SENDER_<_Sender>, _WITH_ENVIRONMENT_<_Env>>;
228 return static_cast<_Result (*)()>(nullptr);
229 }
230 }
231
232 template <class _Sender, class _Env = empty_env>
operator ()exec::__sequence_sndr::get_item_types_t233 constexpr auto operator()(_Sender&&, const _Env&) const noexcept
234 -> decltype(__impl<_Sender, _Env>()())
235 {
236 return {};
237 }
238 };
239 } // namespace __sequence_sndr
240
241 using __sequence_sndr::get_item_types_t;
242 inline constexpr get_item_types_t get_item_types{};
243
244 template <class _Sender, class _Env>
245 using item_types_of_t = decltype(get_item_types(stdexec::__declval<_Sender>(),
246 stdexec::__declval<_Env>()));
247
248 template <class _Sender, class _Env>
249 concept sequence_sender = //
250 stdexec::sender_in<_Sender, _Env> && //
251 enable_sequence_sender<stdexec::__decay_t<_Sender>>;
252
253 template <class _Sender, class _Env>
254 concept has_sequence_item_types =
255 requires(_Sender&& __sndr, _Env&& __env) {
256 get_item_types(static_cast<_Sender&&>(__sndr),
257 static_cast<_Env&&>(__env));
258 };
259
260 template <class _Sender, class _Env>
261 concept sequence_sender_in = //
262 stdexec::sender_in<_Sender, _Env> && //
263 has_sequence_item_types<_Sender, _Env> && //
264 sequence_sender<_Sender, _Env>;
265
266 template <class _Receiver>
267 struct _WITH_RECEIVER_
268 {};
269
270 template <class _Item>
271 struct _MISSING_SET_NEXT_OVERLOAD_FOR_ITEM_
272 {};
273
274 template <class _Receiver, class _Item>
275 auto __try_item(_Item*)
276 -> stdexec::__mexception<_MISSING_SET_NEXT_OVERLOAD_FOR_ITEM_<_Item>,
277 _WITH_RECEIVER_<_Receiver>>;
278
279 template <class _Receiver, class _Item>
280 requires stdexec::__callable<set_next_t, _Receiver&, _Item>
281 auto __try_item(_Item*) -> stdexec::__msuccess;
282
283 template <class _Receiver, class... _Items>
284 auto __try_items(exec::item_types<_Items...>*)
285 -> decltype((stdexec::__msuccess(), ...,
286 exec::__try_item<_Receiver>(static_cast<_Items*>(nullptr))));
287
288 template <class _Receiver, class _Items>
289 concept __sequence_receiver_of =
290 requires(_Items* __items) {
291 {
292 exec::__try_items<stdexec::__decay_t<_Receiver>>(__items)
293 } -> stdexec::__ok;
294 };
295
296 template <class _Receiver, class _SequenceItems>
297 concept sequence_receiver_of = //
298 stdexec::receiver<_Receiver> && //
299 __sequence_receiver_of<_Receiver, _SequenceItems>;
300
301 template <class _Items, class _Env>
302 using __concat_item_signatures_t = stdexec::__mapply<
303 stdexec::__q<stdexec::__concat_completion_signatures_t>,
304 stdexec::__mapply<stdexec::__transform<stdexec::__mbind_back_q<
305 stdexec::completion_signatures_of_t, _Env>>,
306 _Items>>;
307
308 template <class _Completions>
309 using __gather_error_signals =
310 stdexec::__only_gather_signal<stdexec::set_error_t, _Completions>;
311
312 template <class _Completions>
313 using __gather_stopped_signals =
314 stdexec::__only_gather_signal<stdexec::set_stopped_t, _Completions>;
315
316 template <class _Completions>
317 using __to_sequence_completions_t = stdexec::__concat_completion_signatures_t<
318 stdexec::completion_signatures<stdexec::set_value_t()>,
319 __gather_error_signals<_Completions>,
320 __gather_stopped_signals<_Completions>>;
321
322 template <class _Sender, class _Env>
323 using __to_sequence_completion_signatures = stdexec::make_completion_signatures<
324 _Sender, _Env, stdexec::completion_signatures<stdexec::set_value_t()>,
325 stdexec::__mconst<stdexec::completion_signatures<>>::__f>;
326
327 template <class _Sequence, class _Env>
328 using __sequence_completion_signatures_of_t =
329 stdexec::__concat_completion_signatures_t<
330 stdexec::__try_make_completion_signatures<
331 _Sequence, _Env,
332 stdexec::completion_signatures<stdexec::set_value_t()>,
333 stdexec::__mconst<stdexec::completion_signatures<>>>,
334 stdexec::__mapply<
335 stdexec::__q<stdexec::__concat_completion_signatures_t>,
336 stdexec::__mapply<stdexec::__transform<stdexec::__mbind_back_q<
337 __to_sequence_completion_signatures, _Env>>,
338 item_types_of_t<_Sequence, _Env>>>>;
339
340 template <class _Receiver, class _Sender>
341 concept sequence_receiver_from = //
342 stdexec::receiver<_Receiver> && //
343 stdexec::sender_in<_Sender, stdexec::env_of_t<_Receiver>> && //
344 sequence_receiver_of<
345 _Receiver, item_types_of_t<_Sender, stdexec::env_of_t<_Receiver>>> && //
346 ((sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> &&
347 stdexec::receiver_of<_Receiver,
348 stdexec::completion_signatures_of_t<
349 _Sender, stdexec::env_of_t<_Receiver>>>) || //
350 (!sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> &&
351 stdexec::__receiver_from<
352 __sequence_sndr::__stopped_means_break_t<_Receiver>,
353 next_sender_of_t<_Receiver, _Sender>>));
354
355 namespace __sequence_sndr
356 {
357 struct subscribe_t;
358
359 template <class _Env>
360 using __single_sender_completion_sigs =
361 __if_c<unstoppable_token<stop_token_of_t<_Env>>,
362 completion_signatures<set_value_t()>,
363 completion_signatures<set_value_t(), set_stopped_t()>>;
364
365 template <class _Sender, class _Receiver>
366 concept __next_connectable_with_tag_invoke =
367 receiver<_Receiver> && //
368 sender_in<_Sender, env_of_t<_Receiver>> && //
369 !sequence_sender_in<_Sender, env_of_t<_Receiver>> && //
370 sequence_receiver_of<_Receiver,
371 item_types<stdexec::__decay_t<_Sender>>> && //
372 __receiver_from<__stopped_means_break_t<_Receiver>,
373 next_sender_of_t<_Receiver, _Sender>> && //
374 __connect::__connectable_with_tag_invoke<
375 next_sender_of_t<_Receiver, _Sender>&&,
376 __stopped_means_break_t<_Receiver>>;
377
378 template <class _Sender, class _Receiver>
379 concept __subscribeable_with_tag_invoke =
380 receiver<_Receiver> && //
381 sequence_sender_in<_Sender, env_of_t<_Receiver>> && //
382 sequence_receiver_from<_Receiver, _Sender> && //
383 tag_invocable<subscribe_t, _Sender, _Receiver>;
384
385 struct subscribe_t
386 {
387 template <class _Sender, class _Receiver>
388 using __tfx_sndr = __tfx_sender<_Sender, env_of_t<_Receiver>>;
389
390 template <class _Sender, class _Receiver>
__select_implexec::__sequence_sndr::subscribe_t391 static constexpr auto __select_impl() noexcept
392 {
393 using _Domain = __late_domain_of_t<_Sender, env_of_t<_Receiver&>>;
394 constexpr bool _NothrowTfxSender =
395 __nothrow_callable<get_env_t, _Receiver&> &&
396 __nothrow_callable<transform_sender_t, _Domain, _Sender,
397 env_of_t<_Receiver&>>;
398 using _TfxSender = __tfx_sndr<_Sender, _Receiver>;
399 if constexpr (__next_connectable_with_tag_invoke<_TfxSender, _Receiver>)
400 {
401 using _Result =
402 tag_invoke_result_t<connect_t,
403 next_sender_of_t<_Receiver, _TfxSender>,
404 __stopped_means_break_t<_Receiver>>;
405 constexpr bool _Nothrow =
406 nothrow_tag_invocable<connect_t,
407 next_sender_of_t<_Receiver, _TfxSender>,
408 __stopped_means_break_t<_Receiver>>;
409 return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr);
410 }
411 else if constexpr (__subscribeable_with_tag_invoke<_TfxSender,
412 _Receiver>)
413 {
414 using _Result =
415 tag_invoke_result_t<subscribe_t, _TfxSender, _Receiver>;
416 constexpr bool _Nothrow = //
417 _NothrowTfxSender &&
418 nothrow_tag_invocable<subscribe_t, _TfxSender, _Receiver>;
419 return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr);
420 }
421 else
422 {
423 return static_cast<__debug::__debug_operation (*)() noexcept>(
424 nullptr);
425 }
426 }
427
428 template <class _Sender, class _Receiver>
429 using __select_impl_t = decltype(__select_impl<_Sender, _Receiver>());
430
431 template <sender _Sender, receiver _Receiver>
432 requires __next_connectable_with_tag_invoke<
433 __tfx_sndr<_Sender, _Receiver>, _Receiver> ||
434 __subscribeable_with_tag_invoke<__tfx_sndr<_Sender, _Receiver>,
435 _Receiver> ||
436 __is_debug_env<env_of_t<_Receiver>>
437 auto operator()(_Sender&& __sndr, _Receiver&& __rcvr) const
438 noexcept(__nothrow_callable<__select_impl_t<_Sender, _Receiver>>)
439 -> __call_result_t<__select_impl_t<_Sender, _Receiver>>
440 {
441 using _TfxSender = __tfx_sndr<_Sender, _Receiver>;
442 auto&& __env = get_env(__rcvr);
443 auto __domain = __get_late_domain(__sndr, __env);
444 if constexpr (__next_connectable_with_tag_invoke<_TfxSender, _Receiver>)
445 {
446 static_assert(
447 operation_state<tag_invoke_result_t<
448 connect_t, next_sender_of_t<_Receiver, _TfxSender>,
449 __stopped_means_break_t<_Receiver>>>,
450 "stdexec::connect(sender, receiver) must return a type that "
451 "satisfies the operation_state concept");
452 next_sender_of_t<_Receiver, _TfxSender> __next = set_next(
453 __rcvr, transform_sender(
454 __domain, static_cast<_Sender&&>(__sndr), __env));
455 return tag_invoke(
456 connect_t{},
457 static_cast<next_sender_of_t<_Receiver, _TfxSender>&&>(__next),
458 __stopped_means_break_t<_Receiver>{
459 static_cast<_Receiver&&>(__rcvr)});
460 }
461 else if constexpr (__subscribeable_with_tag_invoke<_TfxSender,
462 _Receiver>)
463 {
464 static_assert(
465 operation_state<
466 tag_invoke_result_t<subscribe_t, _TfxSender, _Receiver>>,
467 "exec::subscribe(sender, receiver) must return a type that "
468 "satisfies the operation_state concept");
469 return tag_invoke(subscribe_t{},
470 transform_sender(__domain,
471 static_cast<_Sender&&>(__sndr),
472 __env),
473 static_cast<_Receiver&&>(__rcvr));
474 }
475 else if constexpr (enable_sequence_sender<
476 stdexec::__decay_t<_TfxSender>>)
477 {
478 // This should generate an instantiate backtrace that contains
479 // useful debugging information.
480 using __tag_invoke::tag_invoke;
481 tag_invoke(*this,
482 transform_sender(__domain,
483 static_cast<_Sender&&>(__sndr), __env),
484 static_cast<_Receiver&&>(__rcvr));
485 }
486 else
487 {
488 next_sender_of_t<_Receiver, _TfxSender> __next = set_next(
489 __rcvr, transform_sender(
490 __domain, static_cast<_Sender&&>(__sndr), __env));
491 return tag_invoke(
492 connect_t{},
493 static_cast<next_sender_of_t<_Receiver, _TfxSender>&&>(__next),
494 __stopped_means_break_t<_Receiver>{
495 static_cast<_Receiver&&>(__rcvr)});
496 }
497 }
498
tag_invoke(forwarding_query_t,subscribe_t)499 friend constexpr auto tag_invoke(forwarding_query_t, subscribe_t) noexcept
500 -> bool
501 {
502 return false;
503 }
504 };
505
506 template <class _Sender, class _Receiver>
507 using subscribe_result_t = __call_result_t<subscribe_t, _Sender, _Receiver>;
508 } // namespace __sequence_sndr
509
510 using __sequence_sndr::__single_sender_completion_sigs;
511
512 using __sequence_sndr::subscribe_t;
513 inline constexpr subscribe_t subscribe;
514
515 using __sequence_sndr::subscribe_result_t;
516
517 template <class _Sender, class _Receiver>
518 concept sequence_sender_to = sequence_receiver_from<_Receiver, _Sender> && //
519 requires(_Sender&& __sndr, _Receiver&& __rcvr) {
520 {
521 subscribe(static_cast<_Sender&&>(__sndr),
522 static_cast<_Receiver&&>(__rcvr))
523 };
524 };
525
526 template <class _Receiver>
527 concept __stoppable_receiver = //
528 stdexec::__callable<stdexec::set_value_t, _Receiver> && //
529 (stdexec::unstoppable_token<
530 stdexec::stop_token_of_t<stdexec::env_of_t<_Receiver>>> ||
531 stdexec::__callable<stdexec::set_stopped_t, _Receiver>);
532
533 template <class _Receiver>
534 requires __stoppable_receiver<_Receiver>
__set_value_unless_stopped(_Receiver && __rcvr)535 void __set_value_unless_stopped(_Receiver&& __rcvr)
536 {
537 using token_type = stdexec::stop_token_of_t<stdexec::env_of_t<_Receiver>>;
538 if constexpr (stdexec::unstoppable_token<token_type>)
539 {
540 stdexec::set_value(static_cast<_Receiver&&>(__rcvr));
541 }
542 else
543 {
544 auto token = stdexec::get_stop_token(stdexec::get_env(__rcvr));
545 if (!token.stop_requested())
546 {
547 stdexec::set_value(static_cast<_Receiver&&>(__rcvr));
548 }
549 else
550 {
551 stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr));
552 }
553 }
554 }
555 } // namespace exec
556