188ada3bcSV-Sanjana #pragma once 2b2896149SEd Tanous #include "http_body.hpp" 388ada3bcSV-Sanjana #include "http_request.hpp" 488ada3bcSV-Sanjana #include "http_response.hpp" 588ada3bcSV-Sanjana 688ada3bcSV-Sanjana #include <boost/asio/buffer.hpp> 788ada3bcSV-Sanjana #include <boost/asio/steady_timer.hpp> 888ada3bcSV-Sanjana #include <boost/beast/core/multi_buffer.hpp> 988ada3bcSV-Sanjana #include <boost/beast/websocket.hpp> 1088ada3bcSV-Sanjana 1188ada3bcSV-Sanjana #include <array> 128f79c5b6SEd Tanous #include <cstddef> 1388ada3bcSV-Sanjana #include <functional> 148f79c5b6SEd Tanous #include <optional> 1588ada3bcSV-Sanjana 1688ada3bcSV-Sanjana namespace crow 1788ada3bcSV-Sanjana { 1888ada3bcSV-Sanjana 1988ada3bcSV-Sanjana namespace sse_socket 2088ada3bcSV-Sanjana { 2193cf0ac2SEd Tanous struct Connection : public std::enable_shared_from_this<Connection> 2288ada3bcSV-Sanjana { 2388ada3bcSV-Sanjana public: 246fde95faSEd Tanous Connection() = default; 2588ada3bcSV-Sanjana 2688ada3bcSV-Sanjana Connection(const Connection&) = delete; 2788ada3bcSV-Sanjana Connection(Connection&&) = delete; 2888ada3bcSV-Sanjana Connection& operator=(const Connection&) = delete; 2988ada3bcSV-Sanjana Connection& operator=(const Connection&&) = delete; 3088ada3bcSV-Sanjana virtual ~Connection() = default; 3188ada3bcSV-Sanjana 3288ada3bcSV-Sanjana virtual boost::asio::io_context& getIoContext() = 0; 3388ada3bcSV-Sanjana virtual void close(std::string_view msg = "quit") = 0; 3488ada3bcSV-Sanjana virtual void sendEvent(std::string_view id, std::string_view msg) = 0; 3588ada3bcSV-Sanjana }; 3688ada3bcSV-Sanjana 3788ada3bcSV-Sanjana template <typename Adaptor> 3888ada3bcSV-Sanjana class ConnectionImpl : public Connection 3988ada3bcSV-Sanjana { 4088ada3bcSV-Sanjana public: ConnectionImpl(Adaptor && adaptorIn,std::function<void (Connection &)> openHandlerIn,std::function<void (Connection &)> closeHandlerIn)4193cf0ac2SEd Tanous ConnectionImpl(Adaptor&& adaptorIn, 426fde95faSEd Tanous std::function<void(Connection&)> openHandlerIn, 436fde95faSEd Tanous std::function<void(Connection&)> closeHandlerIn) : 446fde95faSEd Tanous adaptor(std::move(adaptorIn)), 4593cf0ac2SEd Tanous timer(static_cast<boost::asio::io_context&>( 4693cf0ac2SEd Tanous adaptor.get_executor().context())), 4793cf0ac2SEd Tanous openHandler(std::move(openHandlerIn)), 4888ada3bcSV-Sanjana closeHandler(std::move(closeHandlerIn)) 498f79c5b6SEd Tanous 5088ada3bcSV-Sanjana { 5162598e31SEd Tanous BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this)); 5288ada3bcSV-Sanjana } 5388ada3bcSV-Sanjana 5488ada3bcSV-Sanjana ConnectionImpl(const ConnectionImpl&) = delete; 5588ada3bcSV-Sanjana ConnectionImpl(const ConnectionImpl&&) = delete; 5688ada3bcSV-Sanjana ConnectionImpl& operator=(const ConnectionImpl&) = delete; 5788ada3bcSV-Sanjana ConnectionImpl& operator=(const ConnectionImpl&&) = delete; 5888ada3bcSV-Sanjana ~ConnectionImpl()5988ada3bcSV-Sanjana ~ConnectionImpl() override 6088ada3bcSV-Sanjana { 6162598e31SEd Tanous BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this)); 6288ada3bcSV-Sanjana } 6388ada3bcSV-Sanjana getIoContext()6488ada3bcSV-Sanjana boost::asio::io_context& getIoContext() override 6588ada3bcSV-Sanjana { 6688ada3bcSV-Sanjana return static_cast<boost::asio::io_context&>( 6788ada3bcSV-Sanjana adaptor.get_executor().context()); 6888ada3bcSV-Sanjana } 6988ada3bcSV-Sanjana start()7088ada3bcSV-Sanjana void start() 7188ada3bcSV-Sanjana { 726fde95faSEd Tanous if (!openHandler) 7388ada3bcSV-Sanjana { 7462598e31SEd Tanous BMCWEB_LOG_CRITICAL("No open handler???"); 756fde95faSEd Tanous return; 7688ada3bcSV-Sanjana } 776fde95faSEd Tanous openHandler(*this); 788f79c5b6SEd Tanous sendSSEHeader(); 7988ada3bcSV-Sanjana } 8088ada3bcSV-Sanjana close(const std::string_view msg)8188ada3bcSV-Sanjana void close(const std::string_view msg) override 8288ada3bcSV-Sanjana { 838f79c5b6SEd Tanous BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg); 8488ada3bcSV-Sanjana // send notification to handler for cleanup 8588ada3bcSV-Sanjana if (closeHandler) 8688ada3bcSV-Sanjana { 876fde95faSEd Tanous closeHandler(*this); 8888ada3bcSV-Sanjana } 8962598e31SEd Tanous BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg); 906fde95faSEd Tanous boost::beast::get_lowest_layer(adaptor).close(); 9188ada3bcSV-Sanjana } 9288ada3bcSV-Sanjana sendSSEHeader()936fde95faSEd Tanous void sendSSEHeader() 9488ada3bcSV-Sanjana { 9562598e31SEd Tanous BMCWEB_LOG_DEBUG("Starting SSE connection"); 968f79c5b6SEd Tanous 976fde95faSEd Tanous res.set(boost::beast::http::field::content_type, "text/event-stream"); 988ece0e45SEd Tanous boost::beast::http::response_serializer<BodyType>& serial = 998f79c5b6SEd Tanous serializer.emplace(res); 10088ada3bcSV-Sanjana 10188ada3bcSV-Sanjana boost::beast::http::async_write_header( 1028ece0e45SEd Tanous adaptor, serial, 10388ada3bcSV-Sanjana std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this, 10488ada3bcSV-Sanjana shared_from_this())); 10588ada3bcSV-Sanjana } 10688ada3bcSV-Sanjana sendSSEHeaderCallback(const std::shared_ptr<Connection> &,const boost::system::error_code & ec,size_t)10788ada3bcSV-Sanjana void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/, 1086fde95faSEd Tanous const boost::system::error_code& ec, 1096fde95faSEd Tanous size_t /*bytesSent*/) 11088ada3bcSV-Sanjana { 1116fde95faSEd Tanous serializer.reset(); 11288ada3bcSV-Sanjana if (ec) 11388ada3bcSV-Sanjana { 11462598e31SEd Tanous BMCWEB_LOG_ERROR("Error sending header{}", ec); 11588ada3bcSV-Sanjana close("async_write_header failed"); 11688ada3bcSV-Sanjana return; 11788ada3bcSV-Sanjana } 11862598e31SEd Tanous BMCWEB_LOG_DEBUG("SSE header sent - Connection established"); 11988ada3bcSV-Sanjana 12088ada3bcSV-Sanjana // SSE stream header sent, So let us setup monitor. 12188ada3bcSV-Sanjana // Any read data on this stream will be error in case of SSE. 1228f79c5b6SEd Tanous adaptor.async_read_some(boost::asio::buffer(buffer), 1236fde95faSEd Tanous std::bind_front(&ConnectionImpl::afterReadError, 1246fde95faSEd Tanous this, shared_from_this())); 12588ada3bcSV-Sanjana } 12688ada3bcSV-Sanjana afterReadError(const std::shared_ptr<Connection> &,const boost::system::error_code & ec,size_t bytesRead)1276fde95faSEd Tanous void afterReadError(const std::shared_ptr<Connection>& /*self*/, 1288f79c5b6SEd Tanous const boost::system::error_code& ec, size_t bytesRead) 12988ada3bcSV-Sanjana { 1308f79c5b6SEd Tanous BMCWEB_LOG_DEBUG("Read {}", bytesRead); 1316fde95faSEd Tanous if (ec == boost::asio::error::operation_aborted) 1326fde95faSEd Tanous { 1336fde95faSEd Tanous return; 13488ada3bcSV-Sanjana } 13588ada3bcSV-Sanjana if (ec) 13688ada3bcSV-Sanjana { 13762598e31SEd Tanous BMCWEB_LOG_ERROR("Read error: {}", ec); 13888ada3bcSV-Sanjana } 13988ada3bcSV-Sanjana 1406fde95faSEd Tanous close("Close SSE connection"); 14188ada3bcSV-Sanjana } 14288ada3bcSV-Sanjana doWrite()14388ada3bcSV-Sanjana void doWrite() 14488ada3bcSV-Sanjana { 14588ada3bcSV-Sanjana if (doingWrite) 14688ada3bcSV-Sanjana { 14788ada3bcSV-Sanjana return; 14888ada3bcSV-Sanjana } 14988ada3bcSV-Sanjana if (inputBuffer.size() == 0) 15088ada3bcSV-Sanjana { 15162598e31SEd Tanous BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out"); 15288ada3bcSV-Sanjana return; 15388ada3bcSV-Sanjana } 1546fde95faSEd Tanous startTimeout(); 15588ada3bcSV-Sanjana doingWrite = true; 15688ada3bcSV-Sanjana 15788ada3bcSV-Sanjana adaptor.async_write_some( 15888ada3bcSV-Sanjana inputBuffer.data(), 15988ada3bcSV-Sanjana std::bind_front(&ConnectionImpl::doWriteCallback, this, 1608f79c5b6SEd Tanous shared_from_this())); 16188ada3bcSV-Sanjana } 16288ada3bcSV-Sanjana doWriteCallback(const std::shared_ptr<Connection> &,const boost::beast::error_code & ec,size_t bytesTransferred)1638f79c5b6SEd Tanous void doWriteCallback(const std::shared_ptr<Connection>& /*self*/, 16488ada3bcSV-Sanjana const boost::beast::error_code& ec, 1656fde95faSEd Tanous size_t bytesTransferred) 16688ada3bcSV-Sanjana { 1676fde95faSEd Tanous timer.cancel(); 16888ada3bcSV-Sanjana doingWrite = false; 16988ada3bcSV-Sanjana inputBuffer.consume(bytesTransferred); 17088ada3bcSV-Sanjana 17188ada3bcSV-Sanjana if (ec == boost::asio::error::eof) 17288ada3bcSV-Sanjana { 17362598e31SEd Tanous BMCWEB_LOG_ERROR("async_write_some() SSE stream closed"); 17488ada3bcSV-Sanjana close("SSE stream closed"); 17588ada3bcSV-Sanjana return; 17688ada3bcSV-Sanjana } 17788ada3bcSV-Sanjana 17888ada3bcSV-Sanjana if (ec) 17988ada3bcSV-Sanjana { 18062598e31SEd Tanous BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message()); 18188ada3bcSV-Sanjana close("async_write_some failed"); 18288ada3bcSV-Sanjana return; 18388ada3bcSV-Sanjana } 18462598e31SEd Tanous BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}", 18562598e31SEd Tanous bytesTransferred); 18688ada3bcSV-Sanjana 18788ada3bcSV-Sanjana doWrite(); 18888ada3bcSV-Sanjana } 18988ada3bcSV-Sanjana sendEvent(std::string_view id,std::string_view msg)19088ada3bcSV-Sanjana void sendEvent(std::string_view id, std::string_view msg) override 19188ada3bcSV-Sanjana { 19288ada3bcSV-Sanjana if (msg.empty()) 19388ada3bcSV-Sanjana { 19462598e31SEd Tanous BMCWEB_LOG_DEBUG("Empty data, bailing out."); 19588ada3bcSV-Sanjana return; 19688ada3bcSV-Sanjana } 19788ada3bcSV-Sanjana 1986fde95faSEd Tanous dataFormat(id, msg); 19988ada3bcSV-Sanjana 20088ada3bcSV-Sanjana doWrite(); 20188ada3bcSV-Sanjana } 20288ada3bcSV-Sanjana dataFormat(std::string_view id,std::string_view msg)2036fde95faSEd Tanous void dataFormat(std::string_view id, std::string_view msg) 20488ada3bcSV-Sanjana { 2058f79c5b6SEd Tanous constexpr size_t bufferLimit = 10485760U; // 10MB 2068f79c5b6SEd Tanous if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit) 2078f79c5b6SEd Tanous { 2088f79c5b6SEd Tanous BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client"); 2098f79c5b6SEd Tanous close("Buffer overflow"); 2108f79c5b6SEd Tanous return; 2118f79c5b6SEd Tanous } 21288ada3bcSV-Sanjana std::string rawData; 21388ada3bcSV-Sanjana if (!id.empty()) 21488ada3bcSV-Sanjana { 21588ada3bcSV-Sanjana rawData += "id: "; 2166fde95faSEd Tanous rawData.append(id); 21788ada3bcSV-Sanjana rawData += "\n"; 21888ada3bcSV-Sanjana } 21988ada3bcSV-Sanjana 22088ada3bcSV-Sanjana rawData += "data: "; 22188ada3bcSV-Sanjana for (char character : msg) 22288ada3bcSV-Sanjana { 22388ada3bcSV-Sanjana rawData += character; 22488ada3bcSV-Sanjana if (character == '\n') 22588ada3bcSV-Sanjana { 22688ada3bcSV-Sanjana rawData += "data: "; 22788ada3bcSV-Sanjana } 22888ada3bcSV-Sanjana } 22988ada3bcSV-Sanjana rawData += "\n\n"; 23088ada3bcSV-Sanjana 231*44106f34SEd Tanous size_t copied = boost::asio::buffer_copy( 232*44106f34SEd Tanous inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData)); 233*44106f34SEd Tanous inputBuffer.commit(copied); 23488ada3bcSV-Sanjana } 23588ada3bcSV-Sanjana startTimeout()2366fde95faSEd Tanous void startTimeout() 23788ada3bcSV-Sanjana { 23888ada3bcSV-Sanjana std::weak_ptr<Connection> weakSelf = weak_from_this(); 23988ada3bcSV-Sanjana timer.expires_after(std::chrono::seconds(30)); 24088ada3bcSV-Sanjana timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback, 24188ada3bcSV-Sanjana this, weak_from_this())); 24288ada3bcSV-Sanjana } 24388ada3bcSV-Sanjana onTimeoutCallback(const std::weak_ptr<Connection> & weakSelf,const boost::system::error_code & ec)24488ada3bcSV-Sanjana void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf, 2456fde95faSEd Tanous const boost::system::error_code& ec) 24688ada3bcSV-Sanjana { 24788ada3bcSV-Sanjana std::shared_ptr<Connection> self = weakSelf.lock(); 24888ada3bcSV-Sanjana if (!self) 24988ada3bcSV-Sanjana { 25062598e31SEd Tanous BMCWEB_LOG_CRITICAL("{} Failed to capture connection", 25162598e31SEd Tanous logPtr(self.get())); 25288ada3bcSV-Sanjana return; 25388ada3bcSV-Sanjana } 25488ada3bcSV-Sanjana 25588ada3bcSV-Sanjana if (ec == boost::asio::error::operation_aborted) 25688ada3bcSV-Sanjana { 2578f79c5b6SEd Tanous BMCWEB_LOG_DEBUG("Timer operation aborted"); 2588ece0e45SEd Tanous // Canceled wait means the path succeeded. 25988ada3bcSV-Sanjana return; 26088ada3bcSV-Sanjana } 26188ada3bcSV-Sanjana if (ec) 26288ada3bcSV-Sanjana { 26362598e31SEd Tanous BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec); 26488ada3bcSV-Sanjana } 26588ada3bcSV-Sanjana 26662598e31SEd Tanous BMCWEB_LOG_WARNING("{} Connection timed out, closing", 26762598e31SEd Tanous logPtr(self.get())); 26888ada3bcSV-Sanjana 26988ada3bcSV-Sanjana self->close("closing connection"); 27088ada3bcSV-Sanjana } 27188ada3bcSV-Sanjana 27288ada3bcSV-Sanjana private: 2738f79c5b6SEd Tanous std::array<char, 1> buffer{}; 27488ada3bcSV-Sanjana boost::beast::multi_buffer inputBuffer; 27588ada3bcSV-Sanjana 2768f79c5b6SEd Tanous Adaptor adaptor; 2778f79c5b6SEd Tanous 278b2896149SEd Tanous using BodyType = bmcweb::HttpBody; 2798f79c5b6SEd Tanous boost::beast::http::response<BodyType> res; 2808f79c5b6SEd Tanous std::optional<boost::beast::http::response_serializer<BodyType>> serializer; 2816fde95faSEd Tanous boost::asio::steady_timer timer; 28288ada3bcSV-Sanjana bool doingWrite = false; 28388ada3bcSV-Sanjana 2846fde95faSEd Tanous std::function<void(Connection&)> openHandler; 2856fde95faSEd Tanous std::function<void(Connection&)> closeHandler; 28688ada3bcSV-Sanjana }; 28788ada3bcSV-Sanjana } // namespace sse_socket 28888ada3bcSV-Sanjana } // namespace crow 289