1 /*
2 * Copyright (c) 2023 Maikel Nadolski
3 * Copyright (c) 2023 NVIDIA Corporation
4 *
5 * Licensed under the Apache License Version 2.0 with LLVM Exceptions
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://llvm.org/LICENSE.txt
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 #pragma once
18
19 #include "../stdexec/execution.hpp"
20
21 namespace exec
22 {
23 struct sequence_sender_t : stdexec::sender_t
24 {};
25
26 using sequence_tag [[deprecated("Renamed to exec::sequence_sender_t")]] =
27 exec::sequence_sender_t;
28
29 namespace __sequence_sndr
30 {
31 using namespace stdexec;
32
33 template <class _Haystack>
34 struct __mall_contained_in_impl
35 {
36 template <class... _Needles>
37 using __f = __mand<__mapply<__mcontains<_Needles>, _Haystack>...>;
38 };
39
40 template <class _Needles, class _Haystack>
41 using __mall_contained_in =
42 __mapply<__mall_contained_in_impl<_Haystack>, _Needles>;
43
44 template <class _Needles, class _Haystack>
45 concept __all_contained_in = __v<__mall_contained_in<_Needles, _Haystack>>;
46
47 // This concept checks if a given sender satisfies the requirements to be
48 // returned from `set_next`.
49 template <class _Sender, class _Env = empty_env>
50 concept next_sender = //
51 sender_in<_Sender, _Env> //
52 &&
53 __all_contained_in<completion_signatures_of_t<_Sender, _Env>,
54 completion_signatures<set_value_t(), set_stopped_t()>>;
55
56 // This is a sequence-receiver CPO that is used to apply algorithms on an input
57 // sender and it returns a next-sender. `set_next` is usually called in a
58 // context where a sender will be connected to a receiver. Since calling
59 // `set_next` usually involves constructing senders it is allowed to throw an
60 // excpetion, which needs to be handled by a calling sequence-operation. The
61 // returned object is a sender that can complete with `set_value_t()` or
62 // `set_stopped_t()`.
63 struct set_next_t
64 {
65 template <receiver _Receiver, sender _Item>
66 requires tag_invocable<set_next_t, _Receiver&, _Item>
operator ()exec::__sequence_sndr::set_next_t67 auto operator()(_Receiver& __rcvr, _Item&& __item) const
68 noexcept(nothrow_tag_invocable<set_next_t, _Receiver&, _Item>)
69 -> tag_invoke_result_t<set_next_t, _Receiver&, _Item>
70 {
71 static_assert(
72 next_sender<tag_invoke_result_t<set_next_t, _Receiver&, _Item>>,
73 "The sender returned from set_next is required to complete with set_value_t() or "
74 "set_stopped_t()");
75 return tag_invoke(*this, __rcvr, static_cast<_Item&&>(__item));
76 }
77 };
78 } // namespace __sequence_sndr
79
80 using __sequence_sndr::set_next_t;
81 inline constexpr set_next_t set_next;
82
83 template <class _Receiver, class _Sender>
84 using next_sender_of_t = decltype(exec::set_next(
85 stdexec::__declval<stdexec::__decay_t<_Receiver>&>(),
86 stdexec::__declval<_Sender>()));
87
88 namespace __sequence_sndr
89 {
90
91 template <class _ReceiverId>
92 struct __stopped_means_break
93 {
94 struct __t
95 {
96 using receiver_concept = stdexec::receiver_t;
97 using __id = __stopped_means_break;
98 using _Receiver = stdexec::__t<_ReceiverId>;
99 using _Token = stop_token_of_t<env_of_t<_Receiver>>;
100 STDEXEC_ATTRIBUTE((no_unique_address))
101 _Receiver __rcvr_;
102
get_envexec::__sequence_sndr::__stopped_means_break::__t103 auto get_env() const noexcept -> env_of_t<_Receiver>
104 {
105 return stdexec::get_env(__rcvr_);
106 }
107
set_valueexec::__sequence_sndr::__stopped_means_break::__t108 void set_value() noexcept
109 requires __callable<set_value_t, _Receiver>
110 {
111 return stdexec::set_value(static_cast<_Receiver&&>(__rcvr_));
112 }
113
set_stoppedexec::__sequence_sndr::__stopped_means_break::__t114 void set_stopped() noexcept
115 requires __callable<set_value_t, _Receiver> &&
116 (unstoppable_token<_Token> ||
117 __callable<set_stopped_t, _Receiver>)
118 {
119 if constexpr (unstoppable_token<_Token>)
120 {
121 stdexec::set_value(static_cast<_Receiver&&>(__rcvr_));
122 }
123 else
124 {
125 auto __token =
126 stdexec::get_stop_token(stdexec::get_env(__rcvr_));
127 if (__token.stop_requested())
128 {
129 stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr_));
130 }
131 else
132 {
133 stdexec::set_value(static_cast<_Receiver&&>(__rcvr_));
134 }
135 }
136 }
137 };
138 };
139
140 template <class _Rcvr>
141 using __stopped_means_break_t =
142 __t<__stopped_means_break<__id<__decay_t<_Rcvr>>>>;
143 } // namespace __sequence_sndr
144
145 template <class _Sender>
146 concept __enable_sequence_sender = //
147 requires { typename _Sender::sender_concept; } && //
148 stdexec::same_as<typename _Sender::sender_concept, sequence_sender_t>;
149
150 template <class _Sender>
151 inline constexpr bool enable_sequence_sender =
152 __enable_sequence_sender<_Sender>;
153
154 template <class... _Senders>
155 struct item_types
156 {};
157
158 template <class _Tp>
159 concept __has_item_typedef = requires { typename _Tp::item_types; };
160
161 /////////////////////////////////////////////////////////////////////////////
162 // [execution.sndtraits]
163 namespace __sequence_sndr
164 {
165 struct get_item_types_t;
166 template <class _Sender, class _Env>
167 using __tfx_sender =
168 transform_sender_result_t<__late_domain_of_t<_Sender, _Env>, _Sender, _Env>;
169
170 template <class _Sender, class _Env>
171 concept __with_tag_invoke = //
172 tag_invocable<get_item_types_t, __tfx_sender<_Sender, _Env>, _Env>;
173 template <class _Sender, class _Env>
174 using __member_alias_t = //
175 typename __decay_t<__tfx_sender<_Sender, _Env>>::item_types;
176
177 template <class _Sender, class _Env>
178 concept __with_member_alias = __mvalid<__member_alias_t, _Sender, _Env>;
179
180 struct get_item_types_t
181 {
182 template <class _Sender, class _Env>
__implexec::__sequence_sndr::get_item_types_t183 static auto __impl()
184 {
185 static_assert(sizeof(_Sender),
186 "Incomplete type used with get_item_types");
187 static_assert(sizeof(_Env), "Incomplete type used with get_item_types");
188 using _TfxSender = __tfx_sender<_Sender, _Env>;
189 if constexpr (__with_tag_invoke<_Sender, _Env>)
190 {
191 using _Result =
192 tag_invoke_result_t<get_item_types_t, _TfxSender, _Env>;
193 return static_cast<_Result (*)()>(nullptr);
194 }
195 else if constexpr (__with_member_alias<_TfxSender, _Env>)
196 {
197 using _Result = __member_alias_t<_TfxSender, _Env>;
198 return static_cast<_Result (*)()>(nullptr);
199 }
200 else if constexpr (sender_in<_TfxSender, _Env> &&
201 !enable_sequence_sender<
202 stdexec::__decay_t<_TfxSender>>)
203 {
204 using _Result = item_types<stdexec::__decay_t<_TfxSender>>;
205 return static_cast<_Result (*)()>(nullptr);
206 }
207 else if constexpr (__is_debug_env<_Env>)
208 {
209 using __tag_invoke::tag_invoke;
210 // This ought to cause a hard error that indicates where the problem
211 // is.
212 using _Completions [[maybe_unused]] =
213 tag_invoke_result_t<get_item_types_t,
214 __tfx_sender<_Sender, _Env>, _Env>;
215 return static_cast<__debug::__completion_signatures (*)()>(nullptr);
216 }
217 else
218 {
219 using _Result =
220 __mexception<_UNRECOGNIZED_SENDER_TYPE_<>,
221 _WITH_SENDER_<_Sender>, _WITH_ENVIRONMENT_<_Env>>;
222 return static_cast<_Result (*)()>(nullptr);
223 }
224 }
225
226 template <class _Sender, class _Env = empty_env>
operator ()exec::__sequence_sndr::get_item_types_t227 constexpr auto operator()(_Sender&&, _Env&& = {}) const noexcept
228 -> decltype(__impl<_Sender, _Env>()())
229 {
230 return {};
231 }
232 };
233 } // namespace __sequence_sndr
234
235 using __sequence_sndr::get_item_types_t;
236 inline constexpr get_item_types_t get_item_types{};
237
238 template <class _Sender, class... _Env>
239 using item_types_of_t = decltype(get_item_types(stdexec::__declval<_Sender>(),
240 stdexec::__declval<_Env>()...));
241
242 template <class _Sender, class... _Env>
243 concept sequence_sender = //
244 stdexec::sender_in<_Sender, _Env...> && //
245 enable_sequence_sender<stdexec::__decay_t<_Sender>>;
246
247 template <class _Sender, class... _Env>
248 concept has_sequence_item_types =
249 requires(_Sender&& __sndr, _Env&&... __env) {
250 get_item_types(static_cast<_Sender&&>(__sndr),
251 static_cast<_Env&&>(__env)...);
252 };
253
254 template <class _Sender, class... _Env>
255 concept sequence_sender_in = //
256 stdexec::sender_in<_Sender, _Env...> && //
257 has_sequence_item_types<_Sender, _Env...> && //
258 sequence_sender<_Sender, _Env...>;
259
260 template <class _Receiver>
261 struct _WITH_RECEIVER_
262 {};
263
264 template <class _Item>
265 struct _MISSING_SET_NEXT_OVERLOAD_FOR_ITEM_
266 {};
267
268 template <class _Receiver, class _Item>
269 auto __try_item(_Item*) //
270 -> stdexec::__mexception<_MISSING_SET_NEXT_OVERLOAD_FOR_ITEM_<_Item>,
271 _WITH_RECEIVER_<_Receiver>>;
272
273 template <class _Receiver, class _Item>
274 requires stdexec::__callable<set_next_t, _Receiver&, _Item>
275 auto __try_item(_Item*) -> stdexec::__msuccess;
276
277 template <class _Receiver, class... _Items>
278 auto __try_items(exec::item_types<_Items...>*) //
279 -> decltype((stdexec::__msuccess(), ...,
280 exec::__try_item<_Receiver>(static_cast<_Items*>(nullptr))));
281
282 template <class _Receiver, class _Items>
283 concept __sequence_receiver_of =
284 requires(_Items* __items) {
285 {
286 exec::__try_items<stdexec::__decay_t<_Receiver>>(__items)
287 } -> stdexec::__ok;
288 };
289
290 template <class _Receiver, class _SequenceItems>
291 concept sequence_receiver_of = //
292 stdexec::receiver<_Receiver> && //
293 __sequence_receiver_of<_Receiver, _SequenceItems>;
294
295 template <class _Completions>
296 using __to_sequence_completions_t = //
297 stdexec::__transform_completion_signatures<
298 _Completions,
299 stdexec::__mconst<
300 stdexec::completion_signatures<stdexec::set_value_t()>>::__f,
301 stdexec::__sigs::__default_set_error,
302 stdexec::completion_signatures<stdexec::set_stopped_t()>,
303 stdexec::__concat_completion_signatures>;
304
305 template <class _Sender, class... _Env>
306 using __item_completion_signatures = //
307 stdexec::transform_completion_signatures<
308 stdexec::__completion_signatures_of_t<_Sender, _Env...>,
309 stdexec::completion_signatures<stdexec::set_value_t()>,
310 stdexec::__mconst<stdexec::completion_signatures<>>::__f>;
311
312 template <class _Sequence, class... _Env>
313 using __sequence_completion_signatures = //
314 stdexec::transform_completion_signatures<
315 stdexec::__completion_signatures_of_t<_Sequence, _Env...>,
316 stdexec::completion_signatures<stdexec::set_value_t()>,
317 stdexec::__mconst<stdexec::completion_signatures<>>::__f>;
318
319 template <class _Sequence, class... _Env>
320 using __sequence_completion_signatures_of_t = //
321 stdexec::__mapply<
322 stdexec::__mtransform<
323 stdexec::__mbind_back_q<__item_completion_signatures, _Env...>,
324 stdexec::__mbind_back<
325 stdexec::__mtry_q<stdexec::__concat_completion_signatures>,
326 __sequence_completion_signatures<_Sequence, _Env...>>>,
327 item_types_of_t<_Sequence, _Env...>>;
328
329 template <class _Receiver, class _Sender>
330 concept sequence_receiver_from = //
331 stdexec::receiver<_Receiver> && //
332 stdexec::sender_in<_Sender, stdexec::env_of_t<_Receiver>> && //
333 sequence_receiver_of<
334 _Receiver, item_types_of_t<_Sender, stdexec::env_of_t<_Receiver>>> && //
335 ((sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> &&
336 stdexec::receiver_of<_Receiver,
337 stdexec::completion_signatures_of_t<
338 _Sender, stdexec::env_of_t<_Receiver>>>) || //
339 (!sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> &&
340 stdexec::__receiver_from<
341 __sequence_sndr::__stopped_means_break_t<_Receiver>,
342 next_sender_of_t<_Receiver, _Sender>>));
343
344 namespace __sequence_sndr
345 {
346 struct subscribe_t;
347
348 template <class _Env>
349 using __single_sender_completion_sigs =
350 __if_c<unstoppable_token<stop_token_of_t<_Env>>,
351 completion_signatures<set_value_t()>,
352 completion_signatures<set_value_t(), set_stopped_t()>>;
353
354 template <class _Sender, class _Receiver>
355 concept __next_connectable =
356 receiver<_Receiver> && //
357 sender_in<_Sender, env_of_t<_Receiver>> && //
358 !sequence_sender_in<_Sender, env_of_t<_Receiver>> && //
359 sequence_receiver_of<_Receiver,
360 item_types<stdexec::__decay_t<_Sender>>> && //
361 sender_to<next_sender_of_t<_Receiver, _Sender>,
362 __stopped_means_break_t<_Receiver>>;
363
364 template <class _Sender, class _Receiver>
365 concept __subscribeable_with_tag_invoke =
366 receiver<_Receiver> && //
367 sequence_sender_in<_Sender, env_of_t<_Receiver>> && //
368 sequence_receiver_from<_Receiver, _Sender> && //
369 tag_invocable<subscribe_t, _Sender, _Receiver>;
370
371 struct subscribe_t
372 {
373 template <class _Sender, class _Receiver>
374 using __tfx_sndr = __tfx_sender<_Sender, env_of_t<_Receiver>>;
375
376 template <class _Sender, class _Receiver>
__select_implexec::__sequence_sndr::subscribe_t377 static constexpr auto __select_impl() noexcept
378 {
379 using _Domain = __late_domain_of_t<_Sender, env_of_t<_Receiver&>>;
380 constexpr bool _NothrowTfxSender =
381 __nothrow_callable<transform_sender_t, _Domain, _Sender,
382 env_of_t<_Receiver&>>;
383 using _TfxSender = __tfx_sndr<_Sender, _Receiver>;
384 if constexpr (__next_connectable<_TfxSender, _Receiver>)
385 {
386 using _Result =
387 connect_result_t<next_sender_of_t<_Receiver, _TfxSender>,
388 __stopped_means_break_t<_Receiver>>;
389 constexpr bool _Nothrow =
390 __nothrow_connectable<next_sender_of_t<_Receiver, _TfxSender>,
391 __stopped_means_break_t<_Receiver>>;
392 return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr);
393 }
394 else if constexpr (__subscribeable_with_tag_invoke<_TfxSender,
395 _Receiver>)
396 {
397 using _Result =
398 tag_invoke_result_t<subscribe_t, _TfxSender, _Receiver>;
399 constexpr bool _Nothrow = //
400 _NothrowTfxSender &&
401 nothrow_tag_invocable<subscribe_t, _TfxSender, _Receiver>;
402 return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr);
403 }
404 else
405 {
406 return static_cast<__debug::__debug_operation (*)() noexcept>(
407 nullptr);
408 }
409 }
410
411 template <class _Sender, class _Receiver>
412 using __select_impl_t = decltype(__select_impl<_Sender, _Receiver>());
413
414 template <sender _Sender, receiver _Receiver>
415 requires __next_connectable<__tfx_sndr<_Sender, _Receiver>,
416 _Receiver> ||
417 __subscribeable_with_tag_invoke<
418 __tfx_sndr<_Sender, _Receiver>, _Receiver> ||
419 __is_debug_env<env_of_t<_Receiver>>
420 auto operator()(_Sender&& __sndr, _Receiver&& __rcvr) const
421 noexcept(__nothrow_callable<__select_impl_t<_Sender, _Receiver>>)
422 -> __call_result_t<__select_impl_t<_Sender, _Receiver>>
423 {
424 using _TfxSender = __tfx_sndr<_Sender, _Receiver>;
425 auto&& __env = get_env(__rcvr);
426 auto __domain = __get_late_domain(__sndr, __env);
427 if constexpr (__next_connectable<_TfxSender, _Receiver>)
428 {
429 static_assert(
430 operation_state<
431 connect_result_t<next_sender_of_t<_Receiver, _TfxSender>,
432 __stopped_means_break_t<_Receiver>>>,
433 "stdexec::connect(sender, receiver) must return a type that "
434 "satisfies the operation_state concept");
435 next_sender_of_t<_Receiver, _TfxSender> __next = set_next(
436 __rcvr, transform_sender(
437 __domain, static_cast<_Sender&&>(__sndr), __env));
438 return stdexec::connect(
439 static_cast<next_sender_of_t<_Receiver, _TfxSender>&&>(__next),
440 __stopped_means_break_t<_Receiver>{
441 static_cast<_Receiver&&>(__rcvr)});
442 }
443 else if constexpr (__subscribeable_with_tag_invoke<_TfxSender,
444 _Receiver>)
445 {
446 static_assert(
447 operation_state<
448 tag_invoke_result_t<subscribe_t, _TfxSender, _Receiver>>,
449 "exec::subscribe(sender, receiver) must return a type that "
450 "satisfies the operation_state concept");
451 return tag_invoke(
452 subscribe_t{},
453 transform_sender(__domain, static_cast<_Sender&&>(__sndr),
454 __env),
455 static_cast<_Receiver&&>(__rcvr));
456 }
457 else if constexpr (enable_sequence_sender<
458 stdexec::__decay_t<_TfxSender>>)
459 {
460 // This should generate an instantiate backtrace that contains
461 // useful debugging information.
462 using __tag_invoke::tag_invoke;
463 tag_invoke(*this,
464 transform_sender(__domain,
465 static_cast<_Sender&&>(__sndr), __env),
466 static_cast<_Receiver&&>(__rcvr));
467 }
468 else
469 {
470 next_sender_of_t<_Receiver, _TfxSender> __next = set_next(
471 __rcvr, transform_sender(
472 __domain, static_cast<_Sender&&>(__sndr), __env));
473 return tag_invoke(
474 connect_t{},
475 static_cast<next_sender_of_t<_Receiver, _TfxSender>&&>(__next),
476 __stopped_means_break_t<_Receiver>{
477 static_cast<_Receiver&&>(__rcvr)});
478 }
479 }
480
queryexec::__sequence_sndr::subscribe_t481 static constexpr auto query(stdexec::forwarding_query_t) noexcept -> bool
482 {
483 return false;
484 }
485 };
486
487 template <class _Sender, class _Receiver>
488 using subscribe_result_t = __call_result_t<subscribe_t, _Sender, _Receiver>;
489 } // namespace __sequence_sndr
490
491 using __sequence_sndr::__single_sender_completion_sigs;
492
493 using __sequence_sndr::subscribe_t;
494 inline constexpr subscribe_t subscribe;
495
496 using __sequence_sndr::subscribe_result_t;
497
498 template <class _Sender, class _Receiver>
499 concept sequence_sender_to =
500 sequence_receiver_from<_Receiver, _Sender> && //
501 requires(_Sender&& __sndr, _Receiver&& __rcvr) {
502 subscribe(static_cast<_Sender&&>(__sndr),
503 static_cast<_Receiver&&>(__rcvr));
504 };
505
506 template <class _Receiver>
507 concept __stoppable_receiver = //
508 stdexec::__callable<stdexec::set_value_t, _Receiver> && //
509 (stdexec::unstoppable_token<
510 stdexec::stop_token_of_t<stdexec::env_of_t<_Receiver>>> ||
511 stdexec::__callable<stdexec::set_stopped_t, _Receiver>);
512
513 template <class _Receiver>
514 requires __stoppable_receiver<_Receiver>
__set_value_unless_stopped(_Receiver && __rcvr)515 void __set_value_unless_stopped(_Receiver&& __rcvr)
516 {
517 using token_type = stdexec::stop_token_of_t<stdexec::env_of_t<_Receiver>>;
518 if constexpr (stdexec::unstoppable_token<token_type>)
519 {
520 stdexec::set_value(static_cast<_Receiver&&>(__rcvr));
521 }
522 else
523 {
524 auto token = stdexec::get_stop_token(stdexec::get_env(__rcvr));
525 if (!token.stop_requested())
526 {
527 stdexec::set_value(static_cast<_Receiver&&>(__rcvr));
528 }
529 else
530 {
531 stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr));
532 }
533 }
534 }
535 } // namespace exec
536