1 // SPDX-License-Identifier: Apache-2.0 2 // SPDX-FileCopyrightText: Copyright OpenBMC Authors 3 #pragma once 4 #include "boost_formatters.hpp" 5 #include "http_body.hpp" 6 #include "http_request.hpp" 7 #include "http_response.hpp" 8 9 #include <boost/asio/buffer.hpp> 10 #include <boost/asio/steady_timer.hpp> 11 #include <boost/beast/core/multi_buffer.hpp> 12 #include <boost/beast/websocket.hpp> 13 14 #include <array> 15 #include <cstddef> 16 #include <functional> 17 #include <optional> 18 19 namespace crow 20 { 21 22 namespace sse_socket 23 { 24 struct Connection : public std::enable_shared_from_this<Connection> 25 { 26 public: 27 Connection() = default; 28 29 Connection(const Connection&) = delete; 30 Connection(Connection&&) = delete; 31 Connection& operator=(const Connection&) = delete; 32 Connection& operator=(const Connection&&) = delete; 33 virtual ~Connection() = default; 34 35 virtual boost::asio::io_context& getIoContext() = 0; 36 virtual void close(std::string_view msg = "quit") = 0; 37 virtual void sendSseEvent(std::string_view id, std::string_view msg) = 0; 38 }; 39 40 template <typename Adaptor> 41 class ConnectionImpl : public Connection 42 { 43 public: ConnectionImpl(Adaptor && adaptorIn,std::function<void (Connection &,const Request &)> openHandlerIn,std::function<void (Connection &)> closeHandlerIn)44 ConnectionImpl( 45 Adaptor&& adaptorIn, 46 std::function<void(Connection&, const Request&)> openHandlerIn, 47 std::function<void(Connection&)> closeHandlerIn) : 48 adaptor(std::move(adaptorIn)), 49 timer(static_cast<boost::asio::io_context&>( 50 adaptor.get_executor().context())), 51 openHandler(std::move(openHandlerIn)), 52 closeHandler(std::move(closeHandlerIn)) 53 54 { 55 BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this)); 56 } 57 58 ConnectionImpl(const ConnectionImpl&) = delete; 59 ConnectionImpl(const ConnectionImpl&&) = delete; 60 ConnectionImpl& operator=(const ConnectionImpl&) = delete; 61 ConnectionImpl& operator=(const ConnectionImpl&&) = delete; 62 ~ConnectionImpl()63 ~ConnectionImpl() override 64 { 65 BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this)); 66 } 67 getIoContext()68 boost::asio::io_context& getIoContext() override 69 { 70 return static_cast<boost::asio::io_context&>( 71 adaptor.get_executor().context()); 72 } 73 start(const Request & req)74 void start(const Request& req) 75 { 76 BMCWEB_LOG_DEBUG("Starting SSE connection"); 77 78 res.set(boost::beast::http::field::content_type, "text/event-stream"); 79 boost::beast::http::response_serializer<BodyType>& serial = 80 serializer.emplace(res); 81 82 boost::beast::http::async_write_header( 83 adaptor, serial, 84 std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this, 85 shared_from_this(), req)); 86 } 87 close(const std::string_view msg)88 void close(const std::string_view msg) override 89 { 90 BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg); 91 // send notification to handler for cleanup 92 if (closeHandler) 93 { 94 closeHandler(*this); 95 } 96 BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg); 97 boost::beast::get_lowest_layer(adaptor).close(); 98 } 99 sendSSEHeaderCallback(const std::shared_ptr<Connection> &,const Request & req,const boost::system::error_code & ec,size_t)100 void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/, 101 const Request& req, 102 const boost::system::error_code& ec, 103 size_t /*bytesSent*/) 104 { 105 serializer.reset(); 106 if (ec) 107 { 108 BMCWEB_LOG_ERROR("Error sending header{}", ec); 109 close("async_write_header failed"); 110 return; 111 } 112 BMCWEB_LOG_DEBUG("SSE header sent - Connection established"); 113 if (!openHandler) 114 { 115 BMCWEB_LOG_CRITICAL("No open handler???"); 116 return; 117 } 118 openHandler(*this, req); 119 120 // SSE stream header sent, So let us setup monitor. 121 // Any read data on this stream will be error in case of SSE. 122 adaptor.async_read_some(boost::asio::buffer(buffer), 123 std::bind_front(&ConnectionImpl::afterReadError, 124 this, shared_from_this())); 125 } 126 afterReadError(const std::shared_ptr<Connection> &,const boost::system::error_code & ec,size_t bytesRead)127 void afterReadError(const std::shared_ptr<Connection>& /*self*/, 128 const boost::system::error_code& ec, size_t bytesRead) 129 { 130 BMCWEB_LOG_DEBUG("Read {}", bytesRead); 131 if (ec == boost::asio::error::operation_aborted) 132 { 133 return; 134 } 135 if (ec) 136 { 137 BMCWEB_LOG_ERROR("Read error: {}", ec); 138 } 139 140 close("Close SSE connection"); 141 } 142 doWrite()143 void doWrite() 144 { 145 if (doingWrite) 146 { 147 return; 148 } 149 if (inputBuffer.size() == 0) 150 { 151 BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out"); 152 return; 153 } 154 startTimeout(); 155 doingWrite = true; 156 157 adaptor.async_write_some( 158 inputBuffer.data(), 159 std::bind_front(&ConnectionImpl::doWriteCallback, this, 160 shared_from_this())); 161 } 162 doWriteCallback(const std::shared_ptr<Connection> &,const boost::beast::error_code & ec,size_t bytesTransferred)163 void doWriteCallback(const std::shared_ptr<Connection>& /*self*/, 164 const boost::beast::error_code& ec, 165 size_t bytesTransferred) 166 { 167 timer.cancel(); 168 doingWrite = false; 169 inputBuffer.consume(bytesTransferred); 170 171 if (ec == boost::asio::error::eof) 172 { 173 BMCWEB_LOG_ERROR("async_write_some() SSE stream closed"); 174 close("SSE stream closed"); 175 return; 176 } 177 178 if (ec) 179 { 180 BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message()); 181 close("async_write_some failed"); 182 return; 183 } 184 BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}", 185 bytesTransferred); 186 187 doWrite(); 188 } 189 sendSseEvent(std::string_view id,std::string_view msg)190 void sendSseEvent(std::string_view id, std::string_view msg) override 191 { 192 if (msg.empty()) 193 { 194 BMCWEB_LOG_DEBUG("Empty data, bailing out."); 195 return; 196 } 197 198 dataFormat(id, msg); 199 200 doWrite(); 201 } 202 dataFormat(std::string_view id,std::string_view msg)203 void dataFormat(std::string_view id, std::string_view msg) 204 { 205 constexpr size_t bufferLimit = 10485760U; // 10MB 206 if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit) 207 { 208 BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client"); 209 close("Buffer overflow"); 210 return; 211 } 212 std::string rawData; 213 if (!id.empty()) 214 { 215 rawData += "id: "; 216 rawData.append(id); 217 rawData += "\n"; 218 } 219 220 rawData += "data: "; 221 for (char character : msg) 222 { 223 rawData += character; 224 if (character == '\n') 225 { 226 rawData += "data: "; 227 } 228 } 229 rawData += "\n\n"; 230 231 size_t copied = boost::asio::buffer_copy( 232 inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData)); 233 inputBuffer.commit(copied); 234 } 235 startTimeout()236 void startTimeout() 237 { 238 std::weak_ptr<Connection> weakSelf = weak_from_this(); 239 timer.expires_after(std::chrono::seconds(30)); 240 timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback, 241 this, weak_from_this())); 242 } 243 onTimeoutCallback(const std::weak_ptr<Connection> & weakSelf,const boost::system::error_code & ec)244 void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf, 245 const boost::system::error_code& ec) 246 { 247 std::shared_ptr<Connection> self = weakSelf.lock(); 248 if (!self) 249 { 250 BMCWEB_LOG_CRITICAL("{} Failed to capture connection", 251 logPtr(self.get())); 252 return; 253 } 254 255 if (ec == boost::asio::error::operation_aborted) 256 { 257 BMCWEB_LOG_DEBUG("Timer operation aborted"); 258 // Canceled wait means the path succeeded. 259 return; 260 } 261 if (ec) 262 { 263 BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec); 264 } 265 266 BMCWEB_LOG_WARNING("{} Connection timed out, closing", 267 logPtr(self.get())); 268 269 self->close("closing connection"); 270 } 271 272 private: 273 std::array<char, 1> buffer{}; 274 boost::beast::multi_buffer inputBuffer; 275 276 Adaptor adaptor; 277 278 using BodyType = bmcweb::HttpBody; 279 boost::beast::http::response<BodyType> res; 280 std::optional<boost::beast::http::response_serializer<BodyType>> serializer; 281 boost::asio::steady_timer timer; 282 bool doingWrite = false; 283 284 std::function<void(Connection&, const Request&)> openHandler; 285 std::function<void(Connection&)> closeHandler; 286 }; 287 } // namespace sse_socket 288 } // namespace crow 289