1 /*
2  * Copyright (c) 2023 Maikel Nadolski
3  * Copyright (c) 2023 NVIDIA Corporation
4  *
5  * Licensed under the Apache License Version 2.0 with LLVM Exceptions
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  *   https://llvm.org/LICENSE.txt
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 #pragma once
18 
19 #include "../stdexec/execution.hpp"
20 
21 namespace exec
22 {
23 struct sequence_sender_t : stdexec::sender_t
24 {};
25 
26 using sequence_tag [[deprecated("Renamed to exec::sequence_sender_t")]] =
27     exec::sequence_sender_t;
28 
29 namespace __sequence_sndr
30 {
31 using namespace stdexec;
32 
33 template <class _Haystack>
34 struct __mall_contained_in_impl
35 {
36     template <class... _Needles>
37     using __f = __mand<__mapply<__contains<_Needles>, _Haystack>...>;
38 };
39 
40 template <class _Needles, class _Haystack>
41 using __mall_contained_in =
42     __mapply<__mall_contained_in_impl<_Haystack>, _Needles>;
43 
44 template <class _Needles, class _Haystack>
45 concept __all_contained_in = __mall_contained_in<_Needles, _Haystack>::value;
46 
47 // This concept checks if a given sender satisfies the requirements to be
48 // returned from `set_next`.
49 template <class _Sender, class _Env = empty_env>
50 concept next_sender =        //
51     sender_in<_Sender, _Env> //
52     &&
53     __all_contained_in<completion_signatures_of_t<_Sender, _Env>,
54                        completion_signatures<set_value_t(), set_stopped_t()>>;
55 
56 // This is a sequence-receiver CPO that is used to apply algorithms on an input
57 // sender and it returns a next-sender. `set_next` is usually called in a
58 // context where a sender will be connected to a receiver. Since calling
59 // `set_next` usually involves constructing senders it is allowed to throw an
60 // exception, which needs to be handled by a calling sequence-operation. The
61 // returned object is a sender that can complete with `set_value_t()` or
62 // `set_stopped_t()`.
63 struct set_next_t
64 {
65     template <receiver _Receiver, sender _Item>
66         requires tag_invocable<set_next_t, _Receiver&, _Item>
operator ()exec::__sequence_sndr::set_next_t67     auto operator()(_Receiver& __rcvr, _Item&& __item) const
68         noexcept(nothrow_tag_invocable<set_next_t, _Receiver&, _Item>)
69             -> tag_invoke_result_t<set_next_t, _Receiver&, _Item>
70     {
71         static_assert(
72             next_sender<tag_invoke_result_t<set_next_t, _Receiver&, _Item>>,
73             "The sender returned from set_next is required to complete with set_value_t() or "
74             "set_stopped_t()");
75         return tag_invoke(*this, __rcvr, static_cast<_Item&&>(__item));
76     }
77 };
78 } // namespace __sequence_sndr
79 
80 using __sequence_sndr::set_next_t;
81 inline constexpr set_next_t set_next;
82 
83 template <class _Receiver, class _Sender>
84 using next_sender_of_t = decltype(exec::set_next(
85     stdexec::__declval<stdexec::__decay_t<_Receiver>&>(),
86     stdexec::__declval<_Sender>()));
87 
88 namespace __sequence_sndr
89 {
90 
91 template <class _ReceiverId>
92 struct __stopped_means_break
93 {
94     struct __t
95     {
96         using receiver_concept = stdexec::receiver_t;
97         using __id = __stopped_means_break;
98         using _Receiver = stdexec::__t<_ReceiverId>;
99         using _Token = stop_token_of_t<env_of_t<_Receiver>>;
100         STDEXEC_ATTRIBUTE((no_unique_address))
101         _Receiver __rcvr_;
102 
103         template <same_as<get_env_t> _GetEnv, same_as<__t> _Self>
tag_invokeexec::__sequence_sndr::__stopped_means_break104         friend auto tag_invoke(_GetEnv, const _Self& __self) noexcept
105             -> env_of_t<_Receiver>
106         {
107             return stdexec::get_env(__self.__rcvr_);
108         }
109 
110         template <same_as<set_value_t> _SetValue, same_as<__t> _Self>
111             requires __callable<set_value_t, _Receiver&&>
tag_invokeexec::__sequence_sndr::__stopped_means_break112         friend void tag_invoke(_SetValue, _Self&& __self) noexcept
113         {
114             return stdexec::set_value(static_cast<_Receiver&&>(__self.__rcvr_));
115         }
116 
117         template <same_as<set_stopped_t> _SetStopped, same_as<__t> _Self>
118             requires __callable<set_value_t, _Receiver&&> &&
119                      (unstoppable_token<_Token> ||
120                       __callable<set_stopped_t, _Receiver &&>)
tag_invokeexec::__sequence_sndr::__stopped_means_break121         friend void tag_invoke(_SetStopped, _Self&& __self) noexcept
122         {
123             if constexpr (unstoppable_token<_Token>)
124             {
125                 stdexec::set_value(static_cast<_Receiver&&>(__self.__rcvr_));
126             }
127             else
128             {
129                 auto __token =
130                     stdexec::get_stop_token(stdexec::get_env(__self.__rcvr_));
131                 if (__token.stop_requested())
132                 {
133                     stdexec::set_stopped(
134                         static_cast<_Receiver&&>(__self.__rcvr_));
135                 }
136                 else
137                 {
138                     stdexec::set_value(
139                         static_cast<_Receiver&&>(__self.__rcvr_));
140                 }
141             }
142         }
143     };
144 };
145 
146 template <class _Rcvr>
147 using __stopped_means_break_t =
148     __t<__stopped_means_break<__id<__decay_t<_Rcvr>>>>;
149 } // namespace __sequence_sndr
150 
151 template <class _Sender>
152 concept __enable_sequence_sender =                    //
153     requires { typename _Sender::sender_concept; } && //
154     stdexec::same_as<typename _Sender::sender_concept, sequence_sender_t>;
155 
156 template <class _Sender>
157 inline constexpr bool enable_sequence_sender =
158     __enable_sequence_sender<_Sender>;
159 
// Empty type list naming the item sender types of a sequence.
template <class... _Items>
struct item_types
{
};
163 
164 template <class _Tp>
165 concept __has_item_typedef = requires { typename _Tp::item_types; };
166 
167 /////////////////////////////////////////////////////////////////////////////
168 // [execution.sndtraits]
169 namespace __sequence_sndr
170 {
171 struct get_item_types_t;
172 template <class _Sender, class _Env>
173 using __tfx_sender =
174     transform_sender_result_t<__late_domain_of_t<_Sender, _Env>, _Sender, _Env>;
175 
176 template <class _Sender, class _Env>
177 concept __with_tag_invoke = //
178     tag_invocable<get_item_types_t, __tfx_sender<_Sender, _Env>, _Env>;
179 template <class _Sender, class _Env>
180 using __member_alias_t = //
181     typename __decay_t<__tfx_sender<_Sender, _Env>>::item_types;
182 
183 template <class _Sender, class _Env>
184 concept __with_member_alias = __mvalid<__member_alias_t, _Sender, _Env>;
185 
186 struct get_item_types_t
187 {
188     template <class _Sender, class _Env>
__implexec::__sequence_sndr::get_item_types_t189     static auto __impl()
190     {
191         static_assert(sizeof(_Sender),
192                       "Incomplete type used with get_item_types");
193         static_assert(sizeof(_Env), "Incomplete type used with get_item_types");
194         using _TfxSender = __tfx_sender<_Sender, _Env>;
195         if constexpr (__with_tag_invoke<_Sender, _Env>)
196         {
197             using _Result =
198                 tag_invoke_result_t<get_item_types_t, _TfxSender, _Env>;
199             return static_cast<_Result (*)()>(nullptr);
200         }
201         else if constexpr (__with_member_alias<_TfxSender, _Env>)
202         {
203             using _Result = __member_alias_t<_TfxSender, _Env>;
204             return static_cast<_Result (*)()>(nullptr);
205         }
206         else if constexpr (sender_in<_TfxSender, _Env> &&
207                            !enable_sequence_sender<
208                                stdexec::__decay_t<_TfxSender>>)
209         {
210             using _Result = item_types<stdexec::__decay_t<_TfxSender>>;
211             return static_cast<_Result (*)()>(nullptr);
212         }
213         else if constexpr (__is_debug_env<_Env>)
214         {
215             using __tag_invoke::tag_invoke;
216             // This ought to cause a hard error that indicates where the problem
217             // is.
218             using _Completions [[maybe_unused]] =
219                 tag_invoke_result_t<get_item_types_t,
220                                     __tfx_sender<_Sender, _Env>, _Env>;
221             return static_cast<__debug::__completion_signatures (*)()>(nullptr);
222         }
223         else
224         {
225             using _Result =
226                 __mexception<_UNRECOGNIZED_SENDER_TYPE_<>,
227                              _WITH_SENDER_<_Sender>, _WITH_ENVIRONMENT_<_Env>>;
228             return static_cast<_Result (*)()>(nullptr);
229         }
230     }
231 
232     template <class _Sender, class _Env = empty_env>
operator ()exec::__sequence_sndr::get_item_types_t233     constexpr auto operator()(_Sender&&, const _Env&) const noexcept
234         -> decltype(__impl<_Sender, _Env>()())
235     {
236         return {};
237     }
238 };
239 } // namespace __sequence_sndr
240 
241 using __sequence_sndr::get_item_types_t;
242 inline constexpr get_item_types_t get_item_types{};
243 
244 template <class _Sender, class _Env>
245 using item_types_of_t = decltype(get_item_types(stdexec::__declval<_Sender>(),
246                                                 stdexec::__declval<_Env>()));
247 
248 template <class _Sender, class _Env>
249 concept sequence_sender =                //
250     stdexec::sender_in<_Sender, _Env> && //
251     enable_sequence_sender<stdexec::__decay_t<_Sender>>;
252 
253 template <class _Sender, class _Env>
254 concept has_sequence_item_types =
255     requires(_Sender&& __sndr, _Env&& __env) {
256         get_item_types(static_cast<_Sender&&>(__sndr),
257                        static_cast<_Env&&>(__env));
258     };
259 
260 template <class _Sender, class _Env>
261 concept sequence_sender_in =                  //
262     stdexec::sender_in<_Sender, _Env> &&      //
263     has_sequence_item_types<_Sender, _Env> && //
264     sequence_sender<_Sender, _Env>;
265 
// Empty tag naming a receiver type; used as an argument of the `__mexception`
// returned by the fallback `__try_item` overload.
template <class _Rcvr>
struct _WITH_RECEIVER_
{
};
269 
// Empty tag naming an item type for which no `set_next` overload was found;
// used as an argument of the `__mexception` returned by the fallback
// `__try_item` overload.
template <class _ItemType>
struct _MISSING_SET_NEXT_OVERLOAD_FOR_ITEM_
{
};
273 
274 template <class _Receiver, class _Item>
275 auto __try_item(_Item*)
276     -> stdexec::__mexception<_MISSING_SET_NEXT_OVERLOAD_FOR_ITEM_<_Item>,
277                              _WITH_RECEIVER_<_Receiver>>;
278 
279 template <class _Receiver, class _Item>
280     requires stdexec::__callable<set_next_t, _Receiver&, _Item>
281 auto __try_item(_Item*) -> stdexec::__msuccess;
282 
283 template <class _Receiver, class... _Items>
284 auto __try_items(exec::item_types<_Items...>*)
285     -> decltype((stdexec::__msuccess(), ...,
286                  exec::__try_item<_Receiver>(static_cast<_Items*>(nullptr))));
287 
288 template <class _Receiver, class _Items>
289 concept __sequence_receiver_of =
290     requires(_Items* __items) {
291         {
292             exec::__try_items<stdexec::__decay_t<_Receiver>>(__items)
293         } -> stdexec::__ok;
294     };
295 
296 template <class _Receiver, class _SequenceItems>
297 concept sequence_receiver_of =      //
298     stdexec::receiver<_Receiver> && //
299     __sequence_receiver_of<_Receiver, _SequenceItems>;
300 
301 template <class _Items, class _Env>
302 using __concat_item_signatures_t = stdexec::__mapply<
303     stdexec::__q<stdexec::__concat_completion_signatures_t>,
304     stdexec::__mapply<stdexec::__transform<stdexec::__mbind_back_q<
305                           stdexec::completion_signatures_of_t, _Env>>,
306                       _Items>>;
307 
308 template <class _Completions>
309 using __gather_error_signals =
310     stdexec::__only_gather_signal<stdexec::set_error_t, _Completions>;
311 
312 template <class _Completions>
313 using __gather_stopped_signals =
314     stdexec::__only_gather_signal<stdexec::set_stopped_t, _Completions>;
315 
316 template <class _Completions>
317 using __to_sequence_completions_t = stdexec::__concat_completion_signatures_t<
318     stdexec::completion_signatures<stdexec::set_value_t()>,
319     __gather_error_signals<_Completions>,
320     __gather_stopped_signals<_Completions>>;
321 
322 template <class _Sender, class _Env>
323 using __to_sequence_completion_signatures = stdexec::make_completion_signatures<
324     _Sender, _Env, stdexec::completion_signatures<stdexec::set_value_t()>,
325     stdexec::__mconst<stdexec::completion_signatures<>>::__f>;
326 
327 template <class _Sequence, class _Env>
328 using __sequence_completion_signatures_of_t =
329     stdexec::__concat_completion_signatures_t<
330         stdexec::__try_make_completion_signatures<
331             _Sequence, _Env,
332             stdexec::completion_signatures<stdexec::set_value_t()>,
333             stdexec::__mconst<stdexec::completion_signatures<>>>,
334         stdexec::__mapply<
335             stdexec::__q<stdexec::__concat_completion_signatures_t>,
336             stdexec::__mapply<stdexec::__transform<stdexec::__mbind_back_q<
337                                   __to_sequence_completion_signatures, _Env>>,
338                               item_types_of_t<_Sequence, _Env>>>>;
339 
340 template <class _Receiver, class _Sender>
341 concept sequence_receiver_from =                                              //
342     stdexec::receiver<_Receiver> &&                                           //
343     stdexec::sender_in<_Sender, stdexec::env_of_t<_Receiver>> &&              //
344     sequence_receiver_of<
345         _Receiver, item_types_of_t<_Sender, stdexec::env_of_t<_Receiver>>> && //
346     ((sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> &&
347       stdexec::receiver_of<_Receiver,
348                            stdexec::completion_signatures_of_t<
349                                _Sender, stdexec::env_of_t<_Receiver>>>) || //
350      (!sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> &&
351       stdexec::__receiver_from<
352           __sequence_sndr::__stopped_means_break_t<_Receiver>,
353           next_sender_of_t<_Receiver, _Sender>>));
354 
355 namespace __sequence_sndr
356 {
357 struct subscribe_t;
358 
359 template <class _Env>
360 using __single_sender_completion_sigs =
361     __if_c<unstoppable_token<stop_token_of_t<_Env>>,
362            completion_signatures<set_value_t()>,
363            completion_signatures<set_value_t(), set_stopped_t()>>;
364 
365 template <class _Sender, class _Receiver>
366 concept __next_connectable_with_tag_invoke =
367     receiver<_Receiver> &&                                           //
368     sender_in<_Sender, env_of_t<_Receiver>> &&                       //
369     !sequence_sender_in<_Sender, env_of_t<_Receiver>> &&             //
370     sequence_receiver_of<_Receiver,
371                          item_types<stdexec::__decay_t<_Sender>>> && //
372     __receiver_from<__stopped_means_break_t<_Receiver>,
373                     next_sender_of_t<_Receiver, _Sender>> &&         //
374     __connect::__connectable_with_tag_invoke<
375         next_sender_of_t<_Receiver, _Sender>&&,
376         __stopped_means_break_t<_Receiver>>;
377 
378 template <class _Sender, class _Receiver>
379 concept __subscribeable_with_tag_invoke =
380     receiver<_Receiver> &&                              //
381     sequence_sender_in<_Sender, env_of_t<_Receiver>> && //
382     sequence_receiver_from<_Receiver, _Sender> &&       //
383     tag_invocable<subscribe_t, _Sender, _Receiver>;
384 
385 struct subscribe_t
386 {
387     template <class _Sender, class _Receiver>
388     using __tfx_sndr = __tfx_sender<_Sender, env_of_t<_Receiver>>;
389 
390     template <class _Sender, class _Receiver>
__select_implexec::__sequence_sndr::subscribe_t391     static constexpr auto __select_impl() noexcept
392     {
393         using _Domain = __late_domain_of_t<_Sender, env_of_t<_Receiver&>>;
394         constexpr bool _NothrowTfxSender =
395             __nothrow_callable<get_env_t, _Receiver&> &&
396             __nothrow_callable<transform_sender_t, _Domain, _Sender,
397                                env_of_t<_Receiver&>>;
398         using _TfxSender = __tfx_sndr<_Sender, _Receiver>;
399         if constexpr (__next_connectable_with_tag_invoke<_TfxSender, _Receiver>)
400         {
401             using _Result =
402                 tag_invoke_result_t<connect_t,
403                                     next_sender_of_t<_Receiver, _TfxSender>,
404                                     __stopped_means_break_t<_Receiver>>;
405             constexpr bool _Nothrow =
406                 nothrow_tag_invocable<connect_t,
407                                       next_sender_of_t<_Receiver, _TfxSender>,
408                                       __stopped_means_break_t<_Receiver>>;
409             return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr);
410         }
411         else if constexpr (__subscribeable_with_tag_invoke<_TfxSender,
412                                                            _Receiver>)
413         {
414             using _Result =
415                 tag_invoke_result_t<subscribe_t, _TfxSender, _Receiver>;
416             constexpr bool _Nothrow = //
417                 _NothrowTfxSender &&
418                 nothrow_tag_invocable<subscribe_t, _TfxSender, _Receiver>;
419             return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr);
420         }
421         else
422         {
423             return static_cast<__debug::__debug_operation (*)() noexcept>(
424                 nullptr);
425         }
426     }
427 
428     template <class _Sender, class _Receiver>
429     using __select_impl_t = decltype(__select_impl<_Sender, _Receiver>());
430 
431     template <sender _Sender, receiver _Receiver>
432         requires __next_connectable_with_tag_invoke<
433                      __tfx_sndr<_Sender, _Receiver>, _Receiver> ||
434                  __subscribeable_with_tag_invoke<__tfx_sndr<_Sender, _Receiver>,
435                                                  _Receiver> ||
436                  __is_debug_env<env_of_t<_Receiver>>
437     auto operator()(_Sender&& __sndr, _Receiver&& __rcvr) const
438         noexcept(__nothrow_callable<__select_impl_t<_Sender, _Receiver>>)
439             -> __call_result_t<__select_impl_t<_Sender, _Receiver>>
440     {
441         using _TfxSender = __tfx_sndr<_Sender, _Receiver>;
442         auto&& __env = get_env(__rcvr);
443         auto __domain = __get_late_domain(__sndr, __env);
444         if constexpr (__next_connectable_with_tag_invoke<_TfxSender, _Receiver>)
445         {
446             static_assert(
447                 operation_state<tag_invoke_result_t<
448                     connect_t, next_sender_of_t<_Receiver, _TfxSender>,
449                     __stopped_means_break_t<_Receiver>>>,
450                 "stdexec::connect(sender, receiver) must return a type that "
451                 "satisfies the operation_state concept");
452             next_sender_of_t<_Receiver, _TfxSender> __next = set_next(
453                 __rcvr, transform_sender(
454                             __domain, static_cast<_Sender&&>(__sndr), __env));
455             return tag_invoke(
456                 connect_t{},
457                 static_cast<next_sender_of_t<_Receiver, _TfxSender>&&>(__next),
458                 __stopped_means_break_t<_Receiver>{
459                     static_cast<_Receiver&&>(__rcvr)});
460         }
461         else if constexpr (__subscribeable_with_tag_invoke<_TfxSender,
462                                                            _Receiver>)
463         {
464             static_assert(
465                 operation_state<
466                     tag_invoke_result_t<subscribe_t, _TfxSender, _Receiver>>,
467                 "exec::subscribe(sender, receiver) must return a type that "
468                 "satisfies the operation_state concept");
469             return tag_invoke(subscribe_t{},
470                               transform_sender(__domain,
471                                                static_cast<_Sender&&>(__sndr),
472                                                __env),
473                               static_cast<_Receiver&&>(__rcvr));
474         }
475         else if constexpr (enable_sequence_sender<
476                                stdexec::__decay_t<_TfxSender>>)
477         {
478             // This should generate an instantiate backtrace that contains
479             // useful debugging information.
480             using __tag_invoke::tag_invoke;
481             tag_invoke(*this,
482                        transform_sender(__domain,
483                                         static_cast<_Sender&&>(__sndr), __env),
484                        static_cast<_Receiver&&>(__rcvr));
485         }
486         else
487         {
488             next_sender_of_t<_Receiver, _TfxSender> __next = set_next(
489                 __rcvr, transform_sender(
490                             __domain, static_cast<_Sender&&>(__sndr), __env));
491             return tag_invoke(
492                 connect_t{},
493                 static_cast<next_sender_of_t<_Receiver, _TfxSender>&&>(__next),
494                 __stopped_means_break_t<_Receiver>{
495                     static_cast<_Receiver&&>(__rcvr)});
496         }
497     }
498 
tag_invoke(forwarding_query_t,subscribe_t)499     friend constexpr auto tag_invoke(forwarding_query_t, subscribe_t) noexcept
500         -> bool
501     {
502         return false;
503     }
504 };
505 
506 template <class _Sender, class _Receiver>
507 using subscribe_result_t = __call_result_t<subscribe_t, _Sender, _Receiver>;
508 } // namespace __sequence_sndr
509 
510 using __sequence_sndr::__single_sender_completion_sigs;
511 
512 using __sequence_sndr::subscribe_t;
513 inline constexpr subscribe_t subscribe;
514 
515 using __sequence_sndr::subscribe_result_t;
516 
517 template <class _Sender, class _Receiver>
518 concept sequence_sender_to = sequence_receiver_from<_Receiver, _Sender> && //
519                              requires(_Sender&& __sndr, _Receiver&& __rcvr) {
520                                  {
521                                      subscribe(static_cast<_Sender&&>(__sndr),
522                                                static_cast<_Receiver&&>(__rcvr))
523                                  };
524                              };
525 
526 template <class _Receiver>
527 concept __stoppable_receiver =                              //
528     stdexec::__callable<stdexec::set_value_t, _Receiver> && //
529     (stdexec::unstoppable_token<
530          stdexec::stop_token_of_t<stdexec::env_of_t<_Receiver>>> ||
531      stdexec::__callable<stdexec::set_stopped_t, _Receiver>);
532 
533 template <class _Receiver>
534     requires __stoppable_receiver<_Receiver>
__set_value_unless_stopped(_Receiver && __rcvr)535 void __set_value_unless_stopped(_Receiver&& __rcvr)
536 {
537     using token_type = stdexec::stop_token_of_t<stdexec::env_of_t<_Receiver>>;
538     if constexpr (stdexec::unstoppable_token<token_type>)
539     {
540         stdexec::set_value(static_cast<_Receiver&&>(__rcvr));
541     }
542     else
543     {
544         auto token = stdexec::get_stop_token(stdexec::get_env(__rcvr));
545         if (!token.stop_requested())
546         {
547             stdexec::set_value(static_cast<_Receiver&&>(__rcvr));
548         }
549         else
550         {
551             stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr));
552         }
553     }
554 }
555 } // namespace exec
556