188ada3bcSV-Sanjana #pragma once 2*6fde95faSEd Tanous #include "dbus_singleton.hpp" 388ada3bcSV-Sanjana #include "http_request.hpp" 488ada3bcSV-Sanjana #include "http_response.hpp" 588ada3bcSV-Sanjana 688ada3bcSV-Sanjana #include <boost/algorithm/string/predicate.hpp> 788ada3bcSV-Sanjana #include <boost/asio/buffer.hpp> 888ada3bcSV-Sanjana #include <boost/asio/steady_timer.hpp> 988ada3bcSV-Sanjana #include <boost/beast/core/multi_buffer.hpp> 1088ada3bcSV-Sanjana #include <boost/beast/http/buffer_body.hpp> 1188ada3bcSV-Sanjana #include <boost/beast/websocket.hpp> 1288ada3bcSV-Sanjana 1388ada3bcSV-Sanjana #include <array> 1488ada3bcSV-Sanjana #include <functional> 1588ada3bcSV-Sanjana 1688ada3bcSV-Sanjana #ifdef BMCWEB_ENABLE_SSL 1788ada3bcSV-Sanjana #include <boost/beast/websocket/ssl.hpp> 1888ada3bcSV-Sanjana #endif 1988ada3bcSV-Sanjana 2088ada3bcSV-Sanjana namespace crow 2188ada3bcSV-Sanjana { 2288ada3bcSV-Sanjana 2388ada3bcSV-Sanjana namespace sse_socket 2488ada3bcSV-Sanjana { 2588ada3bcSV-Sanjana struct Connection : std::enable_shared_from_this<Connection> 2688ada3bcSV-Sanjana { 2788ada3bcSV-Sanjana public: 28*6fde95faSEd Tanous Connection() = default; 2988ada3bcSV-Sanjana 3088ada3bcSV-Sanjana Connection(const Connection&) = delete; 3188ada3bcSV-Sanjana Connection(Connection&&) = delete; 3288ada3bcSV-Sanjana Connection& operator=(const Connection&) = delete; 3388ada3bcSV-Sanjana Connection& operator=(const Connection&&) = delete; 3488ada3bcSV-Sanjana virtual ~Connection() = default; 3588ada3bcSV-Sanjana 3688ada3bcSV-Sanjana virtual boost::asio::io_context& getIoContext() = 0; 3788ada3bcSV-Sanjana virtual void close(std::string_view msg = "quit") = 0; 3888ada3bcSV-Sanjana virtual void sendEvent(std::string_view id, std::string_view msg) = 0; 3988ada3bcSV-Sanjana }; 4088ada3bcSV-Sanjana 4188ada3bcSV-Sanjana template <typename Adaptor> 4288ada3bcSV-Sanjana class ConnectionImpl : public Connection 4388ada3bcSV-Sanjana { 4488ada3bcSV-Sanjana public: 45*6fde95faSEd Tanous ConnectionImpl(Adaptor&& adaptorIn, 46*6fde95faSEd Tanous std::function<void(Connection&)> openHandlerIn, 47*6fde95faSEd Tanous std::function<void(Connection&)> closeHandlerIn) : 48*6fde95faSEd Tanous adaptor(std::move(adaptorIn)), 49*6fde95faSEd Tanous timer(ioc), openHandler(std::move(openHandlerIn)), 5088ada3bcSV-Sanjana closeHandler(std::move(closeHandlerIn)) 5188ada3bcSV-Sanjana { 5288ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << "SseConnectionImpl: SSE constructor " << this; 5388ada3bcSV-Sanjana } 5488ada3bcSV-Sanjana 5588ada3bcSV-Sanjana ConnectionImpl(const ConnectionImpl&) = delete; 5688ada3bcSV-Sanjana ConnectionImpl(const ConnectionImpl&&) = delete; 5788ada3bcSV-Sanjana ConnectionImpl& operator=(const ConnectionImpl&) = delete; 5888ada3bcSV-Sanjana ConnectionImpl& operator=(const ConnectionImpl&&) = delete; 5988ada3bcSV-Sanjana 6088ada3bcSV-Sanjana ~ConnectionImpl() override 6188ada3bcSV-Sanjana { 6288ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << "SSE ConnectionImpl: SSE destructor " << this; 6388ada3bcSV-Sanjana } 6488ada3bcSV-Sanjana 6588ada3bcSV-Sanjana boost::asio::io_context& getIoContext() override 6688ada3bcSV-Sanjana { 6788ada3bcSV-Sanjana return static_cast<boost::asio::io_context&>( 6888ada3bcSV-Sanjana adaptor.get_executor().context()); 6988ada3bcSV-Sanjana } 7088ada3bcSV-Sanjana 7188ada3bcSV-Sanjana void start() 7288ada3bcSV-Sanjana { 73*6fde95faSEd Tanous if (!openHandler) 7488ada3bcSV-Sanjana { 75*6fde95faSEd Tanous BMCWEB_LOG_CRITICAL << "No open handler???"; 76*6fde95faSEd Tanous return; 7788ada3bcSV-Sanjana } 78*6fde95faSEd Tanous openHandler(*this); 7988ada3bcSV-Sanjana } 8088ada3bcSV-Sanjana 8188ada3bcSV-Sanjana void close(const std::string_view msg) override 8288ada3bcSV-Sanjana { 8388ada3bcSV-Sanjana // send notification to handler for cleanup 8488ada3bcSV-Sanjana if (closeHandler) 8588ada3bcSV-Sanjana { 86*6fde95faSEd Tanous closeHandler(*this); 8788ada3bcSV-Sanjana } 88*6fde95faSEd Tanous BMCWEB_LOG_DEBUG << "Closing SSE connection " << this << " - " << msg; 89*6fde95faSEd Tanous boost::beast::get_lowest_layer(adaptor).close(); 9088ada3bcSV-Sanjana } 9188ada3bcSV-Sanjana 92*6fde95faSEd Tanous void sendSSEHeader() 9388ada3bcSV-Sanjana { 9488ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << "Starting SSE connection"; 9588ada3bcSV-Sanjana using BodyType = boost::beast::http::buffer_body; 96*6fde95faSEd Tanous boost::beast::http::response<BodyType> res( 97*6fde95faSEd Tanous boost::beast::http::status::ok, 11, BodyType{}); 98*6fde95faSEd Tanous res.set(boost::beast::http::field::content_type, "text/event-stream"); 99*6fde95faSEd Tanous res.body().more = true; 100*6fde95faSEd Tanous boost::beast::http::response_serializer<BodyType>& ser = 101*6fde95faSEd Tanous serializer.emplace(std::move(res)); 10288ada3bcSV-Sanjana 10388ada3bcSV-Sanjana boost::beast::http::async_write_header( 104*6fde95faSEd Tanous adaptor, ser, 10588ada3bcSV-Sanjana std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this, 10688ada3bcSV-Sanjana shared_from_this())); 10788ada3bcSV-Sanjana } 10888ada3bcSV-Sanjana 10988ada3bcSV-Sanjana void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/, 110*6fde95faSEd Tanous const boost::system::error_code& ec, 111*6fde95faSEd Tanous size_t /*bytesSent*/) 11288ada3bcSV-Sanjana { 113*6fde95faSEd Tanous serializer.reset(); 11488ada3bcSV-Sanjana if (ec) 11588ada3bcSV-Sanjana { 11688ada3bcSV-Sanjana BMCWEB_LOG_ERROR << "Error sending header" << ec; 11788ada3bcSV-Sanjana close("async_write_header failed"); 11888ada3bcSV-Sanjana return; 11988ada3bcSV-Sanjana } 12088ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << "SSE header sent - Connection established"; 12188ada3bcSV-Sanjana 12288ada3bcSV-Sanjana serializer.reset(); 12388ada3bcSV-Sanjana 12488ada3bcSV-Sanjana // SSE stream header sent, So let us setup monitor. 12588ada3bcSV-Sanjana // Any read data on this stream will be error in case of SSE. 126*6fde95faSEd Tanous 127*6fde95faSEd Tanous adaptor.async_wait(boost::asio::ip::tcp::socket::wait_error, 128*6fde95faSEd Tanous std::bind_front(&ConnectionImpl::afterReadError, 129*6fde95faSEd Tanous this, shared_from_this())); 13088ada3bcSV-Sanjana } 13188ada3bcSV-Sanjana 132*6fde95faSEd Tanous void afterReadError(const std::shared_ptr<Connection>& /*self*/, 133*6fde95faSEd Tanous const boost::system::error_code& ec) 13488ada3bcSV-Sanjana { 135*6fde95faSEd Tanous if (ec == boost::asio::error::operation_aborted) 136*6fde95faSEd Tanous { 137*6fde95faSEd Tanous return; 13888ada3bcSV-Sanjana } 13988ada3bcSV-Sanjana if (ec) 14088ada3bcSV-Sanjana { 14188ada3bcSV-Sanjana BMCWEB_LOG_ERROR << "Read error: " << ec; 14288ada3bcSV-Sanjana } 14388ada3bcSV-Sanjana 144*6fde95faSEd Tanous close("Close SSE connection"); 14588ada3bcSV-Sanjana } 14688ada3bcSV-Sanjana 14788ada3bcSV-Sanjana void doWrite() 14888ada3bcSV-Sanjana { 14988ada3bcSV-Sanjana if (doingWrite) 15088ada3bcSV-Sanjana { 15188ada3bcSV-Sanjana return; 15288ada3bcSV-Sanjana } 15388ada3bcSV-Sanjana if (inputBuffer.size() == 0) 15488ada3bcSV-Sanjana { 15588ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << "inputBuffer is empty... Bailing out"; 15688ada3bcSV-Sanjana return; 15788ada3bcSV-Sanjana } 158*6fde95faSEd Tanous startTimeout(); 15988ada3bcSV-Sanjana doingWrite = true; 16088ada3bcSV-Sanjana 16188ada3bcSV-Sanjana adaptor.async_write_some( 16288ada3bcSV-Sanjana inputBuffer.data(), 16388ada3bcSV-Sanjana std::bind_front(&ConnectionImpl::doWriteCallback, this, 164*6fde95faSEd Tanous weak_from_this())); 16588ada3bcSV-Sanjana } 16688ada3bcSV-Sanjana 167*6fde95faSEd Tanous void doWriteCallback(const std::weak_ptr<Connection>& weak, 16888ada3bcSV-Sanjana const boost::beast::error_code& ec, 169*6fde95faSEd Tanous size_t bytesTransferred) 17088ada3bcSV-Sanjana { 171*6fde95faSEd Tanous auto self = weak.lock(); 172*6fde95faSEd Tanous if (self == nullptr) 173*6fde95faSEd Tanous { 174*6fde95faSEd Tanous return; 175*6fde95faSEd Tanous } 176*6fde95faSEd Tanous timer.cancel(); 17788ada3bcSV-Sanjana doingWrite = false; 17888ada3bcSV-Sanjana inputBuffer.consume(bytesTransferred); 17988ada3bcSV-Sanjana 18088ada3bcSV-Sanjana if (ec == boost::asio::error::eof) 18188ada3bcSV-Sanjana { 18288ada3bcSV-Sanjana BMCWEB_LOG_ERROR << "async_write_some() SSE stream closed"; 18388ada3bcSV-Sanjana close("SSE stream closed"); 18488ada3bcSV-Sanjana return; 18588ada3bcSV-Sanjana } 18688ada3bcSV-Sanjana 18788ada3bcSV-Sanjana if (ec) 18888ada3bcSV-Sanjana { 18988ada3bcSV-Sanjana BMCWEB_LOG_ERROR << "async_write_some() failed: " << ec.message(); 19088ada3bcSV-Sanjana close("async_write_some failed"); 19188ada3bcSV-Sanjana return; 19288ada3bcSV-Sanjana } 19388ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << "async_write_some() bytes transferred: " 19488ada3bcSV-Sanjana << bytesTransferred; 19588ada3bcSV-Sanjana 19688ada3bcSV-Sanjana doWrite(); 19788ada3bcSV-Sanjana } 19888ada3bcSV-Sanjana 19988ada3bcSV-Sanjana void sendEvent(std::string_view id, std::string_view msg) override 20088ada3bcSV-Sanjana { 20188ada3bcSV-Sanjana if (msg.empty()) 20288ada3bcSV-Sanjana { 20388ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << "Empty data, bailing out."; 20488ada3bcSV-Sanjana return; 20588ada3bcSV-Sanjana } 20688ada3bcSV-Sanjana 207*6fde95faSEd Tanous dataFormat(id, msg); 20888ada3bcSV-Sanjana 20988ada3bcSV-Sanjana doWrite(); 21088ada3bcSV-Sanjana } 21188ada3bcSV-Sanjana 212*6fde95faSEd Tanous void dataFormat(std::string_view id, std::string_view msg) 21388ada3bcSV-Sanjana { 21488ada3bcSV-Sanjana std::string rawData; 21588ada3bcSV-Sanjana if (!id.empty()) 21688ada3bcSV-Sanjana { 21788ada3bcSV-Sanjana rawData += "id: "; 218*6fde95faSEd Tanous rawData.append(id); 21988ada3bcSV-Sanjana rawData += "\n"; 22088ada3bcSV-Sanjana } 22188ada3bcSV-Sanjana 22288ada3bcSV-Sanjana rawData += "data: "; 22388ada3bcSV-Sanjana for (char character : msg) 22488ada3bcSV-Sanjana { 22588ada3bcSV-Sanjana rawData += character; 22688ada3bcSV-Sanjana if (character == '\n') 22788ada3bcSV-Sanjana { 22888ada3bcSV-Sanjana rawData += "data: "; 22988ada3bcSV-Sanjana } 23088ada3bcSV-Sanjana } 23188ada3bcSV-Sanjana rawData += "\n\n"; 23288ada3bcSV-Sanjana 23388ada3bcSV-Sanjana boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()), 23488ada3bcSV-Sanjana boost::asio::buffer(rawData)); 23588ada3bcSV-Sanjana inputBuffer.commit(rawData.size()); 23688ada3bcSV-Sanjana } 23788ada3bcSV-Sanjana 238*6fde95faSEd Tanous void startTimeout() 23988ada3bcSV-Sanjana { 24088ada3bcSV-Sanjana std::weak_ptr<Connection> weakSelf = weak_from_this(); 24188ada3bcSV-Sanjana timer.expires_after(std::chrono::seconds(30)); 24288ada3bcSV-Sanjana timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback, 24388ada3bcSV-Sanjana this, weak_from_this())); 24488ada3bcSV-Sanjana } 24588ada3bcSV-Sanjana 24688ada3bcSV-Sanjana void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf, 247*6fde95faSEd Tanous const boost::system::error_code& ec) 24888ada3bcSV-Sanjana { 24988ada3bcSV-Sanjana std::shared_ptr<Connection> self = weakSelf.lock(); 25088ada3bcSV-Sanjana if (!self) 25188ada3bcSV-Sanjana { 25288ada3bcSV-Sanjana BMCWEB_LOG_CRITICAL << self << " Failed to capture connection"; 25388ada3bcSV-Sanjana return; 25488ada3bcSV-Sanjana } 25588ada3bcSV-Sanjana 25688ada3bcSV-Sanjana if (ec == boost::asio::error::operation_aborted) 25788ada3bcSV-Sanjana { 25888ada3bcSV-Sanjana BMCWEB_LOG_DEBUG << "operation aborted"; 25988ada3bcSV-Sanjana // Canceled wait means the path succeeeded. 26088ada3bcSV-Sanjana return; 26188ada3bcSV-Sanjana } 26288ada3bcSV-Sanjana if (ec) 26388ada3bcSV-Sanjana { 26488ada3bcSV-Sanjana BMCWEB_LOG_CRITICAL << self << " timer failed " << ec; 26588ada3bcSV-Sanjana } 26688ada3bcSV-Sanjana 26788ada3bcSV-Sanjana BMCWEB_LOG_WARNING << self << "Connection timed out, closing"; 26888ada3bcSV-Sanjana 26988ada3bcSV-Sanjana self->close("closing connection"); 27088ada3bcSV-Sanjana } 27188ada3bcSV-Sanjana 27288ada3bcSV-Sanjana private: 27388ada3bcSV-Sanjana Adaptor adaptor; 27488ada3bcSV-Sanjana 27588ada3bcSV-Sanjana boost::beast::multi_buffer inputBuffer; 27688ada3bcSV-Sanjana 27788ada3bcSV-Sanjana std::optional<boost::beast::http::response_serializer< 278*6fde95faSEd Tanous boost::beast::http::buffer_body>> 27988ada3bcSV-Sanjana serializer; 28088ada3bcSV-Sanjana boost::asio::io_context& ioc = 28188ada3bcSV-Sanjana crow::connections::systemBus->get_io_context(); 282*6fde95faSEd Tanous boost::asio::steady_timer timer; 28388ada3bcSV-Sanjana bool doingWrite = false; 28488ada3bcSV-Sanjana 285*6fde95faSEd Tanous std::function<void(Connection&)> openHandler; 286*6fde95faSEd Tanous std::function<void(Connection&)> closeHandler; 28788ada3bcSV-Sanjana }; 28888ada3bcSV-Sanjana } // namespace sse_socket 28988ada3bcSV-Sanjana } // namespace crow 290