1*88ada3bcSV-Sanjana #pragma once 2*88ada3bcSV-Sanjana #include "async_resolve.hpp" 3*88ada3bcSV-Sanjana #include "async_resp.hpp" 4*88ada3bcSV-Sanjana #include "http_request.hpp" 5*88ada3bcSV-Sanjana #include "http_response.hpp" 6*88ada3bcSV-Sanjana 7*88ada3bcSV-Sanjana #include <boost/algorithm/string/predicate.hpp> 8*88ada3bcSV-Sanjana #include <boost/asio/buffer.hpp> 9*88ada3bcSV-Sanjana #include <boost/asio/steady_timer.hpp> 10*88ada3bcSV-Sanjana #include <boost/beast/core/multi_buffer.hpp> 11*88ada3bcSV-Sanjana #include <boost/beast/http/buffer_body.hpp> 12*88ada3bcSV-Sanjana #include <boost/beast/websocket.hpp> 13*88ada3bcSV-Sanjana 14*88ada3bcSV-Sanjana #include <array> 15*88ada3bcSV-Sanjana #include <functional> 16*88ada3bcSV-Sanjana 17*88ada3bcSV-Sanjana #ifdef BMCWEB_ENABLE_SSL 18*88ada3bcSV-Sanjana #include <boost/beast/websocket/ssl.hpp> 19*88ada3bcSV-Sanjana #endif 20*88ada3bcSV-Sanjana 21*88ada3bcSV-Sanjana namespace crow 22*88ada3bcSV-Sanjana { 23*88ada3bcSV-Sanjana 24*88ada3bcSV-Sanjana namespace sse_socket 25*88ada3bcSV-Sanjana { 26*88ada3bcSV-Sanjana static constexpr const std::array<const char*, 1> sseRoutes = { 27*88ada3bcSV-Sanjana "/redfish/v1/EventService/SSE"}; 28*88ada3bcSV-Sanjana 29*88ada3bcSV-Sanjana struct Connection : std::enable_shared_from_this<Connection> 30*88ada3bcSV-Sanjana { 31*88ada3bcSV-Sanjana public: 32*88ada3bcSV-Sanjana explicit Connection(const crow::Request& reqIn) : req(reqIn) {} 33*88ada3bcSV-Sanjana 34*88ada3bcSV-Sanjana Connection(const Connection&) = delete; 35*88ada3bcSV-Sanjana Connection(Connection&&) = delete; 36*88ada3bcSV-Sanjana Connection& operator=(const Connection&) = delete; 37*88ada3bcSV-Sanjana Connection& operator=(const Connection&&) = delete; 38*88ada3bcSV-Sanjana virtual ~Connection() = default; 39*88ada3bcSV-Sanjana 40*88ada3bcSV-Sanjana virtual boost::asio::io_context& getIoContext() = 0; 41*88ada3bcSV-Sanjana virtual void sendSSEHeader() = 0; 42*88ada3bcSV-Sanjana virtual void completeRequest(crow::Response& thisRes) = 0; 43*88ada3bcSV-Sanjana virtual void close(std::string_view msg = "quit") = 0; 44*88ada3bcSV-Sanjana virtual void sendEvent(std::string_view id, std::string_view msg) = 0; 45*88ada3bcSV-Sanjana 46*88ada3bcSV-Sanjana crow::Request req; 47*88ada3bcSV-Sanjana }; 48*88ada3bcSV-Sanjana 49*88ada3bcSV-Sanjana template <typename Adaptor> 50*88ada3bcSV-Sanjana class ConnectionImpl : public Connection 51*88ada3bcSV-Sanjana { 52*88ada3bcSV-Sanjana public: 53*88ada3bcSV-Sanjana ConnectionImpl( 54*88ada3bcSV-Sanjana const crow::Request& reqIn, Adaptor adaptorIn, 55*88ada3bcSV-Sanjana std::function<void(std::shared_ptr<Connection>&, const crow::Request&, 56*88ada3bcSV-Sanjana const std::shared_ptr<bmcweb::AsyncResp>&)> 57*88ada3bcSV-Sanjana openHandlerIn, 58*88ada3bcSV-Sanjana std::function<void(std::shared_ptr<Connection>&)> closeHandlerIn) : 59*88ada3bcSV-Sanjana Connection(reqIn), 60*88ada3bcSV-Sanjana adaptor(std::move(adaptorIn)), openHandler(std::move(openHandlerIn)), 61*88ada3bcSV-Sanjana closeHandler(std::move(closeHandlerIn)) 62*88ada3bcSV-Sanjana { 63*88ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << "SseConnectionImpl: SSE constructor " << this; 64*88ada3bcSV-Sanjana } 65*88ada3bcSV-Sanjana 66*88ada3bcSV-Sanjana ConnectionImpl(const ConnectionImpl&) = delete; 67*88ada3bcSV-Sanjana ConnectionImpl(const ConnectionImpl&&) = delete; 68*88ada3bcSV-Sanjana ConnectionImpl& operator=(const ConnectionImpl&) = delete; 69*88ada3bcSV-Sanjana ConnectionImpl& operator=(const ConnectionImpl&&) = delete; 70*88ada3bcSV-Sanjana 71*88ada3bcSV-Sanjana ~ConnectionImpl() override 72*88ada3bcSV-Sanjana { 73*88ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << "SSE ConnectionImpl: SSE destructor " << this; 74*88ada3bcSV-Sanjana } 75*88ada3bcSV-Sanjana 76*88ada3bcSV-Sanjana boost::asio::io_context& getIoContext() override 77*88ada3bcSV-Sanjana { 78*88ada3bcSV-Sanjana return static_cast<boost::asio::io_context&>( 79*88ada3bcSV-Sanjana adaptor.get_executor().context()); 80*88ada3bcSV-Sanjana } 81*88ada3bcSV-Sanjana 82*88ada3bcSV-Sanjana void start() 83*88ada3bcSV-Sanjana { 84*88ada3bcSV-Sanjana if (openHandler) 85*88ada3bcSV-Sanjana { 86*88ada3bcSV-Sanjana auto asyncResp = std::make_shared<bmcweb::AsyncResp>(); 87*88ada3bcSV-Sanjana std::shared_ptr<Connection> self = this->shared_from_this(); 88*88ada3bcSV-Sanjana 89*88ada3bcSV-Sanjana asyncResp->res.setCompleteRequestHandler( 90*88ada3bcSV-Sanjana [self(shared_from_this())](crow::Response& thisRes) { 91*88ada3bcSV-Sanjana if (thisRes.resultInt() != 200) 92*88ada3bcSV-Sanjana { 93*88ada3bcSV-Sanjana self->completeRequest(thisRes); 94*88ada3bcSV-Sanjana } 95*88ada3bcSV-Sanjana }); 96*88ada3bcSV-Sanjana 97*88ada3bcSV-Sanjana openHandler(self, req, asyncResp); 98*88ada3bcSV-Sanjana } 99*88ada3bcSV-Sanjana } 100*88ada3bcSV-Sanjana 101*88ada3bcSV-Sanjana void close(const std::string_view msg) override 102*88ada3bcSV-Sanjana { 103*88ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << "Closing SSE connection " << this << " - " << msg; 104*88ada3bcSV-Sanjana boost::beast::get_lowest_layer(adaptor).close(); 105*88ada3bcSV-Sanjana 106*88ada3bcSV-Sanjana // send notification to handler for cleanup 107*88ada3bcSV-Sanjana if (closeHandler) 108*88ada3bcSV-Sanjana { 109*88ada3bcSV-Sanjana std::shared_ptr<Connection> self = shared_from_this(); 110*88ada3bcSV-Sanjana closeHandler(self); 111*88ada3bcSV-Sanjana } 112*88ada3bcSV-Sanjana } 113*88ada3bcSV-Sanjana 114*88ada3bcSV-Sanjana void sendSSEHeader() override 115*88ada3bcSV-Sanjana { 116*88ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << "Starting SSE connection"; 117*88ada3bcSV-Sanjana auto asyncResp = std::make_shared<bmcweb::AsyncResp>(); 118*88ada3bcSV-Sanjana using BodyType = boost::beast::http::buffer_body; 119*88ada3bcSV-Sanjana auto response = 120*88ada3bcSV-Sanjana std::make_shared<boost::beast::http::response<BodyType>>( 121*88ada3bcSV-Sanjana boost::beast::http::status::ok, 11); 122*88ada3bcSV-Sanjana 123*88ada3bcSV-Sanjana serializer.emplace(*asyncResp->res.stringResponse); 124*88ada3bcSV-Sanjana 125*88ada3bcSV-Sanjana response->set(boost::beast::http::field::content_type, 126*88ada3bcSV-Sanjana "text/event-stream"); 127*88ada3bcSV-Sanjana response->body().more = true; 128*88ada3bcSV-Sanjana 129*88ada3bcSV-Sanjana boost::beast::http::async_write_header( 130*88ada3bcSV-Sanjana adaptor, *serializer, 131*88ada3bcSV-Sanjana std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this, 132*88ada3bcSV-Sanjana shared_from_this())); 133*88ada3bcSV-Sanjana } 134*88ada3bcSV-Sanjana 135*88ada3bcSV-Sanjana void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/, 136*88ada3bcSV-Sanjana const boost::beast::error_code& ec, 137*88ada3bcSV-Sanjana const std::size_t& /*unused*/) 138*88ada3bcSV-Sanjana { 139*88ada3bcSV-Sanjana if (ec) 140*88ada3bcSV-Sanjana { 141*88ada3bcSV-Sanjana BMCWEB_LOG_ERROR << "Error sending header" << ec; 142*88ada3bcSV-Sanjana close("async_write_header failed"); 143*88ada3bcSV-Sanjana return; 144*88ada3bcSV-Sanjana } 145*88ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << "SSE header sent - Connection established"; 146*88ada3bcSV-Sanjana 147*88ada3bcSV-Sanjana serializer.reset(); 148*88ada3bcSV-Sanjana 149*88ada3bcSV-Sanjana // SSE stream header sent, So let us setup monitor. 150*88ada3bcSV-Sanjana // Any read data on this stream will be error in case of SSE. 151*88ada3bcSV-Sanjana setupRead(); 152*88ada3bcSV-Sanjana } 153*88ada3bcSV-Sanjana 154*88ada3bcSV-Sanjana void setupRead() 155*88ada3bcSV-Sanjana { 156*88ada3bcSV-Sanjana std::weak_ptr<Connection> weakSelf = weak_from_this(); 157*88ada3bcSV-Sanjana 158*88ada3bcSV-Sanjana boost::beast::http::async_read_some( 159*88ada3bcSV-Sanjana adaptor, outputBuffer, *parser, 160*88ada3bcSV-Sanjana std::bind_front(&ConnectionImpl::setupReadCallback, this, 161*88ada3bcSV-Sanjana weak_from_this())); 162*88ada3bcSV-Sanjana } 163*88ada3bcSV-Sanjana 164*88ada3bcSV-Sanjana void setupReadCallback(const std::weak_ptr<Connection>& weakSelf, 165*88ada3bcSV-Sanjana const boost::system::error_code& ec, 166*88ada3bcSV-Sanjana size_t bytesRead) 167*88ada3bcSV-Sanjana { 168*88ada3bcSV-Sanjana std::shared_ptr<Connection> self = weakSelf.lock(); 169*88ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << "async_read_some: Read " << bytesRead << " bytes"; 170*88ada3bcSV-Sanjana if (ec) 171*88ada3bcSV-Sanjana { 172*88ada3bcSV-Sanjana BMCWEB_LOG_ERROR << "Read error: " << ec; 173*88ada3bcSV-Sanjana } 174*88ada3bcSV-Sanjana 175*88ada3bcSV-Sanjana // After establishing SSE stream, Reading data on this 176*88ada3bcSV-Sanjana // stream means client is disobeys the SSE protocol. 177*88ada3bcSV-Sanjana // Read the data to avoid buffer attacks and close connection. 178*88ada3bcSV-Sanjana 179*88ada3bcSV-Sanjana self->close("Close SSE connection"); 180*88ada3bcSV-Sanjana } 181*88ada3bcSV-Sanjana 182*88ada3bcSV-Sanjana void doWrite() 183*88ada3bcSV-Sanjana { 184*88ada3bcSV-Sanjana onTimeout(); 185*88ada3bcSV-Sanjana 186*88ada3bcSV-Sanjana if (doingWrite) 187*88ada3bcSV-Sanjana { 188*88ada3bcSV-Sanjana return; 189*88ada3bcSV-Sanjana } 190*88ada3bcSV-Sanjana if (inputBuffer.size() == 0) 191*88ada3bcSV-Sanjana { 192*88ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << "inputBuffer is empty... Bailing out"; 193*88ada3bcSV-Sanjana return; 194*88ada3bcSV-Sanjana } 195*88ada3bcSV-Sanjana doingWrite = true; 196*88ada3bcSV-Sanjana 197*88ada3bcSV-Sanjana adaptor.async_write_some( 198*88ada3bcSV-Sanjana inputBuffer.data(), 199*88ada3bcSV-Sanjana std::bind_front(&ConnectionImpl::doWriteCallback, this, 200*88ada3bcSV-Sanjana shared_from_this())); 201*88ada3bcSV-Sanjana } 202*88ada3bcSV-Sanjana 203*88ada3bcSV-Sanjana void doWriteCallback(const std::shared_ptr<Connection>& /*self*/, 204*88ada3bcSV-Sanjana const boost::beast::error_code& ec, 205*88ada3bcSV-Sanjana const size_t bytesTransferred) 206*88ada3bcSV-Sanjana { 207*88ada3bcSV-Sanjana doingWrite = false; 208*88ada3bcSV-Sanjana inputBuffer.consume(bytesTransferred); 209*88ada3bcSV-Sanjana 210*88ada3bcSV-Sanjana if (ec == boost::asio::error::eof) 211*88ada3bcSV-Sanjana { 212*88ada3bcSV-Sanjana BMCWEB_LOG_ERROR << "async_write_some() SSE stream closed"; 213*88ada3bcSV-Sanjana close("SSE stream closed"); 214*88ada3bcSV-Sanjana return; 215*88ada3bcSV-Sanjana } 216*88ada3bcSV-Sanjana 217*88ada3bcSV-Sanjana if (ec) 218*88ada3bcSV-Sanjana { 219*88ada3bcSV-Sanjana BMCWEB_LOG_ERROR << "async_write_some() failed: " << ec.message(); 220*88ada3bcSV-Sanjana close("async_write_some failed"); 221*88ada3bcSV-Sanjana return; 222*88ada3bcSV-Sanjana } 223*88ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << "async_write_some() bytes transferred: " 224*88ada3bcSV-Sanjana << bytesTransferred; 225*88ada3bcSV-Sanjana 226*88ada3bcSV-Sanjana doWrite(); 227*88ada3bcSV-Sanjana } 228*88ada3bcSV-Sanjana 229*88ada3bcSV-Sanjana void completeRequest(crow::Response& thisRes) override 230*88ada3bcSV-Sanjana { 231*88ada3bcSV-Sanjana auto asyncResp = std::make_shared<bmcweb::AsyncResp>(); 232*88ada3bcSV-Sanjana asyncResp->res = std::move(thisRes); 233*88ada3bcSV-Sanjana 234*88ada3bcSV-Sanjana if (asyncResp->res.body().empty() && !asyncResp->res.jsonValue.empty()) 235*88ada3bcSV-Sanjana { 236*88ada3bcSV-Sanjana asyncResp->res.addHeader(boost::beast::http::field::content_type, 237*88ada3bcSV-Sanjana "application/json"); 238*88ada3bcSV-Sanjana asyncResp->res.body() = asyncResp->res.jsonValue.dump( 239*88ada3bcSV-Sanjana 2, ' ', true, nlohmann::json::error_handler_t::replace); 240*88ada3bcSV-Sanjana } 241*88ada3bcSV-Sanjana 242*88ada3bcSV-Sanjana asyncResp->res.preparePayload(); 243*88ada3bcSV-Sanjana 244*88ada3bcSV-Sanjana serializer.emplace(*asyncResp->res.stringResponse); 245*88ada3bcSV-Sanjana 246*88ada3bcSV-Sanjana boost::beast::http::async_write_some( 247*88ada3bcSV-Sanjana adaptor, *serializer, 248*88ada3bcSV-Sanjana std::bind_front(&ConnectionImpl::completeRequestCallback, this, 249*88ada3bcSV-Sanjana shared_from_this())); 250*88ada3bcSV-Sanjana } 251*88ada3bcSV-Sanjana 252*88ada3bcSV-Sanjana void completeRequestCallback(const std::shared_ptr<Connection>& /*self*/, 253*88ada3bcSV-Sanjana const boost::system::error_code& ec, 254*88ada3bcSV-Sanjana std::size_t bytesTransferred) 255*88ada3bcSV-Sanjana { 256*88ada3bcSV-Sanjana auto asyncResp = std::make_shared<bmcweb::AsyncResp>(); 257*88ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << this << " async_write " << bytesTransferred 258*88ada3bcSV-Sanjana << " bytes"; 259*88ada3bcSV-Sanjana if (ec) 260*88ada3bcSV-Sanjana { 261*88ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << this << " from async_write failed"; 262*88ada3bcSV-Sanjana return; 263*88ada3bcSV-Sanjana } 264*88ada3bcSV-Sanjana 265*88ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << this << " Closing SSE connection - Request invalid"; 266*88ada3bcSV-Sanjana serializer.reset(); 267*88ada3bcSV-Sanjana close("Request invalid"); 268*88ada3bcSV-Sanjana asyncResp->res.releaseCompleteRequestHandler(); 269*88ada3bcSV-Sanjana } 270*88ada3bcSV-Sanjana 271*88ada3bcSV-Sanjana void sendEvent(std::string_view id, std::string_view msg) override 272*88ada3bcSV-Sanjana { 273*88ada3bcSV-Sanjana if (msg.empty()) 274*88ada3bcSV-Sanjana { 275*88ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << "Empty data, bailing out."; 276*88ada3bcSV-Sanjana return; 277*88ada3bcSV-Sanjana } 278*88ada3bcSV-Sanjana 279*88ada3bcSV-Sanjana dataFormat(id); 280*88ada3bcSV-Sanjana 281*88ada3bcSV-Sanjana doWrite(); 282*88ada3bcSV-Sanjana } 283*88ada3bcSV-Sanjana 284*88ada3bcSV-Sanjana void dataFormat(std::string_view id) 285*88ada3bcSV-Sanjana { 286*88ada3bcSV-Sanjana std::string_view msg; 287*88ada3bcSV-Sanjana std::string rawData; 288*88ada3bcSV-Sanjana if (!id.empty()) 289*88ada3bcSV-Sanjana { 290*88ada3bcSV-Sanjana rawData += "id: "; 291*88ada3bcSV-Sanjana rawData.append(id.begin(), id.end()); 292*88ada3bcSV-Sanjana rawData += "\n"; 293*88ada3bcSV-Sanjana } 294*88ada3bcSV-Sanjana 295*88ada3bcSV-Sanjana rawData += "data: "; 296*88ada3bcSV-Sanjana for (char character : msg) 297*88ada3bcSV-Sanjana { 298*88ada3bcSV-Sanjana rawData += character; 299*88ada3bcSV-Sanjana if (character == '\n') 300*88ada3bcSV-Sanjana { 301*88ada3bcSV-Sanjana rawData += "data: "; 302*88ada3bcSV-Sanjana } 303*88ada3bcSV-Sanjana } 304*88ada3bcSV-Sanjana rawData += "\n\n"; 305*88ada3bcSV-Sanjana 306*88ada3bcSV-Sanjana boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()), 307*88ada3bcSV-Sanjana boost::asio::buffer(rawData)); 308*88ada3bcSV-Sanjana inputBuffer.commit(rawData.size()); 309*88ada3bcSV-Sanjana } 310*88ada3bcSV-Sanjana 311*88ada3bcSV-Sanjana void onTimeout() 312*88ada3bcSV-Sanjana { 313*88ada3bcSV-Sanjana boost::asio::steady_timer timer(ioc); 314*88ada3bcSV-Sanjana std::weak_ptr<Connection> weakSelf = weak_from_this(); 315*88ada3bcSV-Sanjana timer.expires_after(std::chrono::seconds(30)); 316*88ada3bcSV-Sanjana timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback, 317*88ada3bcSV-Sanjana this, weak_from_this())); 318*88ada3bcSV-Sanjana } 319*88ada3bcSV-Sanjana 320*88ada3bcSV-Sanjana void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf, 321*88ada3bcSV-Sanjana const boost::system::error_code ec) 322*88ada3bcSV-Sanjana { 323*88ada3bcSV-Sanjana std::shared_ptr<Connection> self = weakSelf.lock(); 324*88ada3bcSV-Sanjana if (!self) 325*88ada3bcSV-Sanjana { 326*88ada3bcSV-Sanjana BMCWEB_LOG_CRITICAL << self << " Failed to capture connection"; 327*88ada3bcSV-Sanjana return; 328*88ada3bcSV-Sanjana } 329*88ada3bcSV-Sanjana 330*88ada3bcSV-Sanjana if (ec == boost::asio::error::operation_aborted) 331*88ada3bcSV-Sanjana { 332*88ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << "operation aborted"; 333*88ada3bcSV-Sanjana // Canceled wait means the path succeeeded. 334*88ada3bcSV-Sanjana return; 335*88ada3bcSV-Sanjana } 336*88ada3bcSV-Sanjana if (ec) 337*88ada3bcSV-Sanjana { 338*88ada3bcSV-Sanjana BMCWEB_LOG_CRITICAL << self << " timer failed " << ec; 339*88ada3bcSV-Sanjana } 340*88ada3bcSV-Sanjana 341*88ada3bcSV-Sanjana BMCWEB_LOG_WARNING << self << "Connection timed out, closing"; 342*88ada3bcSV-Sanjana 343*88ada3bcSV-Sanjana self->close("closing connection"); 344*88ada3bcSV-Sanjana } 345*88ada3bcSV-Sanjana 346*88ada3bcSV-Sanjana private: 347*88ada3bcSV-Sanjana Adaptor adaptor; 348*88ada3bcSV-Sanjana 349*88ada3bcSV-Sanjana boost::beast::multi_buffer outputBuffer; 350*88ada3bcSV-Sanjana boost::beast::multi_buffer inputBuffer; 351*88ada3bcSV-Sanjana 352*88ada3bcSV-Sanjana std::optional<boost::beast::http::response_serializer< 353*88ada3bcSV-Sanjana boost::beast::http::string_body>> 354*88ada3bcSV-Sanjana serializer; 355*88ada3bcSV-Sanjana boost::asio::io_context& ioc = 356*88ada3bcSV-Sanjana crow::connections::systemBus->get_io_context(); 357*88ada3bcSV-Sanjana bool doingWrite = false; 358*88ada3bcSV-Sanjana std::optional< 359*88ada3bcSV-Sanjana boost::beast::http::request_parser<boost::beast::http::string_body>> 360*88ada3bcSV-Sanjana parser; 361*88ada3bcSV-Sanjana 362*88ada3bcSV-Sanjana std::function<void(std::shared_ptr<Connection>&, const crow::Request&, 363*88ada3bcSV-Sanjana const std::shared_ptr<bmcweb::AsyncResp>&)> 364*88ada3bcSV-Sanjana openHandler; 365*88ada3bcSV-Sanjana std::function<void(std::shared_ptr<Connection>&)> closeHandler; 366*88ada3bcSV-Sanjana }; 367*88ada3bcSV-Sanjana } // namespace sse_socket 368*88ada3bcSV-Sanjana } // namespace crow 369