188ada3bcSV-Sanjana #pragma once 26fde95faSEd Tanous #include "dbus_singleton.hpp" 388ada3bcSV-Sanjana #include "http_request.hpp" 488ada3bcSV-Sanjana #include "http_response.hpp" 588ada3bcSV-Sanjana 688ada3bcSV-Sanjana #include <boost/algorithm/string/predicate.hpp> 788ada3bcSV-Sanjana #include <boost/asio/buffer.hpp> 888ada3bcSV-Sanjana #include <boost/asio/steady_timer.hpp> 988ada3bcSV-Sanjana #include <boost/beast/core/multi_buffer.hpp> 1088ada3bcSV-Sanjana #include <boost/beast/http/buffer_body.hpp> 1188ada3bcSV-Sanjana #include <boost/beast/websocket.hpp> 1288ada3bcSV-Sanjana 1388ada3bcSV-Sanjana #include <array> 1488ada3bcSV-Sanjana #include <functional> 1588ada3bcSV-Sanjana 1688ada3bcSV-Sanjana #ifdef BMCWEB_ENABLE_SSL 1788ada3bcSV-Sanjana #include <boost/beast/websocket/ssl.hpp> 1888ada3bcSV-Sanjana #endif 1988ada3bcSV-Sanjana 2088ada3bcSV-Sanjana namespace crow 2188ada3bcSV-Sanjana { 2288ada3bcSV-Sanjana 2388ada3bcSV-Sanjana namespace sse_socket 2488ada3bcSV-Sanjana { 2588ada3bcSV-Sanjana struct Connection : std::enable_shared_from_this<Connection> 2688ada3bcSV-Sanjana { 2788ada3bcSV-Sanjana public: 286fde95faSEd Tanous Connection() = default; 2988ada3bcSV-Sanjana 3088ada3bcSV-Sanjana Connection(const Connection&) = delete; 3188ada3bcSV-Sanjana Connection(Connection&&) = delete; 3288ada3bcSV-Sanjana Connection& operator=(const Connection&) = delete; 3388ada3bcSV-Sanjana Connection& operator=(const Connection&&) = delete; 3488ada3bcSV-Sanjana virtual ~Connection() = default; 3588ada3bcSV-Sanjana 3688ada3bcSV-Sanjana virtual boost::asio::io_context& getIoContext() = 0; 3788ada3bcSV-Sanjana virtual void close(std::string_view msg = "quit") = 0; 3888ada3bcSV-Sanjana virtual void sendEvent(std::string_view id, std::string_view msg) = 0; 3988ada3bcSV-Sanjana }; 4088ada3bcSV-Sanjana 4188ada3bcSV-Sanjana template <typename Adaptor> 4288ada3bcSV-Sanjana class ConnectionImpl : public Connection 4388ada3bcSV-Sanjana { 4488ada3bcSV-Sanjana public: 456fde95faSEd Tanous ConnectionImpl(Adaptor&& adaptorIn, 466fde95faSEd Tanous std::function<void(Connection&)> openHandlerIn, 476fde95faSEd Tanous std::function<void(Connection&)> closeHandlerIn) : 486fde95faSEd Tanous adaptor(std::move(adaptorIn)), 496fde95faSEd Tanous timer(ioc), openHandler(std::move(openHandlerIn)), 5088ada3bcSV-Sanjana closeHandler(std::move(closeHandlerIn)) 5188ada3bcSV-Sanjana { 52*62598e31SEd Tanous BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this)); 5388ada3bcSV-Sanjana } 5488ada3bcSV-Sanjana 5588ada3bcSV-Sanjana ConnectionImpl(const ConnectionImpl&) = delete; 5688ada3bcSV-Sanjana ConnectionImpl(const ConnectionImpl&&) = delete; 5788ada3bcSV-Sanjana ConnectionImpl& operator=(const ConnectionImpl&) = delete; 5888ada3bcSV-Sanjana ConnectionImpl& operator=(const ConnectionImpl&&) = delete; 5988ada3bcSV-Sanjana 6088ada3bcSV-Sanjana ~ConnectionImpl() override 6188ada3bcSV-Sanjana { 62*62598e31SEd Tanous BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this)); 6388ada3bcSV-Sanjana } 6488ada3bcSV-Sanjana 6588ada3bcSV-Sanjana boost::asio::io_context& getIoContext() override 6688ada3bcSV-Sanjana { 6788ada3bcSV-Sanjana return static_cast<boost::asio::io_context&>( 6888ada3bcSV-Sanjana adaptor.get_executor().context()); 6988ada3bcSV-Sanjana } 7088ada3bcSV-Sanjana 7188ada3bcSV-Sanjana void start() 7288ada3bcSV-Sanjana { 736fde95faSEd Tanous if (!openHandler) 7488ada3bcSV-Sanjana { 75*62598e31SEd Tanous BMCWEB_LOG_CRITICAL("No open handler???"); 766fde95faSEd Tanous return; 7788ada3bcSV-Sanjana } 786fde95faSEd Tanous openHandler(*this); 7988ada3bcSV-Sanjana } 8088ada3bcSV-Sanjana 8188ada3bcSV-Sanjana void close(const std::string_view msg) override 8288ada3bcSV-Sanjana { 8388ada3bcSV-Sanjana // send notification to handler for cleanup 8488ada3bcSV-Sanjana if (closeHandler) 8588ada3bcSV-Sanjana { 866fde95faSEd Tanous closeHandler(*this); 8788ada3bcSV-Sanjana } 88*62598e31SEd Tanous BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg); 896fde95faSEd Tanous boost::beast::get_lowest_layer(adaptor).close(); 9088ada3bcSV-Sanjana } 9188ada3bcSV-Sanjana 926fde95faSEd Tanous void sendSSEHeader() 9388ada3bcSV-Sanjana { 94*62598e31SEd Tanous BMCWEB_LOG_DEBUG("Starting SSE connection"); 9588ada3bcSV-Sanjana using BodyType = boost::beast::http::buffer_body; 966fde95faSEd Tanous boost::beast::http::response<BodyType> res( 976fde95faSEd Tanous boost::beast::http::status::ok, 11, BodyType{}); 986fde95faSEd Tanous res.set(boost::beast::http::field::content_type, "text/event-stream"); 996fde95faSEd Tanous res.body().more = true; 1006fde95faSEd Tanous boost::beast::http::response_serializer<BodyType>& ser = 1016fde95faSEd Tanous serializer.emplace(std::move(res)); 10288ada3bcSV-Sanjana 10388ada3bcSV-Sanjana boost::beast::http::async_write_header( 1046fde95faSEd Tanous adaptor, ser, 10588ada3bcSV-Sanjana std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this, 10688ada3bcSV-Sanjana shared_from_this())); 10788ada3bcSV-Sanjana } 10888ada3bcSV-Sanjana 10988ada3bcSV-Sanjana void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/, 1106fde95faSEd Tanous const boost::system::error_code& ec, 1116fde95faSEd Tanous size_t /*bytesSent*/) 11288ada3bcSV-Sanjana { 1136fde95faSEd Tanous serializer.reset(); 11488ada3bcSV-Sanjana if (ec) 11588ada3bcSV-Sanjana { 116*62598e31SEd Tanous BMCWEB_LOG_ERROR("Error sending header{}", ec); 11788ada3bcSV-Sanjana close("async_write_header failed"); 11888ada3bcSV-Sanjana return; 11988ada3bcSV-Sanjana } 120*62598e31SEd Tanous BMCWEB_LOG_DEBUG("SSE header sent - Connection established"); 12188ada3bcSV-Sanjana 12288ada3bcSV-Sanjana serializer.reset(); 12388ada3bcSV-Sanjana 12488ada3bcSV-Sanjana // SSE stream header sent, So let us setup monitor. 12588ada3bcSV-Sanjana // Any read data on this stream will be error in case of SSE. 1266fde95faSEd Tanous 1276fde95faSEd Tanous adaptor.async_wait(boost::asio::ip::tcp::socket::wait_error, 1286fde95faSEd Tanous std::bind_front(&ConnectionImpl::afterReadError, 1296fde95faSEd Tanous this, shared_from_this())); 13088ada3bcSV-Sanjana } 13188ada3bcSV-Sanjana 1326fde95faSEd Tanous void afterReadError(const std::shared_ptr<Connection>& /*self*/, 1336fde95faSEd Tanous const boost::system::error_code& ec) 13488ada3bcSV-Sanjana { 1356fde95faSEd Tanous if (ec == boost::asio::error::operation_aborted) 1366fde95faSEd Tanous { 1376fde95faSEd Tanous return; 13888ada3bcSV-Sanjana } 13988ada3bcSV-Sanjana if (ec) 14088ada3bcSV-Sanjana { 141*62598e31SEd Tanous BMCWEB_LOG_ERROR("Read error: {}", ec); 14288ada3bcSV-Sanjana } 14388ada3bcSV-Sanjana 1446fde95faSEd Tanous close("Close SSE connection"); 14588ada3bcSV-Sanjana } 14688ada3bcSV-Sanjana 14788ada3bcSV-Sanjana void doWrite() 14888ada3bcSV-Sanjana { 14988ada3bcSV-Sanjana if (doingWrite) 15088ada3bcSV-Sanjana { 15188ada3bcSV-Sanjana return; 15288ada3bcSV-Sanjana } 15388ada3bcSV-Sanjana if (inputBuffer.size() == 0) 15488ada3bcSV-Sanjana { 155*62598e31SEd Tanous BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out"); 15688ada3bcSV-Sanjana return; 15788ada3bcSV-Sanjana } 1586fde95faSEd Tanous startTimeout(); 15988ada3bcSV-Sanjana doingWrite = true; 16088ada3bcSV-Sanjana 16188ada3bcSV-Sanjana adaptor.async_write_some( 16288ada3bcSV-Sanjana inputBuffer.data(), 16388ada3bcSV-Sanjana std::bind_front(&ConnectionImpl::doWriteCallback, this, 1646fde95faSEd Tanous weak_from_this())); 16588ada3bcSV-Sanjana } 16688ada3bcSV-Sanjana 1676fde95faSEd Tanous void doWriteCallback(const std::weak_ptr<Connection>& weak, 16888ada3bcSV-Sanjana const boost::beast::error_code& ec, 1696fde95faSEd Tanous size_t bytesTransferred) 17088ada3bcSV-Sanjana { 1716fde95faSEd Tanous auto self = weak.lock(); 1726fde95faSEd Tanous if (self == nullptr) 1736fde95faSEd Tanous { 1746fde95faSEd Tanous return; 1756fde95faSEd Tanous } 1766fde95faSEd Tanous timer.cancel(); 17788ada3bcSV-Sanjana doingWrite = false; 17888ada3bcSV-Sanjana inputBuffer.consume(bytesTransferred); 17988ada3bcSV-Sanjana 18088ada3bcSV-Sanjana if (ec == boost::asio::error::eof) 18188ada3bcSV-Sanjana { 182*62598e31SEd Tanous BMCWEB_LOG_ERROR("async_write_some() SSE stream closed"); 18388ada3bcSV-Sanjana close("SSE stream closed"); 18488ada3bcSV-Sanjana return; 18588ada3bcSV-Sanjana } 18688ada3bcSV-Sanjana 18788ada3bcSV-Sanjana if (ec) 18888ada3bcSV-Sanjana { 189*62598e31SEd Tanous BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message()); 19088ada3bcSV-Sanjana close("async_write_some failed"); 19188ada3bcSV-Sanjana return; 19288ada3bcSV-Sanjana } 193*62598e31SEd Tanous BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}", 194*62598e31SEd Tanous bytesTransferred); 19588ada3bcSV-Sanjana 19688ada3bcSV-Sanjana doWrite(); 19788ada3bcSV-Sanjana } 19888ada3bcSV-Sanjana 19988ada3bcSV-Sanjana void sendEvent(std::string_view id, std::string_view msg) override 20088ada3bcSV-Sanjana { 20188ada3bcSV-Sanjana if (msg.empty()) 20288ada3bcSV-Sanjana { 203*62598e31SEd Tanous BMCWEB_LOG_DEBUG("Empty data, bailing out."); 20488ada3bcSV-Sanjana return; 20588ada3bcSV-Sanjana } 20688ada3bcSV-Sanjana 2076fde95faSEd Tanous dataFormat(id, msg); 20888ada3bcSV-Sanjana 20988ada3bcSV-Sanjana doWrite(); 21088ada3bcSV-Sanjana } 21188ada3bcSV-Sanjana 2126fde95faSEd Tanous void dataFormat(std::string_view id, std::string_view msg) 21388ada3bcSV-Sanjana { 21488ada3bcSV-Sanjana std::string rawData; 21588ada3bcSV-Sanjana if (!id.empty()) 21688ada3bcSV-Sanjana { 21788ada3bcSV-Sanjana rawData += "id: "; 2186fde95faSEd Tanous rawData.append(id); 21988ada3bcSV-Sanjana rawData += "\n"; 22088ada3bcSV-Sanjana } 22188ada3bcSV-Sanjana 22288ada3bcSV-Sanjana rawData += "data: "; 22388ada3bcSV-Sanjana for (char character : msg) 22488ada3bcSV-Sanjana { 22588ada3bcSV-Sanjana rawData += character; 22688ada3bcSV-Sanjana if (character == '\n') 22788ada3bcSV-Sanjana { 22888ada3bcSV-Sanjana rawData += "data: "; 22988ada3bcSV-Sanjana } 23088ada3bcSV-Sanjana } 23188ada3bcSV-Sanjana rawData += "\n\n"; 23288ada3bcSV-Sanjana 23388ada3bcSV-Sanjana boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()), 23488ada3bcSV-Sanjana boost::asio::buffer(rawData)); 23588ada3bcSV-Sanjana inputBuffer.commit(rawData.size()); 23688ada3bcSV-Sanjana } 23788ada3bcSV-Sanjana 2386fde95faSEd Tanous void startTimeout() 23988ada3bcSV-Sanjana { 24088ada3bcSV-Sanjana std::weak_ptr<Connection> weakSelf = weak_from_this(); 24188ada3bcSV-Sanjana timer.expires_after(std::chrono::seconds(30)); 24288ada3bcSV-Sanjana timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback, 24388ada3bcSV-Sanjana this, weak_from_this())); 24488ada3bcSV-Sanjana } 24588ada3bcSV-Sanjana 24688ada3bcSV-Sanjana void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf, 2476fde95faSEd Tanous const boost::system::error_code& ec) 24888ada3bcSV-Sanjana { 24988ada3bcSV-Sanjana std::shared_ptr<Connection> self = weakSelf.lock(); 25088ada3bcSV-Sanjana if (!self) 25188ada3bcSV-Sanjana { 252*62598e31SEd Tanous BMCWEB_LOG_CRITICAL("{} Failed to capture connection", 253*62598e31SEd Tanous logPtr(self.get())); 25488ada3bcSV-Sanjana return; 25588ada3bcSV-Sanjana } 25688ada3bcSV-Sanjana 25788ada3bcSV-Sanjana if (ec == boost::asio::error::operation_aborted) 25888ada3bcSV-Sanjana { 259*62598e31SEd Tanous BMCWEB_LOG_DEBUG("operation aborted"); 26088ada3bcSV-Sanjana // Canceled wait means the path succeeeded. 26188ada3bcSV-Sanjana return; 26288ada3bcSV-Sanjana } 26388ada3bcSV-Sanjana if (ec) 26488ada3bcSV-Sanjana { 265*62598e31SEd Tanous BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec); 26688ada3bcSV-Sanjana } 26788ada3bcSV-Sanjana 268*62598e31SEd Tanous BMCWEB_LOG_WARNING("{}Connection timed out, closing", 269*62598e31SEd Tanous logPtr(self.get())); 27088ada3bcSV-Sanjana 27188ada3bcSV-Sanjana self->close("closing connection"); 27288ada3bcSV-Sanjana } 27388ada3bcSV-Sanjana 27488ada3bcSV-Sanjana private: 27588ada3bcSV-Sanjana Adaptor adaptor; 27688ada3bcSV-Sanjana 27788ada3bcSV-Sanjana boost::beast::multi_buffer inputBuffer; 27888ada3bcSV-Sanjana 27988ada3bcSV-Sanjana std::optional<boost::beast::http::response_serializer< 2806fde95faSEd Tanous boost::beast::http::buffer_body>> 28188ada3bcSV-Sanjana serializer; 28288ada3bcSV-Sanjana boost::asio::io_context& ioc = 28388ada3bcSV-Sanjana crow::connections::systemBus->get_io_context(); 2846fde95faSEd Tanous boost::asio::steady_timer timer; 28588ada3bcSV-Sanjana bool doingWrite = false; 28688ada3bcSV-Sanjana 2876fde95faSEd Tanous std::function<void(Connection&)> openHandler; 2886fde95faSEd Tanous std::function<void(Connection&)> closeHandler; 28988ada3bcSV-Sanjana }; 29088ada3bcSV-Sanjana } // namespace sse_socket 29188ada3bcSV-Sanjana } // namespace crow 292