1 /*
2  * Copyright (c) 2021-2024 NVIDIA Corporation
3  *
4  * Licensed under the Apache License Version 2.0 with LLVM Exceptions
5  * (the "License"); you may not use this file except in compliance with
6  * the License. You may obtain a copy of the License at
7  *
8  *   https://llvm.org/LICENSE.txt
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include "__concepts.hpp"
19 #include "__env.hpp"
20 #include "__execution_fwd.hpp"
21 #include "__meta.hpp"
22 #include "__receivers.hpp"
23 #include "__senders.hpp"
24 #include "__type_traits.hpp"
25 
26 #include <memory>
27 
28 namespace stdexec
29 {
namespace
{
// __ref(obj) wraps an lvalue in a small nullary callable. The callable stores
// only the address of obj (obtained with unary &) and, when invoked, yields
// that object again by dereferencing the stored address. The callable does
// not own obj, so obj must outlive every use of the callable.
inline constexpr auto __ref = []<class _Ty>(_Ty& __obj) noexcept {
    auto __addr = &__obj;
    return [__addr]() noexcept -> decltype(auto) { return (*__addr); };
};
} // namespace
36 
// __ref_t<_Ty> names the type of the callable that __ref produces when it is
// applied to an lvalue of type _Ty.
template <class _Ty>
using __ref_t = decltype(__ref(__declval<_Ty&>()));
39 
40 /////////////////////////////////////////////////////////////////////////////
41 // NOT TO SPEC: __submit
42 namespace __submit_
43 {
44 template <class _OpRef>
45 struct __receiver
46 {
47     using receiver_concept = receiver_t;
48     using __t = __receiver;
49     using __id = __receiver;
50 
51     using _Operation = __decay_t<__call_result_t<_OpRef>>;
52     using _Receiver = stdexec::__t<__mapply<__q<__msecond>, _Operation>>;
53 
54     _OpRef __opref_;
55 
__delete_opstdexec::__submit_::__receiver56     void __delete_op() noexcept
57     {
58         _Operation* __op = &__opref_();
59         if constexpr (__callable<get_allocator_t, env_of_t<_Receiver>>)
60         {
61             auto&& __env = stdexec::get_env(__op->__rcvr_);
62             auto __alloc = stdexec::get_allocator(__env);
63             using _Alloc = decltype(__alloc);
64             using _OpAlloc = typename std::allocator_traits<
65                 _Alloc>::template rebind_alloc<_Operation>;
66             _OpAlloc __op_alloc{__alloc};
67             std::allocator_traits<_OpAlloc>::destroy(__op_alloc, __op);
68             std::allocator_traits<_OpAlloc>::deallocate(__op_alloc, __op, 1);
69         }
70         else
71         {
72             delete __op;
73         }
74     }
75 
76     // Forward all the receiver ops, and delete the operation state.
77     template <class... _As>
set_valuestdexec::__submit_::__receiver78     void set_value(_As&&... __as) noexcept
79     {
80         stdexec::set_value(static_cast<_Receiver&&>(__opref_().__rcvr_),
81                            static_cast<_As&&>(__as)...);
82         __delete_op();
83     }
84 
85     template <class _Error>
set_errorstdexec::__submit_::__receiver86     void set_error(_Error&& __err) noexcept
87     {
88         stdexec::set_error(static_cast<_Receiver&&>(__opref_().__rcvr_),
89                            static_cast<_Error&&>(__err));
90         __delete_op();
91     }
92 
set_stoppedstdexec::__submit_::__receiver93     void set_stopped() noexcept
94     {
95         stdexec::set_stopped(__opref_().__rcvr_);
96         __delete_op();
97     }
98 
99     // Forward all receiever queries.
get_envstdexec::__submit_::__receiver100     auto get_env() const noexcept -> env_of_t<_Receiver&>
101     {
102         return stdexec::get_env(__opref_().__rcvr_);
103     }
104 };
105 
106 template <class _SenderId, class _ReceiverId>
107 struct __operation
108 {
109     using _Sender = stdexec::__t<_SenderId>;
110     using _Receiver = stdexec::__t<_ReceiverId>;
111     using __receiver_t = __receiver<__ref_t<__operation>>;
112 
113     STDEXEC_ATTRIBUTE((no_unique_address))
114     _Receiver __rcvr_;
115     connect_result_t<_Sender, __receiver_t> __op_state_;
116 
__operationstdexec::__submit_::__operation117     __operation(_Sender&& __sndr, _Receiver __rcvr) :
118         __rcvr_(static_cast<_Receiver&&>(__rcvr)),
119         __op_state_(
120             connect(static_cast<_Sender&&>(__sndr), __receiver_t{__ref(*this)}))
121     {}
122 };
123 
124 struct __submit_t
125 {
126     template <receiver _Receiver, sender_to<_Receiver> _Sender>
operator ()stdexec::__submit_::__submit_t127     void operator()(_Sender&& __sndr, _Receiver __rcvr) const noexcept(false)
128     {
129         if constexpr (__callable<get_allocator_t, env_of_t<_Receiver>>)
130         {
131             auto&& __env = get_env(__rcvr);
132             auto __alloc = get_allocator(__env);
133             using _Alloc = decltype(__alloc);
134             using _Op = __operation<__id<_Sender>, __id<_Receiver>>;
135             using _OpAlloc = typename std::allocator_traits<
136                 _Alloc>::template rebind_alloc<_Op>;
137             _OpAlloc __op_alloc{__alloc};
138             auto __op =
139                 std::allocator_traits<_OpAlloc>::allocate(__op_alloc, 1);
140             try
141             {
142                 std::allocator_traits<_OpAlloc>::construct(
143                     __op_alloc, __op, static_cast<_Sender&&>(__sndr),
144                     static_cast<_Receiver&&>(__rcvr));
145                 stdexec::start(__op->__op_state_);
146             }
147             catch (...)
148             {
149                 std::allocator_traits<_OpAlloc>::deallocate(__op_alloc, __op,
150                                                             1);
151                 throw;
152             }
153         }
154         else
155         {
156             start((new __operation<__id<_Sender>, __id<_Receiver>>{
157                        static_cast<_Sender&&>(__sndr),
158                        static_cast<_Receiver&&>(__rcvr)})
159                       ->__op_state_);
160         }
161     }
162 };
163 } // namespace __submit_
164 
// Bring the function-object type into namespace stdexec and define the
// __submit instance, which is invoked as __submit(sender, receiver).
using __submit_::__submit_t;
inline constexpr __submit_t __submit{};
167 } // namespace stdexec
168