xref: /openbmc/sdbusplus/include/sdbusplus/async/stdexec/sequence_senders.hpp (revision 5f1c0bd59873c0326edce2a0a933a6091deb97fc)
1 /*
2  * Copyright (c) 2023 Maikel Nadolski
3  * Copyright (c) 2023 NVIDIA Corporation
4  *
5  * Licensed under the Apache License Version 2.0 with LLVM Exceptions
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  *   https://llvm.org/LICENSE.txt
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 #pragma once
18 
19 #include "../stdexec/execution.hpp"
20 
21 namespace exec
22 {
// Tag type. A sender opts in to being a sequence sender by declaring
// `using is_sender = exec::sequence_tag;` (this is what
// `__enable_sequence_sender` checks for).
struct sequence_tag {};
25 
26 namespace __sequence_sndr
27 {
28 using namespace stdexec;
29 
30 template <class _Haystack>
31 struct __mall_contained_in_impl
32 {
33     template <class... _Needles>
34     using __f = __mand<__mapply<__contains<_Needles>, _Haystack>...>;
35 };
36 
37 template <class _Needles, class _Haystack>
38 using __mall_contained_in =
39     __mapply<__mall_contained_in_impl<_Haystack>, _Needles>;
40 
41 template <class _Needles, class _Haystack>
42 concept __all_contained_in = __mall_contained_in<_Needles, _Haystack>::value;
43 
44 // This concept checks if a given sender satisfies the requirements to be
45 // returned from `set_next`.
46 template <class _Sender, class _Env = empty_env>
47 concept next_sender =        //
48     sender_in<_Sender, _Env> //
49     &&
50     __all_contained_in<completion_signatures_of_t<_Sender, _Env>,
51                        completion_signatures<set_value_t(), set_stopped_t()>>;
52 
53 // This is a sequence-receiver CPO that is used to apply algorithms on an input
54 // sender and it returns a next-sender. `set_next` is usually called in a
55 // context where a sender will be connected to a receiver. Since calling
56 // `set_next` usually involves constructing senders it is allowed to throw an
57 // excpetion, which needs to be handled by a calling sequence-operation. The
58 // returned object is a sender that can complete with `set_value_t()` or
59 // `set_stopped_t()`.
60 struct set_next_t
61 {
62     template <receiver _Receiver, sender _Item>
63         requires tag_invocable<set_next_t, _Receiver&, _Item>
64     auto operator()(_Receiver& __rcvr, _Item&& __item) const
65         noexcept(nothrow_tag_invocable<set_next_t, _Receiver&, _Item>)
66             -> tag_invoke_result_t<set_next_t, _Receiver&, _Item>
67     {
68         static_assert(
69             next_sender<tag_invoke_result_t<set_next_t, _Receiver&, _Item>>,
70             "The sender returned from set_next is required to complete with set_value_t() or "
71             "set_stopped_t()");
72         return tag_invoke(*this, __rcvr, (_Item&&)__item);
73     }
74 };
75 } // namespace __sequence_sndr
76 
77 using __sequence_sndr::set_next_t;
78 inline constexpr set_next_t set_next;
79 
80 template <class _Receiver, class _Sender>
81 using next_sender_of_t = decltype(exec::set_next(
82     stdexec::__declval<stdexec::__decay_t<_Receiver>&>(),
83     stdexec::__declval<_Sender>()));
84 
85 namespace __sequence_sndr
86 {
87 
88 template <class _ReceiverId>
89 struct __stopped_means_break
90 {
91     struct __t
92     {
93         using is_receiver = void;
94         using __id = __stopped_means_break;
95         using _Receiver = stdexec::__t<_ReceiverId>;
96         using _Token = stop_token_of_t<env_of_t<_Receiver>>;
97         STDEXEC_ATTRIBUTE((no_unique_address)) _Receiver __rcvr_;
98 
99         template <same_as<get_env_t> _GetEnv, same_as<__t> _Self>
100         friend env_of_t<_Receiver> tag_invoke(_GetEnv,
101                                               const _Self& __self) noexcept
102         {
103             return stdexec::get_env(__self.__rcvr_);
104         }
105 
106         template <same_as<set_value_t> _SetValue, same_as<__t> _Self>
107             requires __callable<set_value_t, _Receiver&&>
108         friend void tag_invoke(_SetValue, _Self&& __self) noexcept
109         {
110             return stdexec::set_value(static_cast<_Receiver&&>(__self.__rcvr_));
111         }
112 
113         template <same_as<set_stopped_t> _SetStopped, same_as<__t> _Self>
114             requires __callable<set_value_t, _Receiver&&> &&
115                      (unstoppable_token<_Token> ||
116                       __callable<set_stopped_t, _Receiver &&>)
117         friend void tag_invoke(_SetStopped, _Self&& __self) noexcept
118         {
119             if constexpr (unstoppable_token<_Token>)
120             {
121                 stdexec::set_value(static_cast<_Receiver&&>(__self.__rcvr_));
122             }
123             else
124             {
125                 auto __token =
126                     stdexec::get_stop_token(stdexec::get_env(__self.__rcvr_));
127                 if (__token.stop_requested())
128                 {
129                     stdexec::set_stopped(
130                         static_cast<_Receiver&&>(__self.__rcvr_));
131                 }
132                 else
133                 {
134                     stdexec::set_value(
135                         static_cast<_Receiver&&>(__self.__rcvr_));
136                 }
137             }
138         }
139     };
140 };
141 
142 template <class _Rcvr>
143 using __stopped_means_break_t =
144     __t<__stopped_means_break<__id<__decay_t<_Rcvr>>>>;
145 } // namespace __sequence_sndr
146 
147 template <class _Sender>
148 concept __enable_sequence_sender =               //
149     requires { typename _Sender::is_sender; } && //
150     stdexec::same_as<typename _Sender::is_sender, sequence_tag>;
151 
152 template <class _Sender>
153 inline constexpr bool enable_sequence_sender =
154     __enable_sequence_sender<_Sender>;
155 
// Empty type list that carries the item sender types of a sequence.
template <class... _Senders>
struct item_types {};
159 
160 template <class _Tp>
161 concept __has_item_typedef = requires { typename _Tp::item_types; };
162 
163 /////////////////////////////////////////////////////////////////////////////
164 // [execution.sndtraits]
165 namespace __sequence_sndr
166 {
167 struct get_item_types_t;
168 template <class _Sender, class _Env>
169 using __tfx_sender =
170     transform_sender_result_t<__late_domain_of_t<_Sender, _Env>, _Sender, _Env>;
171 
172 template <class _Sender, class _Env>
173 concept __with_tag_invoke = //
174     tag_invocable<get_item_types_t, __tfx_sender<_Sender, _Env>, _Env>;
175 template <class _Sender, class _Env>
176 using __member_alias_t = //
177     typename __decay_t<__tfx_sender<_Sender, _Env>>::item_types;
178 
179 template <class _Sender, class _Env>
180 concept __with_member_alias = __mvalid<__member_alias_t, _Sender, _Env>;
181 
182 struct get_item_types_t
183 {
184     template <class _Sender, class _Env>
185     static auto __impl()
186     {
187         static_assert(sizeof(_Sender),
188                       "Incomplete type used with get_item_types");
189         static_assert(sizeof(_Env), "Incomplete type used with get_item_types");
190         using _TfxSender = __tfx_sender<_Sender, _Env>;
191         if constexpr (__with_tag_invoke<_Sender, _Env>)
192         {
193             using _Result =
194                 tag_invoke_result_t<get_item_types_t, _TfxSender, _Env>;
195             return (_Result(*)()) nullptr;
196         }
197         else if constexpr (__with_member_alias<_TfxSender, _Env>)
198         {
199             using _Result = __member_alias_t<_TfxSender, _Env>;
200             return (_Result(*)()) nullptr;
201         }
202         else if constexpr (sender_in<_TfxSender, _Env> &&
203                            !enable_sequence_sender<
204                                stdexec::__decay_t<_TfxSender>>)
205         {
206             using _Result = item_types<stdexec::__decay_t<_TfxSender>>;
207             return (_Result(*)()) nullptr;
208         }
209         else if constexpr (__is_debug_env<_Env>)
210         {
211             using __tag_invoke::tag_invoke;
212             // This ought to cause a hard error that indicates where the problem
213             // is.
214             using _Completions [[maybe_unused]] =
215                 tag_invoke_result_t<get_item_types_t,
216                                     __tfx_sender<_Sender, _Env>, _Env>;
217             return (__debug::__completion_signatures(*)()) nullptr;
218         }
219         else
220         {
221             using _Result =
222                 __mexception<_UNRECOGNIZED_SENDER_TYPE_<>,
223                              _WITH_SENDER_<_Sender>, _WITH_ENVIRONMENT_<_Env>>;
224             return (_Result(*)()) nullptr;
225         }
226     }
227 
228     template <class _Sender, class _Env = __default_env>
229     constexpr auto operator()(_Sender&&, const _Env&) const noexcept
230         -> decltype(__impl<_Sender, _Env>()())
231     {
232         return {};
233     }
234 };
235 } // namespace __sequence_sndr
236 
237 using __sequence_sndr::get_item_types_t;
238 inline constexpr get_item_types_t get_item_types{};
239 
240 template <class _Sender, class _Env>
241 using item_types_of_t = decltype(get_item_types(stdexec::__declval<_Sender>(),
242                                                 stdexec::__declval<_Env>()));
243 
244 template <class _Sender, class _Env>
245 concept sequence_sender =                //
246     stdexec::sender_in<_Sender, _Env> && //
247     enable_sequence_sender<stdexec::__decay_t<_Sender>>;
248 
249 template <class _Sender, class _Env>
250 concept has_sequence_item_types =
251     requires(_Sender&& __sndr, _Env&& __env) {
252         get_item_types((_Sender&&)__sndr, (_Env&&)__env);
253     };
254 
255 template <class _Sender, class _Env>
256 concept sequence_sender_in =                  //
257     stdexec::sender_in<_Sender, _Env> &&      //
258     has_sequence_item_types<_Sender, _Env> && //
259     sequence_sender<_Sender, _Env>;
260 
// Diagnostic tag: names the receiver type inside an `__mexception`.
template <class _Receiver>
struct _WITH_RECEIVER_ {};
264 
// Diagnostic tag: names the item type for which the receiver offers no
// `set_next` overload.
template <class _Item>
struct _MISSING_SET_NEXT_OVERLOAD_FOR_ITEM_ {};
268 
269 template <class _Receiver, class _Item>
270 auto __try_item(_Item*)
271     -> stdexec::__mexception<_MISSING_SET_NEXT_OVERLOAD_FOR_ITEM_<_Item>,
272                              _WITH_RECEIVER_<_Receiver>>;
273 
274 template <class _Receiver, class _Item>
275     requires stdexec::__callable<set_next_t, _Receiver&, _Item>
276 stdexec::__msuccess __try_item(_Item*);
277 
278 template <class _Receiver, class... _Items>
279 auto __try_items(exec::item_types<_Items...>*)
280     -> decltype((stdexec::__msuccess(), ...,
281                  exec::__try_item<_Receiver>((_Items*)nullptr)));
282 
283 template <class _Receiver, class _Items>
284 concept __sequence_receiver_of =
285     requires(_Items* __items) {
286         {
287             exec::__try_items<stdexec::__decay_t<_Receiver>>(__items)
288         } -> stdexec::__ok;
289     };
290 
291 template <class _Receiver, class _SequenceItems>
292 concept sequence_receiver_of =      //
293     stdexec::receiver<_Receiver> && //
294     __sequence_receiver_of<_Receiver, _SequenceItems>;
295 
296 template <class _Items, class _Env>
297 using __concat_item_signatures_t = stdexec::__mapply<
298     stdexec::__q<stdexec::__concat_completion_signatures_t>,
299     stdexec::__mapply<stdexec::__transform<stdexec::__mbind_back_q<
300                           stdexec::completion_signatures_of_t, _Env>>,
301                       _Items>>;
302 
303 template <class _Completions>
304 using __gather_error_signals =
305     stdexec::__only_gather_signal<stdexec::set_error_t, _Completions>;
306 
307 template <class _Completions>
308 using __gather_stopped_signals =
309     stdexec::__only_gather_signal<stdexec::set_stopped_t, _Completions>;
310 
311 template <class _Completions>
312 using __to_sequence_completions_t = stdexec::__concat_completion_signatures_t<
313     stdexec::completion_signatures<stdexec::set_value_t()>,
314     __gather_error_signals<_Completions>,
315     __gather_stopped_signals<_Completions>>;
316 
317 template <class _Sender, class _Env>
318 using __to_sequence_completion_signatures = stdexec::make_completion_signatures<
319     _Sender, _Env, stdexec::completion_signatures<stdexec::set_value_t()>,
320     stdexec::__mconst<stdexec::completion_signatures<>>::__f>;
321 
322 template <class _Sequence, class _Env>
323 using __sequence_completion_signatures_of_t =
324     stdexec::__concat_completion_signatures_t<
325         stdexec::__try_make_completion_signatures<
326             _Sequence, _Env,
327             stdexec::completion_signatures<stdexec::set_value_t()>,
328             stdexec::__mconst<stdexec::completion_signatures<>>>,
329         stdexec::__mapply<
330             stdexec::__q<stdexec::__concat_completion_signatures_t>,
331             stdexec::__mapply<stdexec::__transform<stdexec::__mbind_back_q<
332                                   __to_sequence_completion_signatures, _Env>>,
333                               item_types_of_t<_Sequence, _Env>>>>;
334 
335 template <class _Receiver, class _Sender>
336 concept sequence_receiver_from =                                              //
337     stdexec::receiver<_Receiver> &&                                           //
338     stdexec::sender_in<_Sender, stdexec::env_of_t<_Receiver>> &&              //
339     sequence_receiver_of<
340         _Receiver, item_types_of_t<_Sender, stdexec::env_of_t<_Receiver>>> && //
341     ((sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> &&
342       stdexec::receiver_of<_Receiver,
343                            stdexec::completion_signatures_of_t<
344                                _Sender, stdexec::env_of_t<_Receiver>>>) || //
345      (!sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> &&
346       stdexec::__receiver_from<
347           __sequence_sndr::__stopped_means_break_t<_Receiver>,
348           next_sender_of_t<_Receiver, _Sender>>));
349 
350 namespace __sequence_sndr
351 {
352 struct subscribe_t;
353 
354 template <class _Env>
355 using __single_sender_completion_sigs =
356     __if_c<unstoppable_token<stop_token_of_t<_Env>>,
357            completion_signatures<set_value_t()>,
358            completion_signatures<set_value_t(), set_stopped_t()>>;
359 
360 template <class _Sender, class _Receiver>
361 concept __next_connectable_with_tag_invoke =
362     receiver<_Receiver> &&                                           //
363     sender_in<_Sender, env_of_t<_Receiver>> &&                       //
364     !sequence_sender_in<_Sender, env_of_t<_Receiver>> &&             //
365     sequence_receiver_of<_Receiver,
366                          item_types<stdexec::__decay_t<_Sender>>> && //
367     __receiver_from<__stopped_means_break_t<_Receiver>,
368                     next_sender_of_t<_Receiver, _Sender>> &&         //
369     __connect::__connectable_with_tag_invoke<
370         next_sender_of_t<_Receiver, _Sender>&&,
371         __stopped_means_break_t<_Receiver>>;
372 
373 template <class _Sender, class _Receiver>
374 concept __subscribeable_with_tag_invoke =
375     receiver<_Receiver> &&                              //
376     sequence_sender_in<_Sender, env_of_t<_Receiver>> && //
377     sequence_receiver_from<_Receiver, _Sender> &&       //
378     tag_invocable<subscribe_t, _Sender, _Receiver>;
379 
380 struct subscribe_t
381 {
382     template <class _Sender, class _Receiver>
383     using __tfx_sndr = __tfx_sender<_Sender, env_of_t<_Receiver>>;
384 
385     template <class _Sender, class _Receiver>
386     static constexpr auto __select_impl() noexcept
387     {
388         using _Domain = __late_domain_of_t<_Sender, env_of_t<_Receiver&>>;
389         constexpr bool _NothrowTfxSender =
390             __nothrow_callable<get_env_t, _Receiver&> &&
391             __nothrow_callable<transform_sender_t, _Domain, _Sender,
392                                env_of_t<_Receiver&>>;
393         using _TfxSender = __tfx_sndr<_Sender, _Receiver>;
394         if constexpr (__next_connectable_with_tag_invoke<_TfxSender, _Receiver>)
395         {
396             using _Result =
397                 tag_invoke_result_t<connect_t,
398                                     next_sender_of_t<_Receiver, _TfxSender>,
399                                     __stopped_means_break_t<_Receiver>>;
400             constexpr bool _Nothrow =
401                 nothrow_tag_invocable<connect_t,
402                                       next_sender_of_t<_Receiver, _TfxSender>,
403                                       __stopped_means_break_t<_Receiver>>;
404             return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr);
405         }
406         else if constexpr (__subscribeable_with_tag_invoke<_TfxSender,
407                                                            _Receiver>)
408         {
409             using _Result =
410                 tag_invoke_result_t<subscribe_t, _TfxSender, _Receiver>;
411             constexpr bool _Nothrow = //
412                 _NothrowTfxSender &&
413                 nothrow_tag_invocable<subscribe_t, _TfxSender, _Receiver>;
414             return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr);
415         }
416         else
417         {
418             return static_cast<__debug::__debug_operation (*)() noexcept>(
419                 nullptr);
420         }
421     }
422 
423     template <class _Sender, class _Receiver>
424     using __select_impl_t = decltype(__select_impl<_Sender, _Receiver>());
425 
426     template <sender _Sender, receiver _Receiver>
427         requires __next_connectable_with_tag_invoke<
428                      __tfx_sndr<_Sender, _Receiver>, _Receiver> ||
429                  __subscribeable_with_tag_invoke<__tfx_sndr<_Sender, _Receiver>,
430                                                  _Receiver> ||
431                  __is_debug_env<env_of_t<_Receiver>>
432     auto operator()(_Sender&& __sndr, _Receiver&& __rcvr) const
433         noexcept(__nothrow_callable<__select_impl_t<_Sender, _Receiver>>)
434             -> __call_result_t<__select_impl_t<_Sender, _Receiver>>
435     {
436         using _TfxSender = __tfx_sndr<_Sender, _Receiver>;
437         auto&& __env = get_env(__rcvr);
438         auto __domain = __get_late_domain(__sndr, __env);
439         if constexpr (__next_connectable_with_tag_invoke<_TfxSender, _Receiver>)
440         {
441             static_assert(
442                 operation_state<tag_invoke_result_t<
443                     connect_t, next_sender_of_t<_Receiver, _TfxSender>,
444                     __stopped_means_break_t<_Receiver>>>,
445                 "stdexec::connect(sender, receiver) must return a type that "
446                 "satisfies the operation_state concept");
447             next_sender_of_t<_Receiver, _TfxSender> __next = set_next(
448                 __rcvr, transform_sender(__domain, (_Sender&&)__sndr, __env));
449             return tag_invoke(
450                 connect_t{},
451                 static_cast<next_sender_of_t<_Receiver, _TfxSender>&&>(__next),
452                 __stopped_means_break_t<_Receiver>{(_Receiver&&)__rcvr});
453         }
454         else if constexpr (__subscribeable_with_tag_invoke<_TfxSender,
455                                                            _Receiver>)
456         {
457             static_assert(
458                 operation_state<
459                     tag_invoke_result_t<subscribe_t, _TfxSender, _Receiver>>,
460                 "exec::subscribe(sender, receiver) must return a type that "
461                 "satisfies the operation_state concept");
462             return tag_invoke(
463                 subscribe_t{},
464                 transform_sender(__domain, (_Sender&&)__sndr, __env),
465                 (_Receiver&&)__rcvr);
466         }
467         else if constexpr (enable_sequence_sender<
468                                stdexec::__decay_t<_TfxSender>>)
469         {
470             // This should generate an instantiate backtrace that contains
471             // useful debugging information.
472             using __tag_invoke::tag_invoke;
473             tag_invoke(*this,
474                        transform_sender(__domain, (_Sender&&)__sndr, __env),
475                        (_Receiver&&)__rcvr);
476         }
477         else
478         {
479             next_sender_of_t<_Receiver, _TfxSender> __next = set_next(
480                 __rcvr, transform_sender(__domain, (_Sender&&)__sndr, __env));
481             return tag_invoke(
482                 connect_t{},
483                 static_cast<next_sender_of_t<_Receiver, _TfxSender>&&>(__next),
484                 __stopped_means_break_t<_Receiver>{(_Receiver&&)__rcvr});
485         }
486     }
487 
488     friend constexpr bool tag_invoke(forwarding_query_t, subscribe_t) noexcept
489     {
490         return false;
491     }
492 };
493 
494 template <class _Sender, class _Receiver>
495 using subscribe_result_t = __call_result_t<subscribe_t, _Sender, _Receiver>;
496 } // namespace __sequence_sndr
497 
498 using __sequence_sndr::__single_sender_completion_sigs;
499 
500 using __sequence_sndr::subscribe_t;
501 inline constexpr subscribe_t subscribe;
502 
503 using __sequence_sndr::subscribe_result_t;
504 
505 template <class _Sender, class _Receiver>
506 concept sequence_sender_to =
507     sequence_receiver_from<_Receiver, _Sender> && //
508     requires(_Sender&& __sndr, _Receiver&& __rcvr) {
509         {
510             subscribe((_Sender&&)__sndr, (_Receiver&&)__rcvr)
511         };
512     };
513 
514 template <class _Receiver>
515 concept __stoppable_receiver =                              //
516     stdexec::__callable<stdexec::set_value_t, _Receiver> && //
517     (stdexec::unstoppable_token<
518          stdexec::stop_token_of_t<stdexec::env_of_t<_Receiver>>> ||
519      stdexec::__callable<stdexec::set_stopped_t, _Receiver>);
520 
521 template <class _Receiver>
522     requires __stoppable_receiver<_Receiver>
523 void __set_value_unless_stopped(_Receiver&& __rcvr)
524 {
525     using token_type = stdexec::stop_token_of_t<stdexec::env_of_t<_Receiver>>;
526     if constexpr (stdexec::unstoppable_token<token_type>)
527     {
528         stdexec::set_value(static_cast<_Receiver&&>(__rcvr));
529     }
530     else
531     {
532         auto token = stdexec::get_stop_token(stdexec::get_env(__rcvr));
533         if (!token.stop_requested())
534         {
535             stdexec::set_value(static_cast<_Receiver&&>(__rcvr));
536         }
537         else
538         {
539             stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr));
540         }
541     }
542 }
543 } // namespace exec
544