1*e60300aeSEd Tanous // SPDX-License-Identifier: Apache-2.0 2*e60300aeSEd Tanous // SPDX-FileCopyrightText: Copyright OpenBMC Authors 3*e60300aeSEd Tanous #pragma once 4*e60300aeSEd Tanous #include "boost_formatters.hpp" 5*e60300aeSEd Tanous #include "http_body.hpp" 6*e60300aeSEd Tanous #include "http_request.hpp" 7*e60300aeSEd Tanous #include "io_context_singleton.hpp" 8*e60300aeSEd Tanous #include "logging.hpp" 9*e60300aeSEd Tanous #include "server_sent_event.hpp" 10*e60300aeSEd Tanous 11*e60300aeSEd Tanous #include <boost/asio/buffer.hpp> 12*e60300aeSEd Tanous #include <boost/asio/error.hpp> 13*e60300aeSEd Tanous #include <boost/asio/steady_timer.hpp> 14*e60300aeSEd Tanous #include <boost/beast/core/error.hpp> 15*e60300aeSEd Tanous #include <boost/beast/core/multi_buffer.hpp> 16*e60300aeSEd Tanous #include <boost/beast/http/field.hpp> 17*e60300aeSEd Tanous #include <boost/beast/http/serializer.hpp> 18*e60300aeSEd Tanous #include <boost/beast/http/write.hpp> 19*e60300aeSEd Tanous 20*e60300aeSEd Tanous #include <array> 21*e60300aeSEd Tanous #include <chrono> 22*e60300aeSEd Tanous #include <cstddef> 23*e60300aeSEd Tanous #include <functional> 24*e60300aeSEd Tanous #include <memory> 25*e60300aeSEd Tanous #include <optional> 26*e60300aeSEd Tanous #include <string> 27*e60300aeSEd Tanous #include <string_view> 28*e60300aeSEd Tanous #include <utility> 29*e60300aeSEd Tanous 30*e60300aeSEd Tanous namespace crow 31*e60300aeSEd Tanous { 32*e60300aeSEd Tanous 33*e60300aeSEd Tanous namespace sse_socket 34*e60300aeSEd Tanous { 35*e60300aeSEd Tanous 36*e60300aeSEd Tanous template <typename Adaptor> 37*e60300aeSEd Tanous class ConnectionImpl : public Connection 38*e60300aeSEd Tanous { 39*e60300aeSEd Tanous public: ConnectionImpl(Adaptor && adaptorIn,std::function<void (Connection &,const Request &)> openHandlerIn,std::function<void (Connection &)> closeHandlerIn)40*e60300aeSEd Tanous ConnectionImpl( 41*e60300aeSEd Tanous Adaptor&& adaptorIn, 42*e60300aeSEd Tanous std::function<void(Connection&, const Request&)> openHandlerIn, 43*e60300aeSEd Tanous std::function<void(Connection&)> closeHandlerIn) : 44*e60300aeSEd Tanous adaptor(std::move(adaptorIn)), timer(getIoContext()), 45*e60300aeSEd Tanous openHandler(std::move(openHandlerIn)), 46*e60300aeSEd Tanous closeHandler(std::move(closeHandlerIn)) 47*e60300aeSEd Tanous 48*e60300aeSEd Tanous { 49*e60300aeSEd Tanous BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this)); 50*e60300aeSEd Tanous } 51*e60300aeSEd Tanous 52*e60300aeSEd Tanous ConnectionImpl(const ConnectionImpl&) = delete; 53*e60300aeSEd Tanous ConnectionImpl(const ConnectionImpl&&) = delete; 54*e60300aeSEd Tanous ConnectionImpl& operator=(const ConnectionImpl&) = delete; 55*e60300aeSEd Tanous ConnectionImpl& operator=(const ConnectionImpl&&) = delete; 56*e60300aeSEd Tanous ~ConnectionImpl()57*e60300aeSEd Tanous ~ConnectionImpl() override 58*e60300aeSEd Tanous { 59*e60300aeSEd Tanous BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this)); 60*e60300aeSEd Tanous } 61*e60300aeSEd Tanous start(const Request & req)62*e60300aeSEd Tanous void start(const Request& req) 63*e60300aeSEd Tanous { 64*e60300aeSEd Tanous BMCWEB_LOG_DEBUG("Starting SSE connection"); 65*e60300aeSEd Tanous 66*e60300aeSEd Tanous res.set(boost::beast::http::field::content_type, "text/event-stream"); 67*e60300aeSEd Tanous boost::beast::http::response_serializer<BodyType>& serial = 68*e60300aeSEd Tanous serializer.emplace(res); 69*e60300aeSEd Tanous 70*e60300aeSEd Tanous boost::beast::http::async_write_header( 71*e60300aeSEd Tanous adaptor, serial, 72*e60300aeSEd Tanous std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this, 73*e60300aeSEd Tanous shared_from_this(), req)); 74*e60300aeSEd Tanous } 75*e60300aeSEd Tanous close(const std::string_view msg)76*e60300aeSEd Tanous void close(const std::string_view msg) override 77*e60300aeSEd Tanous { 78*e60300aeSEd Tanous BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg); 79*e60300aeSEd Tanous // send notification to handler for cleanup 80*e60300aeSEd Tanous if (closeHandler) 81*e60300aeSEd Tanous { 82*e60300aeSEd Tanous closeHandler(*this); 83*e60300aeSEd Tanous } 84*e60300aeSEd Tanous BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg); 85*e60300aeSEd Tanous boost::beast::get_lowest_layer(adaptor).close(); 86*e60300aeSEd Tanous } 87*e60300aeSEd Tanous sendSSEHeaderCallback(const std::shared_ptr<Connection> &,const Request & req,const boost::system::error_code & ec,size_t)88*e60300aeSEd Tanous void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/, 89*e60300aeSEd Tanous const Request& req, 90*e60300aeSEd Tanous const boost::system::error_code& ec, 91*e60300aeSEd Tanous size_t /*bytesSent*/) 92*e60300aeSEd Tanous { 93*e60300aeSEd Tanous serializer.reset(); 94*e60300aeSEd Tanous if (ec) 95*e60300aeSEd Tanous { 96*e60300aeSEd Tanous BMCWEB_LOG_ERROR("Error sending header{}", ec); 97*e60300aeSEd Tanous close("async_write_header failed"); 98*e60300aeSEd Tanous return; 99*e60300aeSEd Tanous } 100*e60300aeSEd Tanous BMCWEB_LOG_DEBUG("SSE header sent - Connection established"); 101*e60300aeSEd Tanous if (!openHandler) 102*e60300aeSEd Tanous { 103*e60300aeSEd Tanous BMCWEB_LOG_CRITICAL("No open handler???"); 104*e60300aeSEd Tanous return; 105*e60300aeSEd Tanous } 106*e60300aeSEd Tanous openHandler(*this, req); 107*e60300aeSEd Tanous 108*e60300aeSEd Tanous // SSE stream header sent, So let us setup monitor. 109*e60300aeSEd Tanous // Any read data on this stream will be error in case of SSE. 110*e60300aeSEd Tanous adaptor.async_read_some(boost::asio::buffer(buffer), 111*e60300aeSEd Tanous std::bind_front(&ConnectionImpl::afterReadError, 112*e60300aeSEd Tanous this, shared_from_this())); 113*e60300aeSEd Tanous } 114*e60300aeSEd Tanous afterReadError(const std::shared_ptr<Connection> &,const boost::system::error_code & ec,size_t bytesRead)115*e60300aeSEd Tanous void afterReadError(const std::shared_ptr<Connection>& /*self*/, 116*e60300aeSEd Tanous const boost::system::error_code& ec, size_t bytesRead) 117*e60300aeSEd Tanous { 118*e60300aeSEd Tanous BMCWEB_LOG_DEBUG("Read {}", bytesRead); 119*e60300aeSEd Tanous if (ec == boost::asio::error::operation_aborted) 120*e60300aeSEd Tanous { 121*e60300aeSEd Tanous return; 122*e60300aeSEd Tanous } 123*e60300aeSEd Tanous if (ec) 124*e60300aeSEd Tanous { 125*e60300aeSEd Tanous BMCWEB_LOG_ERROR("Read error: {}", ec); 126*e60300aeSEd Tanous } 127*e60300aeSEd Tanous 128*e60300aeSEd Tanous close("Close SSE connection"); 129*e60300aeSEd Tanous } 130*e60300aeSEd Tanous doWrite()131*e60300aeSEd Tanous void doWrite() 132*e60300aeSEd Tanous { 133*e60300aeSEd Tanous if (doingWrite) 134*e60300aeSEd Tanous { 135*e60300aeSEd Tanous return; 136*e60300aeSEd Tanous } 137*e60300aeSEd Tanous if (inputBuffer.size() == 0) 138*e60300aeSEd Tanous { 139*e60300aeSEd Tanous BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out"); 140*e60300aeSEd Tanous return; 141*e60300aeSEd Tanous } 142*e60300aeSEd Tanous startTimeout(); 143*e60300aeSEd Tanous doingWrite = true; 144*e60300aeSEd Tanous 145*e60300aeSEd Tanous adaptor.async_write_some( 146*e60300aeSEd Tanous inputBuffer.data(), 147*e60300aeSEd Tanous std::bind_front(&ConnectionImpl::doWriteCallback, this, 148*e60300aeSEd Tanous shared_from_this())); 149*e60300aeSEd Tanous } 150*e60300aeSEd Tanous doWriteCallback(const std::shared_ptr<Connection> &,const boost::beast::error_code & ec,size_t bytesTransferred)151*e60300aeSEd Tanous void doWriteCallback(const std::shared_ptr<Connection>& /*self*/, 152*e60300aeSEd Tanous const boost::beast::error_code& ec, 153*e60300aeSEd Tanous size_t bytesTransferred) 154*e60300aeSEd Tanous { 155*e60300aeSEd Tanous timer.cancel(); 156*e60300aeSEd Tanous doingWrite = false; 157*e60300aeSEd Tanous inputBuffer.consume(bytesTransferred); 158*e60300aeSEd Tanous 159*e60300aeSEd Tanous if (ec == boost::asio::error::eof) 160*e60300aeSEd Tanous { 161*e60300aeSEd Tanous BMCWEB_LOG_ERROR("async_write_some() SSE stream closed"); 162*e60300aeSEd Tanous close("SSE stream closed"); 163*e60300aeSEd Tanous return; 164*e60300aeSEd Tanous } 165*e60300aeSEd Tanous 166*e60300aeSEd Tanous if (ec) 167*e60300aeSEd Tanous { 168*e60300aeSEd Tanous BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message()); 169*e60300aeSEd Tanous close("async_write_some failed"); 170*e60300aeSEd Tanous return; 171*e60300aeSEd Tanous } 172*e60300aeSEd Tanous BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}", 173*e60300aeSEd Tanous bytesTransferred); 174*e60300aeSEd Tanous 175*e60300aeSEd Tanous doWrite(); 176*e60300aeSEd Tanous } 177*e60300aeSEd Tanous sendSseEvent(std::string_view id,std::string_view msg)178*e60300aeSEd Tanous void sendSseEvent(std::string_view id, std::string_view msg) override 179*e60300aeSEd Tanous { 180*e60300aeSEd Tanous if (msg.empty()) 181*e60300aeSEd Tanous { 182*e60300aeSEd Tanous BMCWEB_LOG_DEBUG("Empty data, bailing out."); 183*e60300aeSEd Tanous return; 184*e60300aeSEd Tanous } 185*e60300aeSEd Tanous 186*e60300aeSEd Tanous dataFormat(id, msg); 187*e60300aeSEd Tanous 188*e60300aeSEd Tanous doWrite(); 189*e60300aeSEd Tanous } 190*e60300aeSEd Tanous dataFormat(std::string_view id,std::string_view msg)191*e60300aeSEd Tanous void dataFormat(std::string_view id, std::string_view msg) 192*e60300aeSEd Tanous { 193*e60300aeSEd Tanous constexpr size_t bufferLimit = 10485760U; // 10MB 194*e60300aeSEd Tanous if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit) 195*e60300aeSEd Tanous { 196*e60300aeSEd Tanous BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client"); 197*e60300aeSEd Tanous close("Buffer overflow"); 198*e60300aeSEd Tanous return; 199*e60300aeSEd Tanous } 200*e60300aeSEd Tanous std::string rawData; 201*e60300aeSEd Tanous if (!id.empty()) 202*e60300aeSEd Tanous { 203*e60300aeSEd Tanous rawData += "id: "; 204*e60300aeSEd Tanous rawData.append(id); 205*e60300aeSEd Tanous rawData += "\n"; 206*e60300aeSEd Tanous } 207*e60300aeSEd Tanous 208*e60300aeSEd Tanous rawData += "data: "; 209*e60300aeSEd Tanous for (char character : msg) 210*e60300aeSEd Tanous { 211*e60300aeSEd Tanous rawData += character; 212*e60300aeSEd Tanous if (character == '\n') 213*e60300aeSEd Tanous { 214*e60300aeSEd Tanous rawData += "data: "; 215*e60300aeSEd Tanous } 216*e60300aeSEd Tanous } 217*e60300aeSEd Tanous rawData += "\n\n"; 218*e60300aeSEd Tanous 219*e60300aeSEd Tanous size_t copied = boost::asio::buffer_copy( 220*e60300aeSEd Tanous inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData)); 221*e60300aeSEd Tanous inputBuffer.commit(copied); 222*e60300aeSEd Tanous } 223*e60300aeSEd Tanous startTimeout()224*e60300aeSEd Tanous void startTimeout() 225*e60300aeSEd Tanous { 226*e60300aeSEd Tanous std::weak_ptr<Connection> weakSelf = weak_from_this(); 227*e60300aeSEd Tanous timer.expires_after(std::chrono::seconds(30)); 228*e60300aeSEd Tanous timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback, 229*e60300aeSEd Tanous this, weak_from_this())); 230*e60300aeSEd Tanous } 231*e60300aeSEd Tanous onTimeoutCallback(const std::weak_ptr<Connection> & weakSelf,const boost::system::error_code & ec)232*e60300aeSEd Tanous void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf, 233*e60300aeSEd Tanous const boost::system::error_code& ec) 234*e60300aeSEd Tanous { 235*e60300aeSEd Tanous std::shared_ptr<Connection> self = weakSelf.lock(); 236*e60300aeSEd Tanous if (!self) 237*e60300aeSEd Tanous { 238*e60300aeSEd Tanous BMCWEB_LOG_CRITICAL("{} Failed to capture connection", 239*e60300aeSEd Tanous logPtr(self.get())); 240*e60300aeSEd Tanous return; 241*e60300aeSEd Tanous } 242*e60300aeSEd Tanous 243*e60300aeSEd Tanous if (ec == boost::asio::error::operation_aborted) 244*e60300aeSEd Tanous { 245*e60300aeSEd Tanous BMCWEB_LOG_DEBUG("Timer operation aborted"); 246*e60300aeSEd Tanous // Canceled wait means the path succeeded. 247*e60300aeSEd Tanous return; 248*e60300aeSEd Tanous } 249*e60300aeSEd Tanous if (ec) 250*e60300aeSEd Tanous { 251*e60300aeSEd Tanous BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec); 252*e60300aeSEd Tanous } 253*e60300aeSEd Tanous 254*e60300aeSEd Tanous BMCWEB_LOG_WARNING("{} Connection timed out, closing", 255*e60300aeSEd Tanous logPtr(self.get())); 256*e60300aeSEd Tanous 257*e60300aeSEd Tanous self->close("closing connection"); 258*e60300aeSEd Tanous } 259*e60300aeSEd Tanous 260*e60300aeSEd Tanous private: 261*e60300aeSEd Tanous std::array<char, 1> buffer{}; 262*e60300aeSEd Tanous boost::beast::multi_buffer inputBuffer; 263*e60300aeSEd Tanous 264*e60300aeSEd Tanous Adaptor adaptor; 265*e60300aeSEd Tanous 266*e60300aeSEd Tanous using BodyType = bmcweb::HttpBody; 267*e60300aeSEd Tanous boost::beast::http::response<BodyType> res; 268*e60300aeSEd Tanous std::optional<boost::beast::http::response_serializer<BodyType>> serializer; 269*e60300aeSEd Tanous boost::asio::steady_timer timer; 270*e60300aeSEd Tanous bool doingWrite = false; 271*e60300aeSEd Tanous 272*e60300aeSEd Tanous std::function<void(Connection&, const Request&)> openHandler; 273*e60300aeSEd Tanous std::function<void(Connection&)> closeHandler; 274*e60300aeSEd Tanous }; 275*e60300aeSEd Tanous } // namespace sse_socket 276*e60300aeSEd Tanous } // namespace crow 277