1 #pragma once 2 #include "dbus_singleton.hpp" 3 #include "http_request.hpp" 4 #include "http_response.hpp" 5 6 #include <boost/algorithm/string/predicate.hpp> 7 #include <boost/asio/buffer.hpp> 8 #include <boost/asio/steady_timer.hpp> 9 #include <boost/beast/core/multi_buffer.hpp> 10 #include <boost/beast/http/buffer_body.hpp> 11 #include <boost/beast/websocket.hpp> 12 13 #include <array> 14 #include <functional> 15 16 #ifdef BMCWEB_ENABLE_SSL 17 #include <boost/beast/websocket/ssl.hpp> 18 #endif 19 20 namespace crow 21 { 22 23 namespace sse_socket 24 { 25 struct Connection : std::enable_shared_from_this<Connection> 26 { 27 public: 28 Connection() = default; 29 30 Connection(const Connection&) = delete; 31 Connection(Connection&&) = delete; 32 Connection& operator=(const Connection&) = delete; 33 Connection& operator=(const Connection&&) = delete; 34 virtual ~Connection() = default; 35 36 virtual boost::asio::io_context& getIoContext() = 0; 37 virtual void close(std::string_view msg = "quit") = 0; 38 virtual void sendEvent(std::string_view id, std::string_view msg) = 0; 39 }; 40 41 template <typename Adaptor> 42 class ConnectionImpl : public Connection 43 { 44 public: 45 ConnectionImpl(Adaptor&& adaptorIn, 46 std::function<void(Connection&)> openHandlerIn, 47 std::function<void(Connection&)> closeHandlerIn) : 48 adaptor(std::move(adaptorIn)), 49 timer(ioc), openHandler(std::move(openHandlerIn)), 50 closeHandler(std::move(closeHandlerIn)) 51 { 52 BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this)); 53 } 54 55 ConnectionImpl(const ConnectionImpl&) = delete; 56 ConnectionImpl(const ConnectionImpl&&) = delete; 57 ConnectionImpl& operator=(const ConnectionImpl&) = delete; 58 ConnectionImpl& operator=(const ConnectionImpl&&) = delete; 59 60 ~ConnectionImpl() override 61 { 62 BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this)); 63 } 64 65 boost::asio::io_context& getIoContext() override 66 { 67 return static_cast<boost::asio::io_context&>( 68 adaptor.get_executor().context()); 69 } 70 71 void start() 72 { 73 if (!openHandler) 74 { 75 BMCWEB_LOG_CRITICAL("No open handler???"); 76 return; 77 } 78 openHandler(*this); 79 } 80 81 void close(const std::string_view msg) override 82 { 83 // send notification to handler for cleanup 84 if (closeHandler) 85 { 86 closeHandler(*this); 87 } 88 BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg); 89 boost::beast::get_lowest_layer(adaptor).close(); 90 } 91 92 void sendSSEHeader() 93 { 94 BMCWEB_LOG_DEBUG("Starting SSE connection"); 95 using BodyType = boost::beast::http::buffer_body; 96 boost::beast::http::response<BodyType> res( 97 boost::beast::http::status::ok, 11, BodyType{}); 98 res.set(boost::beast::http::field::content_type, "text/event-stream"); 99 res.body().more = true; 100 boost::beast::http::response_serializer<BodyType>& ser = 101 serializer.emplace(std::move(res)); 102 103 boost::beast::http::async_write_header( 104 adaptor, ser, 105 std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this, 106 shared_from_this())); 107 } 108 109 void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/, 110 const boost::system::error_code& ec, 111 size_t /*bytesSent*/) 112 { 113 serializer.reset(); 114 if (ec) 115 { 116 BMCWEB_LOG_ERROR("Error sending header{}", ec); 117 close("async_write_header failed"); 118 return; 119 } 120 BMCWEB_LOG_DEBUG("SSE header sent - Connection established"); 121 122 serializer.reset(); 123 124 // SSE stream header sent, So let us setup monitor. 125 // Any read data on this stream will be error in case of SSE. 126 127 adaptor.async_wait(boost::asio::ip::tcp::socket::wait_error, 128 std::bind_front(&ConnectionImpl::afterReadError, 129 this, shared_from_this())); 130 } 131 132 void afterReadError(const std::shared_ptr<Connection>& /*self*/, 133 const boost::system::error_code& ec) 134 { 135 if (ec == boost::asio::error::operation_aborted) 136 { 137 return; 138 } 139 if (ec) 140 { 141 BMCWEB_LOG_ERROR("Read error: {}", ec); 142 } 143 144 close("Close SSE connection"); 145 } 146 147 void doWrite() 148 { 149 if (doingWrite) 150 { 151 return; 152 } 153 if (inputBuffer.size() == 0) 154 { 155 BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out"); 156 return; 157 } 158 startTimeout(); 159 doingWrite = true; 160 161 adaptor.async_write_some( 162 inputBuffer.data(), 163 std::bind_front(&ConnectionImpl::doWriteCallback, this, 164 weak_from_this())); 165 } 166 167 void doWriteCallback(const std::weak_ptr<Connection>& weak, 168 const boost::beast::error_code& ec, 169 size_t bytesTransferred) 170 { 171 auto self = weak.lock(); 172 if (self == nullptr) 173 { 174 return; 175 } 176 timer.cancel(); 177 doingWrite = false; 178 inputBuffer.consume(bytesTransferred); 179 180 if (ec == boost::asio::error::eof) 181 { 182 BMCWEB_LOG_ERROR("async_write_some() SSE stream closed"); 183 close("SSE stream closed"); 184 return; 185 } 186 187 if (ec) 188 { 189 BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message()); 190 close("async_write_some failed"); 191 return; 192 } 193 BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}", 194 bytesTransferred); 195 196 doWrite(); 197 } 198 199 void sendEvent(std::string_view id, std::string_view msg) override 200 { 201 if (msg.empty()) 202 { 203 BMCWEB_LOG_DEBUG("Empty data, bailing out."); 204 return; 205 } 206 207 dataFormat(id, msg); 208 209 doWrite(); 210 } 211 212 void dataFormat(std::string_view id, std::string_view msg) 213 { 214 std::string rawData; 215 if (!id.empty()) 216 { 217 rawData += "id: "; 218 rawData.append(id); 219 rawData += "\n"; 220 } 221 222 rawData += "data: "; 223 for (char character : msg) 224 { 225 rawData += character; 226 if (character == '\n') 227 { 228 rawData += "data: "; 229 } 230 } 231 rawData += "\n\n"; 232 233 boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()), 234 boost::asio::buffer(rawData)); 235 inputBuffer.commit(rawData.size()); 236 } 237 238 void startTimeout() 239 { 240 std::weak_ptr<Connection> weakSelf = weak_from_this(); 241 timer.expires_after(std::chrono::seconds(30)); 242 timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback, 243 this, weak_from_this())); 244 } 245 246 void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf, 247 const boost::system::error_code& ec) 248 { 249 std::shared_ptr<Connection> self = weakSelf.lock(); 250 if (!self) 251 { 252 BMCWEB_LOG_CRITICAL("{} Failed to capture connection", 253 logPtr(self.get())); 254 return; 255 } 256 257 if (ec == boost::asio::error::operation_aborted) 258 { 259 BMCWEB_LOG_DEBUG("operation aborted"); 260 // Canceled wait means the path succeeeded. 261 return; 262 } 263 if (ec) 264 { 265 BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec); 266 } 267 268 BMCWEB_LOG_WARNING("{}Connection timed out, closing", 269 logPtr(self.get())); 270 271 self->close("closing connection"); 272 } 273 274 private: 275 Adaptor adaptor; 276 277 boost::beast::multi_buffer inputBuffer; 278 279 std::optional<boost::beast::http::response_serializer< 280 boost::beast::http::buffer_body>> 281 serializer; 282 boost::asio::io_context& ioc = 283 crow::connections::systemBus->get_io_context(); 284 boost::asio::steady_timer timer; 285 bool doingWrite = false; 286 287 std::function<void(Connection&)> openHandler; 288 std::function<void(Connection&)> closeHandler; 289 }; 290 } // namespace sse_socket 291 } // namespace crow 292