1 /*
2  * Copyright (c) 2023 Maikel Nadolski
3  * Copyright (c) 2023 NVIDIA Corporation
4  *
5  * Licensed under the Apache License Version 2.0 with LLVM Exceptions
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  *   https://llvm.org/LICENSE.txt
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 #pragma once
18 
19 #include "../stdexec/execution.hpp"
20 
21 namespace exec
22 {
23 struct sequence_sender_t : stdexec::sender_t
24 {};
25 
26 using sequence_tag [[deprecated("Renamed to exec::sequence_sender_t")]] =
27     exec::sequence_sender_t;
28 
29 namespace __sequence_sndr
30 {
31 using namespace stdexec;
32 
33 template <class _Haystack>
34 struct __mall_contained_in_impl
35 {
36     template <class... _Needles>
37     using __f = __mand<__mapply<__mcontains<_Needles>, _Haystack>...>;
38 };
39 
40 template <class _Needles, class _Haystack>
41 using __mall_contained_in =
42     __mapply<__mall_contained_in_impl<_Haystack>, _Needles>;
43 
44 template <class _Needles, class _Haystack>
45 concept __all_contained_in = __v<__mall_contained_in<_Needles, _Haystack>>;
46 
47 // This concept checks if a given sender satisfies the requirements to be
48 // returned from `set_next`.
49 template <class _Sender, class _Env = empty_env>
50 concept next_sender =        //
51     sender_in<_Sender, _Env> //
52     &&
53     __all_contained_in<completion_signatures_of_t<_Sender, _Env>,
54                        completion_signatures<set_value_t(), set_stopped_t()>>;
55 
56 // This is a sequence-receiver CPO that is used to apply algorithms on an input
57 // sender and it returns a next-sender. `set_next` is usually called in a
58 // context where a sender will be connected to a receiver. Since calling
59 // `set_next` usually involves constructing senders it is allowed to throw an
60 // excpetion, which needs to be handled by a calling sequence-operation. The
61 // returned object is a sender that can complete with `set_value_t()` or
62 // `set_stopped_t()`.
63 struct set_next_t
64 {
65     template <receiver _Receiver, sender _Item>
66         requires tag_invocable<set_next_t, _Receiver&, _Item>
operator ()exec::__sequence_sndr::set_next_t67     auto operator()(_Receiver& __rcvr, _Item&& __item) const
68         noexcept(nothrow_tag_invocable<set_next_t, _Receiver&, _Item>)
69             -> tag_invoke_result_t<set_next_t, _Receiver&, _Item>
70     {
71         static_assert(
72             next_sender<tag_invoke_result_t<set_next_t, _Receiver&, _Item>>,
73             "The sender returned from set_next is required to complete with set_value_t() or "
74             "set_stopped_t()");
75         return tag_invoke(*this, __rcvr, static_cast<_Item&&>(__item));
76     }
77 };
78 } // namespace __sequence_sndr
79 
80 using __sequence_sndr::set_next_t;
81 inline constexpr set_next_t set_next;
82 
83 template <class _Receiver, class _Sender>
84 using next_sender_of_t = decltype(exec::set_next(
85     stdexec::__declval<stdexec::__decay_t<_Receiver>&>(),
86     stdexec::__declval<_Sender>()));
87 
88 namespace __sequence_sndr
89 {
90 
91 template <class _ReceiverId>
92 struct __stopped_means_break
93 {
94     struct __t
95     {
96         using receiver_concept = stdexec::receiver_t;
97         using __id = __stopped_means_break;
98         using _Receiver = stdexec::__t<_ReceiverId>;
99         using _Token = stop_token_of_t<env_of_t<_Receiver>>;
100         STDEXEC_ATTRIBUTE((no_unique_address))
101         _Receiver __rcvr_;
102 
get_envexec::__sequence_sndr::__stopped_means_break::__t103         auto get_env() const noexcept -> env_of_t<_Receiver>
104         {
105             return stdexec::get_env(__rcvr_);
106         }
107 
set_valueexec::__sequence_sndr::__stopped_means_break::__t108         void set_value() noexcept
109             requires __callable<set_value_t, _Receiver>
110         {
111             return stdexec::set_value(static_cast<_Receiver&&>(__rcvr_));
112         }
113 
set_stoppedexec::__sequence_sndr::__stopped_means_break::__t114         void set_stopped() noexcept
115             requires __callable<set_value_t, _Receiver> &&
116                      (unstoppable_token<_Token> ||
117                       __callable<set_stopped_t, _Receiver>)
118         {
119             if constexpr (unstoppable_token<_Token>)
120             {
121                 stdexec::set_value(static_cast<_Receiver&&>(__rcvr_));
122             }
123             else
124             {
125                 auto __token =
126                     stdexec::get_stop_token(stdexec::get_env(__rcvr_));
127                 if (__token.stop_requested())
128                 {
129                     stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr_));
130                 }
131                 else
132                 {
133                     stdexec::set_value(static_cast<_Receiver&&>(__rcvr_));
134                 }
135             }
136         }
137     };
138 };
139 
140 template <class _Rcvr>
141 using __stopped_means_break_t =
142     __t<__stopped_means_break<__id<__decay_t<_Rcvr>>>>;
143 } // namespace __sequence_sndr
144 
145 template <class _Sender>
146 concept __enable_sequence_sender =                    //
147     requires { typename _Sender::sender_concept; } && //
148     stdexec::same_as<typename _Sender::sender_concept, sequence_sender_t>;
149 
150 template <class _Sender>
151 inline constexpr bool enable_sequence_sender =
152     __enable_sequence_sender<_Sender>;
153 
154 template <class... _Senders>
155 struct item_types
156 {};
157 
158 template <class _Tp>
159 concept __has_item_typedef = requires { typename _Tp::item_types; };
160 
161 /////////////////////////////////////////////////////////////////////////////
162 // [execution.sndtraits]
163 namespace __sequence_sndr
164 {
165 struct get_item_types_t;
166 template <class _Sender, class _Env>
167 using __tfx_sender =
168     transform_sender_result_t<__late_domain_of_t<_Sender, _Env>, _Sender, _Env>;
169 
170 template <class _Sender, class _Env>
171 concept __with_tag_invoke = //
172     tag_invocable<get_item_types_t, __tfx_sender<_Sender, _Env>, _Env>;
173 template <class _Sender, class _Env>
174 using __member_alias_t = //
175     typename __decay_t<__tfx_sender<_Sender, _Env>>::item_types;
176 
177 template <class _Sender, class _Env>
178 concept __with_member_alias = __mvalid<__member_alias_t, _Sender, _Env>;
179 
180 struct get_item_types_t
181 {
182     template <class _Sender, class _Env>
__implexec::__sequence_sndr::get_item_types_t183     static auto __impl()
184     {
185         static_assert(sizeof(_Sender),
186                       "Incomplete type used with get_item_types");
187         static_assert(sizeof(_Env), "Incomplete type used with get_item_types");
188         using _TfxSender = __tfx_sender<_Sender, _Env>;
189         if constexpr (__with_tag_invoke<_Sender, _Env>)
190         {
191             using _Result =
192                 tag_invoke_result_t<get_item_types_t, _TfxSender, _Env>;
193             return static_cast<_Result (*)()>(nullptr);
194         }
195         else if constexpr (__with_member_alias<_TfxSender, _Env>)
196         {
197             using _Result = __member_alias_t<_TfxSender, _Env>;
198             return static_cast<_Result (*)()>(nullptr);
199         }
200         else if constexpr (sender_in<_TfxSender, _Env> &&
201                            !enable_sequence_sender<
202                                stdexec::__decay_t<_TfxSender>>)
203         {
204             using _Result = item_types<stdexec::__decay_t<_TfxSender>>;
205             return static_cast<_Result (*)()>(nullptr);
206         }
207         else if constexpr (__is_debug_env<_Env>)
208         {
209             using __tag_invoke::tag_invoke;
210             // This ought to cause a hard error that indicates where the problem
211             // is.
212             using _Completions [[maybe_unused]] =
213                 tag_invoke_result_t<get_item_types_t,
214                                     __tfx_sender<_Sender, _Env>, _Env>;
215             return static_cast<__debug::__completion_signatures (*)()>(nullptr);
216         }
217         else
218         {
219             using _Result =
220                 __mexception<_UNRECOGNIZED_SENDER_TYPE_<>,
221                              _WITH_SENDER_<_Sender>, _WITH_ENVIRONMENT_<_Env>>;
222             return static_cast<_Result (*)()>(nullptr);
223         }
224     }
225 
226     template <class _Sender, class _Env = empty_env>
operator ()exec::__sequence_sndr::get_item_types_t227     constexpr auto operator()(_Sender&&, _Env&& = {}) const noexcept
228         -> decltype(__impl<_Sender, _Env>()())
229     {
230         return {};
231     }
232 };
233 } // namespace __sequence_sndr
234 
235 using __sequence_sndr::get_item_types_t;
236 inline constexpr get_item_types_t get_item_types{};
237 
238 template <class _Sender, class... _Env>
239 using item_types_of_t = decltype(get_item_types(stdexec::__declval<_Sender>(),
240                                                 stdexec::__declval<_Env>()...));
241 
242 template <class _Sender, class... _Env>
243 concept sequence_sender =                   //
244     stdexec::sender_in<_Sender, _Env...> && //
245     enable_sequence_sender<stdexec::__decay_t<_Sender>>;
246 
247 template <class _Sender, class... _Env>
248 concept has_sequence_item_types =
249     requires(_Sender&& __sndr, _Env&&... __env) {
250         get_item_types(static_cast<_Sender&&>(__sndr),
251                        static_cast<_Env&&>(__env)...);
252     };
253 
254 template <class _Sender, class... _Env>
255 concept sequence_sender_in =                     //
256     stdexec::sender_in<_Sender, _Env...> &&      //
257     has_sequence_item_types<_Sender, _Env...> && //
258     sequence_sender<_Sender, _Env...>;
259 
260 template <class _Receiver>
261 struct _WITH_RECEIVER_
262 {};
263 
264 template <class _Item>
265 struct _MISSING_SET_NEXT_OVERLOAD_FOR_ITEM_
266 {};
267 
268 template <class _Receiver, class _Item>
269 auto __try_item(_Item*) //
270     -> stdexec::__mexception<_MISSING_SET_NEXT_OVERLOAD_FOR_ITEM_<_Item>,
271                              _WITH_RECEIVER_<_Receiver>>;
272 
273 template <class _Receiver, class _Item>
274     requires stdexec::__callable<set_next_t, _Receiver&, _Item>
275 auto __try_item(_Item*) -> stdexec::__msuccess;
276 
277 template <class _Receiver, class... _Items>
278 auto __try_items(exec::item_types<_Items...>*) //
279     -> decltype((stdexec::__msuccess(), ...,
280                  exec::__try_item<_Receiver>(static_cast<_Items*>(nullptr))));
281 
282 template <class _Receiver, class _Items>
283 concept __sequence_receiver_of =
284     requires(_Items* __items) {
285         {
286             exec::__try_items<stdexec::__decay_t<_Receiver>>(__items)
287         } -> stdexec::__ok;
288     };
289 
290 template <class _Receiver, class _SequenceItems>
291 concept sequence_receiver_of =      //
292     stdexec::receiver<_Receiver> && //
293     __sequence_receiver_of<_Receiver, _SequenceItems>;
294 
295 template <class _Completions>
296 using __to_sequence_completions_t = //
297     stdexec::__transform_completion_signatures<
298         _Completions,
299         stdexec::__mconst<
300             stdexec::completion_signatures<stdexec::set_value_t()>>::__f,
301         stdexec::__sigs::__default_set_error,
302         stdexec::completion_signatures<stdexec::set_stopped_t()>,
303         stdexec::__concat_completion_signatures>;
304 
305 template <class _Sender, class... _Env>
306 using __item_completion_signatures = //
307     stdexec::transform_completion_signatures<
308         stdexec::__completion_signatures_of_t<_Sender, _Env...>,
309         stdexec::completion_signatures<stdexec::set_value_t()>,
310         stdexec::__mconst<stdexec::completion_signatures<>>::__f>;
311 
312 template <class _Sequence, class... _Env>
313 using __sequence_completion_signatures = //
314     stdexec::transform_completion_signatures<
315         stdexec::__completion_signatures_of_t<_Sequence, _Env...>,
316         stdexec::completion_signatures<stdexec::set_value_t()>,
317         stdexec::__mconst<stdexec::completion_signatures<>>::__f>;
318 
319 template <class _Sequence, class... _Env>
320 using __sequence_completion_signatures_of_t = //
321     stdexec::__mapply<
322         stdexec::__mtransform<
323             stdexec::__mbind_back_q<__item_completion_signatures, _Env...>,
324             stdexec::__mbind_back<
325                 stdexec::__mtry_q<stdexec::__concat_completion_signatures>,
326                 __sequence_completion_signatures<_Sequence, _Env...>>>,
327         item_types_of_t<_Sequence, _Env...>>;
328 
329 template <class _Receiver, class _Sender>
330 concept sequence_receiver_from =                                              //
331     stdexec::receiver<_Receiver> &&                                           //
332     stdexec::sender_in<_Sender, stdexec::env_of_t<_Receiver>> &&              //
333     sequence_receiver_of<
334         _Receiver, item_types_of_t<_Sender, stdexec::env_of_t<_Receiver>>> && //
335     ((sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> &&
336       stdexec::receiver_of<_Receiver,
337                            stdexec::completion_signatures_of_t<
338                                _Sender, stdexec::env_of_t<_Receiver>>>) || //
339      (!sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> &&
340       stdexec::__receiver_from<
341           __sequence_sndr::__stopped_means_break_t<_Receiver>,
342           next_sender_of_t<_Receiver, _Sender>>));
343 
344 namespace __sequence_sndr
345 {
346 struct subscribe_t;
347 
348 template <class _Env>
349 using __single_sender_completion_sigs =
350     __if_c<unstoppable_token<stop_token_of_t<_Env>>,
351            completion_signatures<set_value_t()>,
352            completion_signatures<set_value_t(), set_stopped_t()>>;
353 
354 template <class _Sender, class _Receiver>
355 concept __next_connectable =
356     receiver<_Receiver> &&                                           //
357     sender_in<_Sender, env_of_t<_Receiver>> &&                       //
358     !sequence_sender_in<_Sender, env_of_t<_Receiver>> &&             //
359     sequence_receiver_of<_Receiver,
360                          item_types<stdexec::__decay_t<_Sender>>> && //
361     sender_to<next_sender_of_t<_Receiver, _Sender>,
362               __stopped_means_break_t<_Receiver>>;
363 
364 template <class _Sender, class _Receiver>
365 concept __subscribeable_with_tag_invoke =
366     receiver<_Receiver> &&                              //
367     sequence_sender_in<_Sender, env_of_t<_Receiver>> && //
368     sequence_receiver_from<_Receiver, _Sender> &&       //
369     tag_invocable<subscribe_t, _Sender, _Receiver>;
370 
371 struct subscribe_t
372 {
373     template <class _Sender, class _Receiver>
374     using __tfx_sndr = __tfx_sender<_Sender, env_of_t<_Receiver>>;
375 
376     template <class _Sender, class _Receiver>
__select_implexec::__sequence_sndr::subscribe_t377     static constexpr auto __select_impl() noexcept
378     {
379         using _Domain = __late_domain_of_t<_Sender, env_of_t<_Receiver&>>;
380         constexpr bool _NothrowTfxSender =
381             __nothrow_callable<transform_sender_t, _Domain, _Sender,
382                                env_of_t<_Receiver&>>;
383         using _TfxSender = __tfx_sndr<_Sender, _Receiver>;
384         if constexpr (__next_connectable<_TfxSender, _Receiver>)
385         {
386             using _Result =
387                 connect_result_t<next_sender_of_t<_Receiver, _TfxSender>,
388                                  __stopped_means_break_t<_Receiver>>;
389             constexpr bool _Nothrow =
390                 __nothrow_connectable<next_sender_of_t<_Receiver, _TfxSender>,
391                                       __stopped_means_break_t<_Receiver>>;
392             return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr);
393         }
394         else if constexpr (__subscribeable_with_tag_invoke<_TfxSender,
395                                                            _Receiver>)
396         {
397             using _Result =
398                 tag_invoke_result_t<subscribe_t, _TfxSender, _Receiver>;
399             constexpr bool _Nothrow = //
400                 _NothrowTfxSender &&
401                 nothrow_tag_invocable<subscribe_t, _TfxSender, _Receiver>;
402             return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr);
403         }
404         else
405         {
406             return static_cast<__debug::__debug_operation (*)() noexcept>(
407                 nullptr);
408         }
409     }
410 
411     template <class _Sender, class _Receiver>
412     using __select_impl_t = decltype(__select_impl<_Sender, _Receiver>());
413 
414     template <sender _Sender, receiver _Receiver>
415         requires __next_connectable<__tfx_sndr<_Sender, _Receiver>,
416                                     _Receiver> ||
417                      __subscribeable_with_tag_invoke<
418                          __tfx_sndr<_Sender, _Receiver>, _Receiver> ||
419                      __is_debug_env<env_of_t<_Receiver>>
420     auto operator()(_Sender&& __sndr, _Receiver&& __rcvr) const
421         noexcept(__nothrow_callable<__select_impl_t<_Sender, _Receiver>>)
422             -> __call_result_t<__select_impl_t<_Sender, _Receiver>>
423     {
424         using _TfxSender = __tfx_sndr<_Sender, _Receiver>;
425         auto&& __env = get_env(__rcvr);
426         auto __domain = __get_late_domain(__sndr, __env);
427         if constexpr (__next_connectable<_TfxSender, _Receiver>)
428         {
429             static_assert(
430                 operation_state<
431                     connect_result_t<next_sender_of_t<_Receiver, _TfxSender>,
432                                      __stopped_means_break_t<_Receiver>>>,
433                 "stdexec::connect(sender, receiver) must return a type that "
434                 "satisfies the operation_state concept");
435             next_sender_of_t<_Receiver, _TfxSender> __next = set_next(
436                 __rcvr, transform_sender(
437                             __domain, static_cast<_Sender&&>(__sndr), __env));
438             return stdexec::connect(
439                 static_cast<next_sender_of_t<_Receiver, _TfxSender>&&>(__next),
440                 __stopped_means_break_t<_Receiver>{
441                     static_cast<_Receiver&&>(__rcvr)});
442         }
443         else if constexpr (__subscribeable_with_tag_invoke<_TfxSender,
444                                                            _Receiver>)
445         {
446             static_assert(
447                 operation_state<
448                     tag_invoke_result_t<subscribe_t, _TfxSender, _Receiver>>,
449                 "exec::subscribe(sender, receiver) must return a type that "
450                 "satisfies the operation_state concept");
451             return tag_invoke(
452                 subscribe_t{},
453                 transform_sender(__domain, static_cast<_Sender&&>(__sndr),
454                                  __env),
455                 static_cast<_Receiver&&>(__rcvr));
456         }
457         else if constexpr (enable_sequence_sender<
458                                stdexec::__decay_t<_TfxSender>>)
459         {
460             // This should generate an instantiate backtrace that contains
461             // useful debugging information.
462             using __tag_invoke::tag_invoke;
463             tag_invoke(*this,
464                        transform_sender(__domain,
465                                         static_cast<_Sender&&>(__sndr), __env),
466                        static_cast<_Receiver&&>(__rcvr));
467         }
468         else
469         {
470             next_sender_of_t<_Receiver, _TfxSender> __next = set_next(
471                 __rcvr, transform_sender(
472                             __domain, static_cast<_Sender&&>(__sndr), __env));
473             return tag_invoke(
474                 connect_t{},
475                 static_cast<next_sender_of_t<_Receiver, _TfxSender>&&>(__next),
476                 __stopped_means_break_t<_Receiver>{
477                     static_cast<_Receiver&&>(__rcvr)});
478         }
479     }
480 
queryexec::__sequence_sndr::subscribe_t481     static constexpr auto query(stdexec::forwarding_query_t) noexcept -> bool
482     {
483         return false;
484     }
485 };
486 
487 template <class _Sender, class _Receiver>
488 using subscribe_result_t = __call_result_t<subscribe_t, _Sender, _Receiver>;
489 } // namespace __sequence_sndr
490 
491 using __sequence_sndr::__single_sender_completion_sigs;
492 
493 using __sequence_sndr::subscribe_t;
494 inline constexpr subscribe_t subscribe;
495 
496 using __sequence_sndr::subscribe_result_t;
497 
498 template <class _Sender, class _Receiver>
499 concept sequence_sender_to =
500     sequence_receiver_from<_Receiver, _Sender> && //
501     requires(_Sender&& __sndr, _Receiver&& __rcvr) {
502         subscribe(static_cast<_Sender&&>(__sndr),
503                   static_cast<_Receiver&&>(__rcvr));
504     };
505 
506 template <class _Receiver>
507 concept __stoppable_receiver =                              //
508     stdexec::__callable<stdexec::set_value_t, _Receiver> && //
509     (stdexec::unstoppable_token<
510          stdexec::stop_token_of_t<stdexec::env_of_t<_Receiver>>> ||
511      stdexec::__callable<stdexec::set_stopped_t, _Receiver>);
512 
513 template <class _Receiver>
514     requires __stoppable_receiver<_Receiver>
__set_value_unless_stopped(_Receiver && __rcvr)515 void __set_value_unless_stopped(_Receiver&& __rcvr)
516 {
517     using token_type = stdexec::stop_token_of_t<stdexec::env_of_t<_Receiver>>;
518     if constexpr (stdexec::unstoppable_token<token_type>)
519     {
520         stdexec::set_value(static_cast<_Receiver&&>(__rcvr));
521     }
522     else
523     {
524         auto token = stdexec::get_stop_token(stdexec::get_env(__rcvr));
525         if (!token.stop_requested())
526         {
527             stdexec::set_value(static_cast<_Receiver&&>(__rcvr));
528         }
529         else
530         {
531             stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr));
532         }
533     }
534 }
535 } // namespace exec
536