1 #pragma once 2 #include "async_resolve.hpp" 3 #include "async_resp.hpp" 4 #include "http_request.hpp" 5 #include "http_response.hpp" 6 7 #include <boost/algorithm/string/predicate.hpp> 8 #include <boost/asio/buffer.hpp> 9 #include <boost/asio/steady_timer.hpp> 10 #include <boost/beast/core/multi_buffer.hpp> 11 #include <boost/beast/http/buffer_body.hpp> 12 #include <boost/beast/websocket.hpp> 13 14 #include <array> 15 #include <functional> 16 17 #ifdef BMCWEB_ENABLE_SSL 18 #include <boost/beast/websocket/ssl.hpp> 19 #endif 20 21 namespace crow 22 { 23 24 namespace sse_socket 25 { 26 static constexpr const std::array<const char*, 1> sseRoutes = { 27 "/redfish/v1/EventService/SSE"}; 28 29 struct Connection : std::enable_shared_from_this<Connection> 30 { 31 public: 32 explicit Connection(const crow::Request& reqIn) : req(reqIn) {} 33 34 Connection(const Connection&) = delete; 35 Connection(Connection&&) = delete; 36 Connection& operator=(const Connection&) = delete; 37 Connection& operator=(const Connection&&) = delete; 38 virtual ~Connection() = default; 39 40 virtual boost::asio::io_context& getIoContext() = 0; 41 virtual void sendSSEHeader() = 0; 42 virtual void completeRequest(crow::Response& thisRes) = 0; 43 virtual void close(std::string_view msg = "quit") = 0; 44 virtual void sendEvent(std::string_view id, std::string_view msg) = 0; 45 46 crow::Request req; 47 }; 48 49 template <typename Adaptor> 50 class ConnectionImpl : public Connection 51 { 52 public: 53 ConnectionImpl( 54 const crow::Request& reqIn, Adaptor adaptorIn, 55 std::function<void(std::shared_ptr<Connection>&, const crow::Request&, 56 const std::shared_ptr<bmcweb::AsyncResp>&)> 57 openHandlerIn, 58 std::function<void(std::shared_ptr<Connection>&)> closeHandlerIn) : 59 Connection(reqIn), 60 adaptor(std::move(adaptorIn)), openHandler(std::move(openHandlerIn)), 61 closeHandler(std::move(closeHandlerIn)) 62 { 63 BMCWEB_LOG_DEBUG << "SseConnectionImpl: SSE constructor " << this; 64 } 65 66 ConnectionImpl(const ConnectionImpl&) = delete; 67 ConnectionImpl(const ConnectionImpl&&) = delete; 68 ConnectionImpl& operator=(const ConnectionImpl&) = delete; 69 ConnectionImpl& operator=(const ConnectionImpl&&) = delete; 70 71 ~ConnectionImpl() override 72 { 73 BMCWEB_LOG_DEBUG << "SSE ConnectionImpl: SSE destructor " << this; 74 } 75 76 boost::asio::io_context& getIoContext() override 77 { 78 return static_cast<boost::asio::io_context&>( 79 adaptor.get_executor().context()); 80 } 81 82 void start() 83 { 84 if (openHandler) 85 { 86 auto asyncResp = std::make_shared<bmcweb::AsyncResp>(); 87 std::shared_ptr<Connection> self = this->shared_from_this(); 88 89 asyncResp->res.setCompleteRequestHandler( 90 [self(shared_from_this())](crow::Response& thisRes) { 91 if (thisRes.resultInt() != 200) 92 { 93 self->completeRequest(thisRes); 94 } 95 }); 96 97 openHandler(self, req, asyncResp); 98 } 99 } 100 101 void close(const std::string_view msg) override 102 { 103 BMCWEB_LOG_DEBUG << "Closing SSE connection " << this << " - " << msg; 104 boost::beast::get_lowest_layer(adaptor).close(); 105 106 // send notification to handler for cleanup 107 if (closeHandler) 108 { 109 std::shared_ptr<Connection> self = shared_from_this(); 110 closeHandler(self); 111 } 112 } 113 114 void sendSSEHeader() override 115 { 116 BMCWEB_LOG_DEBUG << "Starting SSE connection"; 117 auto asyncResp = std::make_shared<bmcweb::AsyncResp>(); 118 using BodyType = boost::beast::http::buffer_body; 119 auto response = 120 std::make_shared<boost::beast::http::response<BodyType>>( 121 boost::beast::http::status::ok, 11); 122 123 serializer.emplace(*asyncResp->res.stringResponse); 124 125 response->set(boost::beast::http::field::content_type, 126 "text/event-stream"); 127 response->body().more = true; 128 129 boost::beast::http::async_write_header( 130 adaptor, *serializer, 131 std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this, 132 shared_from_this())); 133 } 134 135 void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/, 136 const boost::beast::error_code& ec, 137 const std::size_t& /*unused*/) 138 { 139 if (ec) 140 { 141 BMCWEB_LOG_ERROR << "Error sending header" << ec; 142 close("async_write_header failed"); 143 return; 144 } 145 BMCWEB_LOG_DEBUG << "SSE header sent - Connection established"; 146 147 serializer.reset(); 148 149 // SSE stream header sent, So let us setup monitor. 150 // Any read data on this stream will be error in case of SSE. 151 setupRead(); 152 } 153 154 void setupRead() 155 { 156 std::weak_ptr<Connection> weakSelf = weak_from_this(); 157 158 boost::beast::http::async_read_some( 159 adaptor, outputBuffer, *parser, 160 std::bind_front(&ConnectionImpl::setupReadCallback, this, 161 weak_from_this())); 162 } 163 164 void setupReadCallback(const std::weak_ptr<Connection>& weakSelf, 165 const boost::system::error_code& ec, 166 size_t bytesRead) 167 { 168 std::shared_ptr<Connection> self = weakSelf.lock(); 169 BMCWEB_LOG_DEBUG << "async_read_some: Read " << bytesRead << " bytes"; 170 if (ec) 171 { 172 BMCWEB_LOG_ERROR << "Read error: " << ec; 173 } 174 175 // After establishing SSE stream, Reading data on this 176 // stream means client is disobeys the SSE protocol. 177 // Read the data to avoid buffer attacks and close connection. 178 179 self->close("Close SSE connection"); 180 } 181 182 void doWrite() 183 { 184 onTimeout(); 185 186 if (doingWrite) 187 { 188 return; 189 } 190 if (inputBuffer.size() == 0) 191 { 192 BMCWEB_LOG_DEBUG << "inputBuffer is empty... Bailing out"; 193 return; 194 } 195 doingWrite = true; 196 197 adaptor.async_write_some( 198 inputBuffer.data(), 199 std::bind_front(&ConnectionImpl::doWriteCallback, this, 200 shared_from_this())); 201 } 202 203 void doWriteCallback(const std::shared_ptr<Connection>& /*self*/, 204 const boost::beast::error_code& ec, 205 const size_t bytesTransferred) 206 { 207 doingWrite = false; 208 inputBuffer.consume(bytesTransferred); 209 210 if (ec == boost::asio::error::eof) 211 { 212 BMCWEB_LOG_ERROR << "async_write_some() SSE stream closed"; 213 close("SSE stream closed"); 214 return; 215 } 216 217 if (ec) 218 { 219 BMCWEB_LOG_ERROR << "async_write_some() failed: " << ec.message(); 220 close("async_write_some failed"); 221 return; 222 } 223 BMCWEB_LOG_DEBUG << "async_write_some() bytes transferred: " 224 << bytesTransferred; 225 226 doWrite(); 227 } 228 229 void completeRequest(crow::Response& thisRes) override 230 { 231 auto asyncResp = std::make_shared<bmcweb::AsyncResp>(); 232 asyncResp->res = std::move(thisRes); 233 234 if (asyncResp->res.body().empty() && !asyncResp->res.jsonValue.empty()) 235 { 236 asyncResp->res.addHeader(boost::beast::http::field::content_type, 237 "application/json"); 238 asyncResp->res.body() = asyncResp->res.jsonValue.dump( 239 2, ' ', true, nlohmann::json::error_handler_t::replace); 240 } 241 242 asyncResp->res.preparePayload(); 243 244 serializer.emplace(*asyncResp->res.stringResponse); 245 246 boost::beast::http::async_write_some( 247 adaptor, *serializer, 248 std::bind_front(&ConnectionImpl::completeRequestCallback, this, 249 shared_from_this())); 250 } 251 252 void completeRequestCallback(const std::shared_ptr<Connection>& /*self*/, 253 const boost::system::error_code& ec, 254 std::size_t bytesTransferred) 255 { 256 auto asyncResp = std::make_shared<bmcweb::AsyncResp>(); 257 BMCWEB_LOG_DEBUG << this << " async_write " << bytesTransferred 258 << " bytes"; 259 if (ec) 260 { 261 BMCWEB_LOG_DEBUG << this << " from async_write failed"; 262 return; 263 } 264 265 BMCWEB_LOG_DEBUG << this << " Closing SSE connection - Request invalid"; 266 serializer.reset(); 267 close("Request invalid"); 268 asyncResp->res.releaseCompleteRequestHandler(); 269 } 270 271 void sendEvent(std::string_view id, std::string_view msg) override 272 { 273 if (msg.empty()) 274 { 275 BMCWEB_LOG_DEBUG << "Empty data, bailing out."; 276 return; 277 } 278 279 dataFormat(id); 280 281 doWrite(); 282 } 283 284 void dataFormat(std::string_view id) 285 { 286 std::string_view msg; 287 std::string rawData; 288 if (!id.empty()) 289 { 290 rawData += "id: "; 291 rawData.append(id.begin(), id.end()); 292 rawData += "\n"; 293 } 294 295 rawData += "data: "; 296 for (char character : msg) 297 { 298 rawData += character; 299 if (character == '\n') 300 { 301 rawData += "data: "; 302 } 303 } 304 rawData += "\n\n"; 305 306 boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()), 307 boost::asio::buffer(rawData)); 308 inputBuffer.commit(rawData.size()); 309 } 310 311 void onTimeout() 312 { 313 boost::asio::steady_timer timer(ioc); 314 std::weak_ptr<Connection> weakSelf = weak_from_this(); 315 timer.expires_after(std::chrono::seconds(30)); 316 timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback, 317 this, weak_from_this())); 318 } 319 320 void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf, 321 const boost::system::error_code ec) 322 { 323 std::shared_ptr<Connection> self = weakSelf.lock(); 324 if (!self) 325 { 326 BMCWEB_LOG_CRITICAL << self << " Failed to capture connection"; 327 return; 328 } 329 330 if (ec == boost::asio::error::operation_aborted) 331 { 332 BMCWEB_LOG_DEBUG << "operation aborted"; 333 // Canceled wait means the path succeeeded. 334 return; 335 } 336 if (ec) 337 { 338 BMCWEB_LOG_CRITICAL << self << " timer failed " << ec; 339 } 340 341 BMCWEB_LOG_WARNING << self << "Connection timed out, closing"; 342 343 self->close("closing connection"); 344 } 345 346 private: 347 Adaptor adaptor; 348 349 boost::beast::multi_buffer outputBuffer; 350 boost::beast::multi_buffer inputBuffer; 351 352 std::optional<boost::beast::http::response_serializer< 353 boost::beast::http::string_body>> 354 serializer; 355 boost::asio::io_context& ioc = 356 crow::connections::systemBus->get_io_context(); 357 bool doingWrite = false; 358 std::optional< 359 boost::beast::http::request_parser<boost::beast::http::string_body>> 360 parser; 361 362 std::function<void(std::shared_ptr<Connection>&, const crow::Request&, 363 const std::shared_ptr<bmcweb::AsyncResp>&)> 364 openHandler; 365 std::function<void(std::shared_ptr<Connection>&)> closeHandler; 366 }; 367 } // namespace sse_socket 368 } // namespace crow 369