188ada3bcSV-Sanjana #pragma once 288ada3bcSV-Sanjana #include "http_request.hpp" 388ada3bcSV-Sanjana #include "http_response.hpp" 488ada3bcSV-Sanjana 588ada3bcSV-Sanjana #include <boost/asio/buffer.hpp> 688ada3bcSV-Sanjana #include <boost/asio/steady_timer.hpp> 788ada3bcSV-Sanjana #include <boost/beast/core/multi_buffer.hpp> 888ada3bcSV-Sanjana #include <boost/beast/websocket.hpp> 988ada3bcSV-Sanjana 1088ada3bcSV-Sanjana #include <array> 11*8f79c5b6SEd Tanous #include <cstddef> 1288ada3bcSV-Sanjana #include <functional> 13*8f79c5b6SEd Tanous #include <optional> 1488ada3bcSV-Sanjana 1588ada3bcSV-Sanjana namespace crow 1688ada3bcSV-Sanjana { 1788ada3bcSV-Sanjana 1888ada3bcSV-Sanjana namespace sse_socket 1988ada3bcSV-Sanjana { 2088ada3bcSV-Sanjana struct Connection : std::enable_shared_from_this<Connection> 2188ada3bcSV-Sanjana { 2288ada3bcSV-Sanjana public: 236fde95faSEd Tanous Connection() = default; 2488ada3bcSV-Sanjana 2588ada3bcSV-Sanjana Connection(const Connection&) = delete; 2688ada3bcSV-Sanjana Connection(Connection&&) = delete; 2788ada3bcSV-Sanjana Connection& operator=(const Connection&) = delete; 2888ada3bcSV-Sanjana Connection& operator=(const Connection&&) = delete; 2988ada3bcSV-Sanjana virtual ~Connection() = default; 3088ada3bcSV-Sanjana 3188ada3bcSV-Sanjana virtual boost::asio::io_context& getIoContext() = 0; 3288ada3bcSV-Sanjana virtual void close(std::string_view msg = "quit") = 0; 3388ada3bcSV-Sanjana virtual void sendEvent(std::string_view id, std::string_view msg) = 0; 3488ada3bcSV-Sanjana }; 3588ada3bcSV-Sanjana 3688ada3bcSV-Sanjana template <typename Adaptor> 3788ada3bcSV-Sanjana class ConnectionImpl : public Connection 3888ada3bcSV-Sanjana { 3988ada3bcSV-Sanjana public: 40*8f79c5b6SEd Tanous ConnectionImpl(boost::asio::io_context& ioIn, Adaptor&& adaptorIn, 416fde95faSEd Tanous std::function<void(Connection&)> openHandlerIn, 426fde95faSEd Tanous std::function<void(Connection&)> closeHandlerIn) : 436fde95faSEd Tanous adaptor(std::move(adaptorIn)), 44*8f79c5b6SEd Tanous ioc(ioIn), timer(ioc), openHandler(std::move(openHandlerIn)), 4588ada3bcSV-Sanjana closeHandler(std::move(closeHandlerIn)) 46*8f79c5b6SEd Tanous 4788ada3bcSV-Sanjana { 4862598e31SEd Tanous BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this)); 4988ada3bcSV-Sanjana } 5088ada3bcSV-Sanjana 5188ada3bcSV-Sanjana ConnectionImpl(const ConnectionImpl&) = delete; 5288ada3bcSV-Sanjana ConnectionImpl(const ConnectionImpl&&) = delete; 5388ada3bcSV-Sanjana ConnectionImpl& operator=(const ConnectionImpl&) = delete; 5488ada3bcSV-Sanjana ConnectionImpl& operator=(const ConnectionImpl&&) = delete; 5588ada3bcSV-Sanjana 5688ada3bcSV-Sanjana ~ConnectionImpl() override 5788ada3bcSV-Sanjana { 5862598e31SEd Tanous BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this)); 5988ada3bcSV-Sanjana } 6088ada3bcSV-Sanjana 6188ada3bcSV-Sanjana boost::asio::io_context& getIoContext() override 6288ada3bcSV-Sanjana { 6388ada3bcSV-Sanjana return static_cast<boost::asio::io_context&>( 6488ada3bcSV-Sanjana adaptor.get_executor().context()); 6588ada3bcSV-Sanjana } 6688ada3bcSV-Sanjana 6788ada3bcSV-Sanjana void start() 6888ada3bcSV-Sanjana { 696fde95faSEd Tanous if (!openHandler) 7088ada3bcSV-Sanjana { 7162598e31SEd Tanous BMCWEB_LOG_CRITICAL("No open handler???"); 726fde95faSEd Tanous return; 7388ada3bcSV-Sanjana } 746fde95faSEd Tanous openHandler(*this); 75*8f79c5b6SEd Tanous sendSSEHeader(); 7688ada3bcSV-Sanjana } 7788ada3bcSV-Sanjana 7888ada3bcSV-Sanjana void close(const std::string_view msg) override 7988ada3bcSV-Sanjana { 80*8f79c5b6SEd Tanous BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg); 8188ada3bcSV-Sanjana // send notification to handler for cleanup 8288ada3bcSV-Sanjana if (closeHandler) 8388ada3bcSV-Sanjana { 846fde95faSEd Tanous closeHandler(*this); 8588ada3bcSV-Sanjana } 8662598e31SEd Tanous BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg); 876fde95faSEd Tanous boost::beast::get_lowest_layer(adaptor).close(); 8888ada3bcSV-Sanjana } 8988ada3bcSV-Sanjana 906fde95faSEd Tanous void sendSSEHeader() 9188ada3bcSV-Sanjana { 9262598e31SEd Tanous BMCWEB_LOG_DEBUG("Starting SSE connection"); 93*8f79c5b6SEd Tanous 946fde95faSEd Tanous res.set(boost::beast::http::field::content_type, "text/event-stream"); 958ece0e45SEd Tanous boost::beast::http::response_serializer<BodyType>& serial = 96*8f79c5b6SEd Tanous serializer.emplace(res); 9788ada3bcSV-Sanjana 9888ada3bcSV-Sanjana boost::beast::http::async_write_header( 998ece0e45SEd Tanous adaptor, serial, 10088ada3bcSV-Sanjana std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this, 10188ada3bcSV-Sanjana shared_from_this())); 10288ada3bcSV-Sanjana } 10388ada3bcSV-Sanjana 10488ada3bcSV-Sanjana void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/, 1056fde95faSEd Tanous const boost::system::error_code& ec, 1066fde95faSEd Tanous size_t /*bytesSent*/) 10788ada3bcSV-Sanjana { 1086fde95faSEd Tanous serializer.reset(); 10988ada3bcSV-Sanjana if (ec) 11088ada3bcSV-Sanjana { 11162598e31SEd Tanous BMCWEB_LOG_ERROR("Error sending header{}", ec); 11288ada3bcSV-Sanjana close("async_write_header failed"); 11388ada3bcSV-Sanjana return; 11488ada3bcSV-Sanjana } 11562598e31SEd Tanous BMCWEB_LOG_DEBUG("SSE header sent - Connection established"); 11688ada3bcSV-Sanjana 11788ada3bcSV-Sanjana // SSE stream header sent, So let us setup monitor. 11888ada3bcSV-Sanjana // Any read data on this stream will be error in case of SSE. 119*8f79c5b6SEd Tanous adaptor.async_read_some(boost::asio::buffer(buffer), 1206fde95faSEd Tanous std::bind_front(&ConnectionImpl::afterReadError, 1216fde95faSEd Tanous this, shared_from_this())); 12288ada3bcSV-Sanjana } 12388ada3bcSV-Sanjana 1246fde95faSEd Tanous void afterReadError(const std::shared_ptr<Connection>& /*self*/, 125*8f79c5b6SEd Tanous const boost::system::error_code& ec, size_t bytesRead) 12688ada3bcSV-Sanjana { 127*8f79c5b6SEd Tanous BMCWEB_LOG_DEBUG("Read {}", bytesRead); 1286fde95faSEd Tanous if (ec == boost::asio::error::operation_aborted) 1296fde95faSEd Tanous { 1306fde95faSEd Tanous return; 13188ada3bcSV-Sanjana } 13288ada3bcSV-Sanjana if (ec) 13388ada3bcSV-Sanjana { 13462598e31SEd Tanous BMCWEB_LOG_ERROR("Read error: {}", ec); 13588ada3bcSV-Sanjana } 13688ada3bcSV-Sanjana 1376fde95faSEd Tanous close("Close SSE connection"); 13888ada3bcSV-Sanjana } 13988ada3bcSV-Sanjana 14088ada3bcSV-Sanjana void doWrite() 14188ada3bcSV-Sanjana { 14288ada3bcSV-Sanjana if (doingWrite) 14388ada3bcSV-Sanjana { 14488ada3bcSV-Sanjana return; 14588ada3bcSV-Sanjana } 14688ada3bcSV-Sanjana if (inputBuffer.size() == 0) 14788ada3bcSV-Sanjana { 14862598e31SEd Tanous BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out"); 14988ada3bcSV-Sanjana return; 15088ada3bcSV-Sanjana } 1516fde95faSEd Tanous startTimeout(); 15288ada3bcSV-Sanjana doingWrite = true; 15388ada3bcSV-Sanjana 15488ada3bcSV-Sanjana adaptor.async_write_some( 15588ada3bcSV-Sanjana inputBuffer.data(), 15688ada3bcSV-Sanjana std::bind_front(&ConnectionImpl::doWriteCallback, this, 157*8f79c5b6SEd Tanous shared_from_this())); 15888ada3bcSV-Sanjana } 15988ada3bcSV-Sanjana 160*8f79c5b6SEd Tanous void doWriteCallback(const std::shared_ptr<Connection>& /*self*/, 16188ada3bcSV-Sanjana const boost::beast::error_code& ec, 1626fde95faSEd Tanous size_t bytesTransferred) 16388ada3bcSV-Sanjana { 1646fde95faSEd Tanous timer.cancel(); 16588ada3bcSV-Sanjana doingWrite = false; 16688ada3bcSV-Sanjana inputBuffer.consume(bytesTransferred); 16788ada3bcSV-Sanjana 16888ada3bcSV-Sanjana if (ec == boost::asio::error::eof) 16988ada3bcSV-Sanjana { 17062598e31SEd Tanous BMCWEB_LOG_ERROR("async_write_some() SSE stream closed"); 17188ada3bcSV-Sanjana close("SSE stream closed"); 17288ada3bcSV-Sanjana return; 17388ada3bcSV-Sanjana } 17488ada3bcSV-Sanjana 17588ada3bcSV-Sanjana if (ec) 17688ada3bcSV-Sanjana { 17762598e31SEd Tanous BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message()); 17888ada3bcSV-Sanjana close("async_write_some failed"); 17988ada3bcSV-Sanjana return; 18088ada3bcSV-Sanjana } 18162598e31SEd Tanous BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}", 18262598e31SEd Tanous bytesTransferred); 18388ada3bcSV-Sanjana 18488ada3bcSV-Sanjana doWrite(); 18588ada3bcSV-Sanjana } 18688ada3bcSV-Sanjana 18788ada3bcSV-Sanjana void sendEvent(std::string_view id, std::string_view msg) override 18888ada3bcSV-Sanjana { 18988ada3bcSV-Sanjana if (msg.empty()) 19088ada3bcSV-Sanjana { 19162598e31SEd Tanous BMCWEB_LOG_DEBUG("Empty data, bailing out."); 19288ada3bcSV-Sanjana return; 19388ada3bcSV-Sanjana } 19488ada3bcSV-Sanjana 1956fde95faSEd Tanous dataFormat(id, msg); 19688ada3bcSV-Sanjana 19788ada3bcSV-Sanjana doWrite(); 19888ada3bcSV-Sanjana } 19988ada3bcSV-Sanjana 2006fde95faSEd Tanous void dataFormat(std::string_view id, std::string_view msg) 20188ada3bcSV-Sanjana { 202*8f79c5b6SEd Tanous constexpr size_t bufferLimit = 10485760U; // 10MB 203*8f79c5b6SEd Tanous if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit) 204*8f79c5b6SEd Tanous { 205*8f79c5b6SEd Tanous BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client"); 206*8f79c5b6SEd Tanous close("Buffer overflow"); 207*8f79c5b6SEd Tanous return; 208*8f79c5b6SEd Tanous } 20988ada3bcSV-Sanjana std::string rawData; 21088ada3bcSV-Sanjana if (!id.empty()) 21188ada3bcSV-Sanjana { 21288ada3bcSV-Sanjana rawData += "id: "; 2136fde95faSEd Tanous rawData.append(id); 21488ada3bcSV-Sanjana rawData += "\n"; 21588ada3bcSV-Sanjana } 21688ada3bcSV-Sanjana 21788ada3bcSV-Sanjana rawData += "data: "; 21888ada3bcSV-Sanjana for (char character : msg) 21988ada3bcSV-Sanjana { 22088ada3bcSV-Sanjana rawData += character; 22188ada3bcSV-Sanjana if (character == '\n') 22288ada3bcSV-Sanjana { 22388ada3bcSV-Sanjana rawData += "data: "; 22488ada3bcSV-Sanjana } 22588ada3bcSV-Sanjana } 22688ada3bcSV-Sanjana rawData += "\n\n"; 22788ada3bcSV-Sanjana 22888ada3bcSV-Sanjana boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()), 22988ada3bcSV-Sanjana boost::asio::buffer(rawData)); 23088ada3bcSV-Sanjana inputBuffer.commit(rawData.size()); 23188ada3bcSV-Sanjana } 23288ada3bcSV-Sanjana 2336fde95faSEd Tanous void startTimeout() 23488ada3bcSV-Sanjana { 23588ada3bcSV-Sanjana std::weak_ptr<Connection> weakSelf = weak_from_this(); 23688ada3bcSV-Sanjana timer.expires_after(std::chrono::seconds(30)); 23788ada3bcSV-Sanjana timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback, 23888ada3bcSV-Sanjana this, weak_from_this())); 23988ada3bcSV-Sanjana } 24088ada3bcSV-Sanjana 24188ada3bcSV-Sanjana void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf, 2426fde95faSEd Tanous const boost::system::error_code& ec) 24388ada3bcSV-Sanjana { 24488ada3bcSV-Sanjana std::shared_ptr<Connection> self = weakSelf.lock(); 24588ada3bcSV-Sanjana if (!self) 24688ada3bcSV-Sanjana { 24762598e31SEd Tanous BMCWEB_LOG_CRITICAL("{} Failed to capture connection", 24862598e31SEd Tanous logPtr(self.get())); 24988ada3bcSV-Sanjana return; 25088ada3bcSV-Sanjana } 25188ada3bcSV-Sanjana 25288ada3bcSV-Sanjana if (ec == boost::asio::error::operation_aborted) 25388ada3bcSV-Sanjana { 254*8f79c5b6SEd Tanous BMCWEB_LOG_DEBUG("Timer operation aborted"); 2558ece0e45SEd Tanous // Canceled wait means the path succeeded. 25688ada3bcSV-Sanjana return; 25788ada3bcSV-Sanjana } 25888ada3bcSV-Sanjana if (ec) 25988ada3bcSV-Sanjana { 26062598e31SEd Tanous BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec); 26188ada3bcSV-Sanjana } 26288ada3bcSV-Sanjana 26362598e31SEd Tanous BMCWEB_LOG_WARNING("{} Connection timed out, closing", 26462598e31SEd Tanous logPtr(self.get())); 26588ada3bcSV-Sanjana 26688ada3bcSV-Sanjana self->close("closing connection"); 26788ada3bcSV-Sanjana } 26888ada3bcSV-Sanjana 26988ada3bcSV-Sanjana private: 270*8f79c5b6SEd Tanous std::array<char, 1> buffer{}; 27188ada3bcSV-Sanjana boost::beast::multi_buffer inputBuffer; 27288ada3bcSV-Sanjana 273*8f79c5b6SEd Tanous Adaptor adaptor; 274*8f79c5b6SEd Tanous 275*8f79c5b6SEd Tanous using BodyType = bmcweb::FileBody; 276*8f79c5b6SEd Tanous boost::beast::http::response<BodyType> res; 277*8f79c5b6SEd Tanous std::optional<boost::beast::http::response_serializer<BodyType>> serializer; 278*8f79c5b6SEd Tanous boost::asio::io_context& ioc; 2796fde95faSEd Tanous boost::asio::steady_timer timer; 28088ada3bcSV-Sanjana bool doingWrite = false; 28188ada3bcSV-Sanjana 2826fde95faSEd Tanous std::function<void(Connection&)> openHandler; 2836fde95faSEd Tanous std::function<void(Connection&)> closeHandler; 28488ada3bcSV-Sanjana }; 28588ada3bcSV-Sanjana } // namespace sse_socket 28688ada3bcSV-Sanjana } // namespace crow 287