xref: /openbmc/sdbusplus/include/sdbusplus/async/stdexec/sequence_senders.hpp (revision 2bf0bb29cf5c44da8fc94cee8ef87e23def03dba)
1 /*
2  * Copyright (c) 2023 Maikel Nadolski
3  * Copyright (c) 2023 NVIDIA Corporation
4  *
5  * Licensed under the Apache License Version 2.0 with LLVM Exceptions
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  *   https://llvm.org/LICENSE.txt
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 #pragma once
18 
19 #include "../stdexec/execution.hpp"
20 
21 namespace exec
22 {
23 struct sequence_sender_t : stdexec::sender_t
24 {};
25 
26 using sequence_tag [[deprecated("Renamed to exec::sequence_sender_t")]] =
27     exec::sequence_sender_t;
28 
29 namespace __sequence_sndr
30 {
31 using namespace stdexec;
32 
33 template <class _Haystack>
34 struct __mall_contained_in_impl
35 {
36     template <class... _Needles>
37     using __f = __mand<__mapply<__contains<_Needles>, _Haystack>...>;
38 };
39 
40 template <class _Needles, class _Haystack>
41 using __mall_contained_in =
42     __mapply<__mall_contained_in_impl<_Haystack>, _Needles>;
43 
44 template <class _Needles, class _Haystack>
45 concept __all_contained_in = __mall_contained_in<_Needles, _Haystack>::value;
46 
47 // This concept checks if a given sender satisfies the requirements to be
48 // returned from `set_next`.
49 template <class _Sender, class _Env = empty_env>
50 concept next_sender =        //
51     sender_in<_Sender, _Env> //
52     &&
53     __all_contained_in<completion_signatures_of_t<_Sender, _Env>,
54                        completion_signatures<set_value_t(), set_stopped_t()>>;
55 
56 // This is a sequence-receiver CPO that is used to apply algorithms on an input
57 // sender and it returns a next-sender. `set_next` is usually called in a
58 // context where a sender will be connected to a receiver. Since calling
59 // `set_next` usually involves constructing senders it is allowed to throw an
60 // excpetion, which needs to be handled by a calling sequence-operation. The
61 // returned object is a sender that can complete with `set_value_t()` or
62 // `set_stopped_t()`.
63 struct set_next_t
64 {
65     template <receiver _Receiver, sender _Item>
66         requires tag_invocable<set_next_t, _Receiver&, _Item>
67     auto operator()(_Receiver& __rcvr, _Item&& __item) const
68         noexcept(nothrow_tag_invocable<set_next_t, _Receiver&, _Item>)
69             -> tag_invoke_result_t<set_next_t, _Receiver&, _Item>
70     {
71         static_assert(
72             next_sender<tag_invoke_result_t<set_next_t, _Receiver&, _Item>>,
73             "The sender returned from set_next is required to complete with set_value_t() or "
74             "set_stopped_t()");
75         return tag_invoke(*this, __rcvr, (_Item&&)__item);
76     }
77 };
78 } // namespace __sequence_sndr
79 
80 using __sequence_sndr::set_next_t;
81 inline constexpr set_next_t set_next;
82 
83 template <class _Receiver, class _Sender>
84 using next_sender_of_t = decltype(exec::set_next(
85     stdexec::__declval<stdexec::__decay_t<_Receiver>&>(),
86     stdexec::__declval<_Sender>()));
87 
88 namespace __sequence_sndr
89 {
90 
91 template <class _ReceiverId>
92 struct __stopped_means_break
93 {
94     struct __t
95     {
96         using receiver_concept = stdexec::receiver_t;
97         using __id = __stopped_means_break;
98         using _Receiver = stdexec::__t<_ReceiverId>;
99         using _Token = stop_token_of_t<env_of_t<_Receiver>>;
100         STDEXEC_ATTRIBUTE((no_unique_address)) _Receiver __rcvr_;
101 
102         template <same_as<get_env_t> _GetEnv, same_as<__t> _Self>
103         friend env_of_t<_Receiver> tag_invoke(_GetEnv,
104                                               const _Self& __self) noexcept
105         {
106             return stdexec::get_env(__self.__rcvr_);
107         }
108 
109         template <same_as<set_value_t> _SetValue, same_as<__t> _Self>
110             requires __callable<set_value_t, _Receiver&&>
111         friend void tag_invoke(_SetValue, _Self&& __self) noexcept
112         {
113             return stdexec::set_value(static_cast<_Receiver&&>(__self.__rcvr_));
114         }
115 
116         template <same_as<set_stopped_t> _SetStopped, same_as<__t> _Self>
117             requires __callable<set_value_t, _Receiver&&> &&
118                      (unstoppable_token<_Token> ||
119                       __callable<set_stopped_t, _Receiver &&>)
120         friend void tag_invoke(_SetStopped, _Self&& __self) noexcept
121         {
122             if constexpr (unstoppable_token<_Token>)
123             {
124                 stdexec::set_value(static_cast<_Receiver&&>(__self.__rcvr_));
125             }
126             else
127             {
128                 auto __token =
129                     stdexec::get_stop_token(stdexec::get_env(__self.__rcvr_));
130                 if (__token.stop_requested())
131                 {
132                     stdexec::set_stopped(
133                         static_cast<_Receiver&&>(__self.__rcvr_));
134                 }
135                 else
136                 {
137                     stdexec::set_value(
138                         static_cast<_Receiver&&>(__self.__rcvr_));
139                 }
140             }
141         }
142     };
143 };
144 
145 template <class _Rcvr>
146 using __stopped_means_break_t =
147     __t<__stopped_means_break<__id<__decay_t<_Rcvr>>>>;
148 } // namespace __sequence_sndr
149 
150 template <class _Sender>
151 concept __enable_sequence_sender =                    //
152     requires { typename _Sender::sender_concept; } && //
153     stdexec::same_as<typename _Sender::sender_concept, sequence_sender_t>;
154 
155 template <class _Sender>
156 inline constexpr bool enable_sequence_sender =
157     __enable_sequence_sender<_Sender>;
158 
// Empty type list whose elements are the item sender types of a sequence.
template <class... _Items>
struct item_types {};
162 
163 template <class _Tp>
164 concept __has_item_typedef = requires { typename _Tp::item_types; };
165 
166 /////////////////////////////////////////////////////////////////////////////
167 // [execution.sndtraits]
168 namespace __sequence_sndr
169 {
170 struct get_item_types_t;
171 template <class _Sender, class _Env>
172 using __tfx_sender =
173     transform_sender_result_t<__late_domain_of_t<_Sender, _Env>, _Sender, _Env>;
174 
175 template <class _Sender, class _Env>
176 concept __with_tag_invoke = //
177     tag_invocable<get_item_types_t, __tfx_sender<_Sender, _Env>, _Env>;
178 template <class _Sender, class _Env>
179 using __member_alias_t = //
180     typename __decay_t<__tfx_sender<_Sender, _Env>>::item_types;
181 
182 template <class _Sender, class _Env>
183 concept __with_member_alias = __mvalid<__member_alias_t, _Sender, _Env>;
184 
185 struct get_item_types_t
186 {
187     template <class _Sender, class _Env>
188     static auto __impl()
189     {
190         static_assert(sizeof(_Sender),
191                       "Incomplete type used with get_item_types");
192         static_assert(sizeof(_Env), "Incomplete type used with get_item_types");
193         using _TfxSender = __tfx_sender<_Sender, _Env>;
194         if constexpr (__with_tag_invoke<_Sender, _Env>)
195         {
196             using _Result =
197                 tag_invoke_result_t<get_item_types_t, _TfxSender, _Env>;
198             return (_Result(*)()) nullptr;
199         }
200         else if constexpr (__with_member_alias<_TfxSender, _Env>)
201         {
202             using _Result = __member_alias_t<_TfxSender, _Env>;
203             return (_Result(*)()) nullptr;
204         }
205         else if constexpr (sender_in<_TfxSender, _Env> &&
206                            !enable_sequence_sender<
207                                stdexec::__decay_t<_TfxSender>>)
208         {
209             using _Result = item_types<stdexec::__decay_t<_TfxSender>>;
210             return (_Result(*)()) nullptr;
211         }
212         else if constexpr (__is_debug_env<_Env>)
213         {
214             using __tag_invoke::tag_invoke;
215             // This ought to cause a hard error that indicates where the problem
216             // is.
217             using _Completions [[maybe_unused]] =
218                 tag_invoke_result_t<get_item_types_t,
219                                     __tfx_sender<_Sender, _Env>, _Env>;
220             return (__debug::__completion_signatures(*)()) nullptr;
221         }
222         else
223         {
224             using _Result =
225                 __mexception<_UNRECOGNIZED_SENDER_TYPE_<>,
226                              _WITH_SENDER_<_Sender>, _WITH_ENVIRONMENT_<_Env>>;
227             return (_Result(*)()) nullptr;
228         }
229     }
230 
231     template <class _Sender, class _Env = __default_env>
232     constexpr auto operator()(_Sender&&, const _Env&) const noexcept
233         -> decltype(__impl<_Sender, _Env>()())
234     {
235         return {};
236     }
237 };
238 } // namespace __sequence_sndr
239 
240 using __sequence_sndr::get_item_types_t;
241 inline constexpr get_item_types_t get_item_types{};
242 
243 template <class _Sender, class _Env>
244 using item_types_of_t = decltype(get_item_types(stdexec::__declval<_Sender>(),
245                                                 stdexec::__declval<_Env>()));
246 
247 template <class _Sender, class _Env>
248 concept sequence_sender =                //
249     stdexec::sender_in<_Sender, _Env> && //
250     enable_sequence_sender<stdexec::__decay_t<_Sender>>;
251 
252 template <class _Sender, class _Env>
253 concept has_sequence_item_types =
254     requires(_Sender&& __sndr, _Env&& __env) {
255         get_item_types((_Sender&&)__sndr, (_Env&&)__env);
256     };
257 
258 template <class _Sender, class _Env>
259 concept sequence_sender_in =                  //
260     stdexec::sender_in<_Sender, _Env> &&      //
261     has_sequence_item_types<_Sender, _Env> && //
262     sequence_sender<_Sender, _Env>;
263 
// Diagnostic tag carrying the receiver type; used as an argument of
// `stdexec::__mexception` by `__try_item`.
template <class _Rcvr>
struct _WITH_RECEIVER_ {};

// Diagnostic tag naming the item type for which no `set_next` overload was
// found; used as an argument of `stdexec::__mexception` by `__try_item`.
template <class _Sndr>
struct _MISSING_SET_NEXT_OVERLOAD_FOR_ITEM_ {};
271 
272 template <class _Receiver, class _Item>
273 auto __try_item(_Item*)
274     -> stdexec::__mexception<_MISSING_SET_NEXT_OVERLOAD_FOR_ITEM_<_Item>,
275                              _WITH_RECEIVER_<_Receiver>>;
276 
277 template <class _Receiver, class _Item>
278     requires stdexec::__callable<set_next_t, _Receiver&, _Item>
279 stdexec::__msuccess __try_item(_Item*);
280 
281 template <class _Receiver, class... _Items>
282 auto __try_items(exec::item_types<_Items...>*)
283     -> decltype((stdexec::__msuccess(), ...,
284                  exec::__try_item<_Receiver>((_Items*)nullptr)));
285 
286 template <class _Receiver, class _Items>
287 concept __sequence_receiver_of =
288     requires(_Items* __items) {
289         {
290             exec::__try_items<stdexec::__decay_t<_Receiver>>(__items)
291         } -> stdexec::__ok;
292     };
293 
294 template <class _Receiver, class _SequenceItems>
295 concept sequence_receiver_of =      //
296     stdexec::receiver<_Receiver> && //
297     __sequence_receiver_of<_Receiver, _SequenceItems>;
298 
299 template <class _Items, class _Env>
300 using __concat_item_signatures_t = stdexec::__mapply<
301     stdexec::__q<stdexec::__concat_completion_signatures_t>,
302     stdexec::__mapply<stdexec::__transform<stdexec::__mbind_back_q<
303                           stdexec::completion_signatures_of_t, _Env>>,
304                       _Items>>;
305 
306 template <class _Completions>
307 using __gather_error_signals =
308     stdexec::__only_gather_signal<stdexec::set_error_t, _Completions>;
309 
310 template <class _Completions>
311 using __gather_stopped_signals =
312     stdexec::__only_gather_signal<stdexec::set_stopped_t, _Completions>;
313 
314 template <class _Completions>
315 using __to_sequence_completions_t = stdexec::__concat_completion_signatures_t<
316     stdexec::completion_signatures<stdexec::set_value_t()>,
317     __gather_error_signals<_Completions>,
318     __gather_stopped_signals<_Completions>>;
319 
320 template <class _Sender, class _Env>
321 using __to_sequence_completion_signatures = stdexec::make_completion_signatures<
322     _Sender, _Env, stdexec::completion_signatures<stdexec::set_value_t()>,
323     stdexec::__mconst<stdexec::completion_signatures<>>::__f>;
324 
325 template <class _Sequence, class _Env>
326 using __sequence_completion_signatures_of_t =
327     stdexec::__concat_completion_signatures_t<
328         stdexec::__try_make_completion_signatures<
329             _Sequence, _Env,
330             stdexec::completion_signatures<stdexec::set_value_t()>,
331             stdexec::__mconst<stdexec::completion_signatures<>>>,
332         stdexec::__mapply<
333             stdexec::__q<stdexec::__concat_completion_signatures_t>,
334             stdexec::__mapply<stdexec::__transform<stdexec::__mbind_back_q<
335                                   __to_sequence_completion_signatures, _Env>>,
336                               item_types_of_t<_Sequence, _Env>>>>;
337 
338 template <class _Receiver, class _Sender>
339 concept sequence_receiver_from =                                              //
340     stdexec::receiver<_Receiver> &&                                           //
341     stdexec::sender_in<_Sender, stdexec::env_of_t<_Receiver>> &&              //
342     sequence_receiver_of<
343         _Receiver, item_types_of_t<_Sender, stdexec::env_of_t<_Receiver>>> && //
344     ((sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> &&
345       stdexec::receiver_of<_Receiver,
346                            stdexec::completion_signatures_of_t<
347                                _Sender, stdexec::env_of_t<_Receiver>>>) || //
348      (!sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> &&
349       stdexec::__receiver_from<
350           __sequence_sndr::__stopped_means_break_t<_Receiver>,
351           next_sender_of_t<_Receiver, _Sender>>));
352 
353 namespace __sequence_sndr
354 {
355 struct subscribe_t;
356 
357 template <class _Env>
358 using __single_sender_completion_sigs =
359     __if_c<unstoppable_token<stop_token_of_t<_Env>>,
360            completion_signatures<set_value_t()>,
361            completion_signatures<set_value_t(), set_stopped_t()>>;
362 
363 template <class _Sender, class _Receiver>
364 concept __next_connectable_with_tag_invoke =
365     receiver<_Receiver> &&                                           //
366     sender_in<_Sender, env_of_t<_Receiver>> &&                       //
367     !sequence_sender_in<_Sender, env_of_t<_Receiver>> &&             //
368     sequence_receiver_of<_Receiver,
369                          item_types<stdexec::__decay_t<_Sender>>> && //
370     __receiver_from<__stopped_means_break_t<_Receiver>,
371                     next_sender_of_t<_Receiver, _Sender>> &&         //
372     __connect::__connectable_with_tag_invoke<
373         next_sender_of_t<_Receiver, _Sender>&&,
374         __stopped_means_break_t<_Receiver>>;
375 
376 template <class _Sender, class _Receiver>
377 concept __subscribeable_with_tag_invoke =
378     receiver<_Receiver> &&                              //
379     sequence_sender_in<_Sender, env_of_t<_Receiver>> && //
380     sequence_receiver_from<_Receiver, _Sender> &&       //
381     tag_invocable<subscribe_t, _Sender, _Receiver>;
382 
383 struct subscribe_t
384 {
385     template <class _Sender, class _Receiver>
386     using __tfx_sndr = __tfx_sender<_Sender, env_of_t<_Receiver>>;
387 
388     template <class _Sender, class _Receiver>
389     static constexpr auto __select_impl() noexcept
390     {
391         using _Domain = __late_domain_of_t<_Sender, env_of_t<_Receiver&>>;
392         constexpr bool _NothrowTfxSender =
393             __nothrow_callable<get_env_t, _Receiver&> &&
394             __nothrow_callable<transform_sender_t, _Domain, _Sender,
395                                env_of_t<_Receiver&>>;
396         using _TfxSender = __tfx_sndr<_Sender, _Receiver>;
397         if constexpr (__next_connectable_with_tag_invoke<_TfxSender, _Receiver>)
398         {
399             using _Result =
400                 tag_invoke_result_t<connect_t,
401                                     next_sender_of_t<_Receiver, _TfxSender>,
402                                     __stopped_means_break_t<_Receiver>>;
403             constexpr bool _Nothrow =
404                 nothrow_tag_invocable<connect_t,
405                                       next_sender_of_t<_Receiver, _TfxSender>,
406                                       __stopped_means_break_t<_Receiver>>;
407             return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr);
408         }
409         else if constexpr (__subscribeable_with_tag_invoke<_TfxSender,
410                                                            _Receiver>)
411         {
412             using _Result =
413                 tag_invoke_result_t<subscribe_t, _TfxSender, _Receiver>;
414             constexpr bool _Nothrow = //
415                 _NothrowTfxSender &&
416                 nothrow_tag_invocable<subscribe_t, _TfxSender, _Receiver>;
417             return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr);
418         }
419         else
420         {
421             return static_cast<__debug::__debug_operation (*)() noexcept>(
422                 nullptr);
423         }
424     }
425 
426     template <class _Sender, class _Receiver>
427     using __select_impl_t = decltype(__select_impl<_Sender, _Receiver>());
428 
429     template <sender _Sender, receiver _Receiver>
430         requires __next_connectable_with_tag_invoke<
431                      __tfx_sndr<_Sender, _Receiver>, _Receiver> ||
432                  __subscribeable_with_tag_invoke<__tfx_sndr<_Sender, _Receiver>,
433                                                  _Receiver> ||
434                  __is_debug_env<env_of_t<_Receiver>>
435     auto operator()(_Sender&& __sndr, _Receiver&& __rcvr) const
436         noexcept(__nothrow_callable<__select_impl_t<_Sender, _Receiver>>)
437             -> __call_result_t<__select_impl_t<_Sender, _Receiver>>
438     {
439         using _TfxSender = __tfx_sndr<_Sender, _Receiver>;
440         auto&& __env = get_env(__rcvr);
441         auto __domain = __get_late_domain(__sndr, __env);
442         if constexpr (__next_connectable_with_tag_invoke<_TfxSender, _Receiver>)
443         {
444             static_assert(
445                 operation_state<tag_invoke_result_t<
446                     connect_t, next_sender_of_t<_Receiver, _TfxSender>,
447                     __stopped_means_break_t<_Receiver>>>,
448                 "stdexec::connect(sender, receiver) must return a type that "
449                 "satisfies the operation_state concept");
450             next_sender_of_t<_Receiver, _TfxSender> __next = set_next(
451                 __rcvr, transform_sender(__domain, (_Sender&&)__sndr, __env));
452             return tag_invoke(
453                 connect_t{},
454                 static_cast<next_sender_of_t<_Receiver, _TfxSender>&&>(__next),
455                 __stopped_means_break_t<_Receiver>{(_Receiver&&)__rcvr});
456         }
457         else if constexpr (__subscribeable_with_tag_invoke<_TfxSender,
458                                                            _Receiver>)
459         {
460             static_assert(
461                 operation_state<
462                     tag_invoke_result_t<subscribe_t, _TfxSender, _Receiver>>,
463                 "exec::subscribe(sender, receiver) must return a type that "
464                 "satisfies the operation_state concept");
465             return tag_invoke(
466                 subscribe_t{},
467                 transform_sender(__domain, (_Sender&&)__sndr, __env),
468                 (_Receiver&&)__rcvr);
469         }
470         else if constexpr (enable_sequence_sender<
471                                stdexec::__decay_t<_TfxSender>>)
472         {
473             // This should generate an instantiate backtrace that contains
474             // useful debugging information.
475             using __tag_invoke::tag_invoke;
476             tag_invoke(*this,
477                        transform_sender(__domain, (_Sender&&)__sndr, __env),
478                        (_Receiver&&)__rcvr);
479         }
480         else
481         {
482             next_sender_of_t<_Receiver, _TfxSender> __next = set_next(
483                 __rcvr, transform_sender(__domain, (_Sender&&)__sndr, __env));
484             return tag_invoke(
485                 connect_t{},
486                 static_cast<next_sender_of_t<_Receiver, _TfxSender>&&>(__next),
487                 __stopped_means_break_t<_Receiver>{(_Receiver&&)__rcvr});
488         }
489     }
490 
491     friend constexpr bool tag_invoke(forwarding_query_t, subscribe_t) noexcept
492     {
493         return false;
494     }
495 };
496 
497 template <class _Sender, class _Receiver>
498 using subscribe_result_t = __call_result_t<subscribe_t, _Sender, _Receiver>;
499 } // namespace __sequence_sndr
500 
501 using __sequence_sndr::__single_sender_completion_sigs;
502 
503 using __sequence_sndr::subscribe_t;
504 inline constexpr subscribe_t subscribe;
505 
506 using __sequence_sndr::subscribe_result_t;
507 
508 template <class _Sender, class _Receiver>
509 concept sequence_sender_to =
510     sequence_receiver_from<_Receiver, _Sender> && //
511     requires(_Sender&& __sndr, _Receiver&& __rcvr) {
512         {
513             subscribe((_Sender&&)__sndr, (_Receiver&&)__rcvr)
514         };
515     };
516 
517 template <class _Receiver>
518 concept __stoppable_receiver =                              //
519     stdexec::__callable<stdexec::set_value_t, _Receiver> && //
520     (stdexec::unstoppable_token<
521          stdexec::stop_token_of_t<stdexec::env_of_t<_Receiver>>> ||
522      stdexec::__callable<stdexec::set_stopped_t, _Receiver>);
523 
524 template <class _Receiver>
525     requires __stoppable_receiver<_Receiver>
526 void __set_value_unless_stopped(_Receiver&& __rcvr)
527 {
528     using token_type = stdexec::stop_token_of_t<stdexec::env_of_t<_Receiver>>;
529     if constexpr (stdexec::unstoppable_token<token_type>)
530     {
531         stdexec::set_value(static_cast<_Receiver&&>(__rcvr));
532     }
533     else
534     {
535         auto token = stdexec::get_stop_token(stdexec::get_env(__rcvr));
536         if (!token.stop_requested())
537         {
538             stdexec::set_value(static_cast<_Receiver&&>(__rcvr));
539         }
540         else
541         {
542             stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr));
543         }
544     }
545 }
546 } // namespace exec
547