1 /* 2 * Copyright (c) 2021-2024 NVIDIA Corporation 3 * 4 * Licensed under the Apache License Version 2.0 with LLVM Exceptions 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * https://llvm.org/LICENSE.txt 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #pragma once 17 18 #include "__concepts.hpp" 19 #include "__env.hpp" 20 #include "__execution_fwd.hpp" 21 #include "__meta.hpp" 22 #include "__receivers.hpp" 23 #include "__senders.hpp" 24 #include "__type_traits.hpp" 25 26 #include <memory> 27 28 namespace stdexec 29 { 30 namespace 31 { 32 inline constexpr auto __ref = []<class _Ty>(_Ty& __ty) noexcept { 33 return [__ty = &__ty]() noexcept -> decltype(auto) { return (*__ty); }; 34 }; 35 } // namespace 36 37 template <class _Ty> 38 using __ref_t = decltype(__ref(__declval<_Ty&>())); 39 40 ///////////////////////////////////////////////////////////////////////////// 41 // NOT TO SPEC: __submit 42 namespace __submit_ 43 { 44 template <class _OpRef> 45 struct __receiver 46 { 47 using receiver_concept = receiver_t; 48 using __t = __receiver; 49 using __id = __receiver; 50 51 using _Operation = __decay_t<__call_result_t<_OpRef>>; 52 using _Receiver = stdexec::__t<__mapply<__q<__msecond>, _Operation>>; 53 54 _OpRef __opref_; 55 __delete_opstdexec::__submit_::__receiver56 void __delete_op() noexcept 57 { 58 _Operation* __op = &__opref_(); 59 if constexpr (__callable<get_allocator_t, env_of_t<_Receiver>>) 60 { 61 auto&& __env = stdexec::get_env(__op->__rcvr_); 62 auto __alloc = stdexec::get_allocator(__env); 63 using _Alloc = decltype(__alloc); 64 using _OpAlloc = typename std::allocator_traits< 65 _Alloc>::template rebind_alloc<_Operation>; 66 _OpAlloc __op_alloc{__alloc}; 67 std::allocator_traits<_OpAlloc>::destroy(__op_alloc, __op); 68 std::allocator_traits<_OpAlloc>::deallocate(__op_alloc, __op, 1); 69 } 70 else 71 { 72 delete __op; 73 } 74 } 75 76 // Forward all the receiver ops, and delete the operation state. 77 template <class... _As> set_valuestdexec::__submit_::__receiver78 void set_value(_As&&... __as) noexcept 79 { 80 stdexec::set_value(static_cast<_Receiver&&>(__opref_().__rcvr_), 81 static_cast<_As&&>(__as)...); 82 __delete_op(); 83 } 84 85 template <class _Error> set_errorstdexec::__submit_::__receiver86 void set_error(_Error&& __err) noexcept 87 { 88 stdexec::set_error(static_cast<_Receiver&&>(__opref_().__rcvr_), 89 static_cast<_Error&&>(__err)); 90 __delete_op(); 91 } 92 set_stoppedstdexec::__submit_::__receiver93 void set_stopped() noexcept 94 { 95 stdexec::set_stopped(__opref_().__rcvr_); 96 __delete_op(); 97 } 98 99 // Forward all receiever queries. get_envstdexec::__submit_::__receiver100 auto get_env() const noexcept -> env_of_t<_Receiver&> 101 { 102 return stdexec::get_env(__opref_().__rcvr_); 103 } 104 }; 105 106 template <class _SenderId, class _ReceiverId> 107 struct __operation 108 { 109 using _Sender = stdexec::__t<_SenderId>; 110 using _Receiver = stdexec::__t<_ReceiverId>; 111 using __receiver_t = __receiver<__ref_t<__operation>>; 112 113 STDEXEC_ATTRIBUTE((no_unique_address)) 114 _Receiver __rcvr_; 115 connect_result_t<_Sender, __receiver_t> __op_state_; 116 __operationstdexec::__submit_::__operation117 __operation(_Sender&& __sndr, _Receiver __rcvr) : 118 __rcvr_(static_cast<_Receiver&&>(__rcvr)), 119 __op_state_( 120 connect(static_cast<_Sender&&>(__sndr), __receiver_t{__ref(*this)})) 121 {} 122 }; 123 124 struct __submit_t 125 { 126 template <receiver _Receiver, sender_to<_Receiver> _Sender> operator ()stdexec::__submit_::__submit_t127 void operator()(_Sender&& __sndr, _Receiver __rcvr) const noexcept(false) 128 { 129 if constexpr (__callable<get_allocator_t, env_of_t<_Receiver>>) 130 { 131 auto&& __env = get_env(__rcvr); 132 auto __alloc = get_allocator(__env); 133 using _Alloc = decltype(__alloc); 134 using _Op = __operation<__id<_Sender>, __id<_Receiver>>; 135 using _OpAlloc = typename std::allocator_traits< 136 _Alloc>::template rebind_alloc<_Op>; 137 _OpAlloc __op_alloc{__alloc}; 138 auto __op = 139 std::allocator_traits<_OpAlloc>::allocate(__op_alloc, 1); 140 try 141 { 142 std::allocator_traits<_OpAlloc>::construct( 143 __op_alloc, __op, static_cast<_Sender&&>(__sndr), 144 static_cast<_Receiver&&>(__rcvr)); 145 stdexec::start(__op->__op_state_); 146 } 147 catch (...) 148 { 149 std::allocator_traits<_OpAlloc>::deallocate(__op_alloc, __op, 150 1); 151 throw; 152 } 153 } 154 else 155 { 156 start((new __operation<__id<_Sender>, __id<_Receiver>>{ 157 static_cast<_Sender&&>(__sndr), 158 static_cast<_Receiver&&>(__rcvr)}) 159 ->__op_state_); 160 } 161 } 162 }; 163 } // namespace __submit_ 164 165 using __submit_::__submit_t; 166 inline constexpr __submit_t __submit{}; 167 } // namespace stdexec 168