1 #pragma once 2 #include "boost_formatters.hpp" 3 #include "http_body.hpp" 4 #include "http_request.hpp" 5 #include "http_response.hpp" 6 7 #include <boost/asio/buffer.hpp> 8 #include <boost/asio/steady_timer.hpp> 9 #include <boost/beast/core/multi_buffer.hpp> 10 #include <boost/beast/websocket.hpp> 11 12 #include <array> 13 #include <cstddef> 14 #include <functional> 15 #include <optional> 16 17 namespace crow 18 { 19 20 namespace sse_socket 21 { 22 struct Connection : public std::enable_shared_from_this<Connection> 23 { 24 public: 25 Connection() = default; 26 27 Connection(const Connection&) = delete; 28 Connection(Connection&&) = delete; 29 Connection& operator=(const Connection&) = delete; 30 Connection& operator=(const Connection&&) = delete; 31 virtual ~Connection() = default; 32 33 virtual boost::asio::io_context& getIoContext() = 0; 34 virtual void close(std::string_view msg = "quit") = 0; 35 virtual void sendSseEvent(std::string_view id, std::string_view msg) = 0; 36 }; 37 38 template <typename Adaptor> 39 class ConnectionImpl : public Connection 40 { 41 public: 42 ConnectionImpl( 43 Adaptor&& adaptorIn, 44 std::function<void(Connection&, const Request&)> openHandlerIn, 45 std::function<void(Connection&)> closeHandlerIn) : 46 adaptor(std::move(adaptorIn)), 47 timer(static_cast<boost::asio::io_context&>( 48 adaptor.get_executor().context())), 49 openHandler(std::move(openHandlerIn)), 50 closeHandler(std::move(closeHandlerIn)) 51 52 { 53 BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this)); 54 } 55 56 ConnectionImpl(const ConnectionImpl&) = delete; 57 ConnectionImpl(const ConnectionImpl&&) = delete; 58 ConnectionImpl& operator=(const ConnectionImpl&) = delete; 59 ConnectionImpl& operator=(const ConnectionImpl&&) = delete; 60 61 ~ConnectionImpl() override 62 { 63 BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this)); 64 } 65 66 boost::asio::io_context& getIoContext() override 67 { 68 return static_cast<boost::asio::io_context&>( 69 adaptor.get_executor().context()); 70 } 71 72 void start(const Request& req) 73 { 74 if (!openHandler) 75 { 76 BMCWEB_LOG_CRITICAL("No open handler???"); 77 return; 78 } 79 openHandler(*this, req); 80 sendSSEHeader(); 81 } 82 83 void close(const std::string_view msg) override 84 { 85 BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg); 86 // send notification to handler for cleanup 87 if (closeHandler) 88 { 89 closeHandler(*this); 90 } 91 BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg); 92 boost::beast::get_lowest_layer(adaptor).close(); 93 } 94 95 void sendSSEHeader() 96 { 97 BMCWEB_LOG_DEBUG("Starting SSE connection"); 98 99 res.set(boost::beast::http::field::content_type, "text/event-stream"); 100 boost::beast::http::response_serializer<BodyType>& serial = 101 serializer.emplace(res); 102 103 boost::beast::http::async_write_header( 104 adaptor, serial, 105 std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this, 106 shared_from_this())); 107 } 108 109 void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/, 110 const boost::system::error_code& ec, 111 size_t /*bytesSent*/) 112 { 113 serializer.reset(); 114 if (ec) 115 { 116 BMCWEB_LOG_ERROR("Error sending header{}", ec); 117 close("async_write_header failed"); 118 return; 119 } 120 BMCWEB_LOG_DEBUG("SSE header sent - Connection established"); 121 122 // SSE stream header sent, So let us setup monitor. 123 // Any read data on this stream will be error in case of SSE. 124 adaptor.async_read_some(boost::asio::buffer(buffer), 125 std::bind_front(&ConnectionImpl::afterReadError, 126 this, shared_from_this())); 127 } 128 129 void afterReadError(const std::shared_ptr<Connection>& /*self*/, 130 const boost::system::error_code& ec, size_t bytesRead) 131 { 132 BMCWEB_LOG_DEBUG("Read {}", bytesRead); 133 if (ec == boost::asio::error::operation_aborted) 134 { 135 return; 136 } 137 if (ec) 138 { 139 BMCWEB_LOG_ERROR("Read error: {}", ec); 140 } 141 142 close("Close SSE connection"); 143 } 144 145 void doWrite() 146 { 147 if (doingWrite) 148 { 149 return; 150 } 151 if (inputBuffer.size() == 0) 152 { 153 BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out"); 154 return; 155 } 156 startTimeout(); 157 doingWrite = true; 158 159 adaptor.async_write_some( 160 inputBuffer.data(), 161 std::bind_front(&ConnectionImpl::doWriteCallback, this, 162 shared_from_this())); 163 } 164 165 void doWriteCallback(const std::shared_ptr<Connection>& /*self*/, 166 const boost::beast::error_code& ec, 167 size_t bytesTransferred) 168 { 169 timer.cancel(); 170 doingWrite = false; 171 inputBuffer.consume(bytesTransferred); 172 173 if (ec == boost::asio::error::eof) 174 { 175 BMCWEB_LOG_ERROR("async_write_some() SSE stream closed"); 176 close("SSE stream closed"); 177 return; 178 } 179 180 if (ec) 181 { 182 BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message()); 183 close("async_write_some failed"); 184 return; 185 } 186 BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}", 187 bytesTransferred); 188 189 doWrite(); 190 } 191 192 void sendSseEvent(std::string_view id, std::string_view msg) override 193 { 194 if (msg.empty()) 195 { 196 BMCWEB_LOG_DEBUG("Empty data, bailing out."); 197 return; 198 } 199 200 dataFormat(id, msg); 201 202 doWrite(); 203 } 204 205 void dataFormat(std::string_view id, std::string_view msg) 206 { 207 constexpr size_t bufferLimit = 10485760U; // 10MB 208 if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit) 209 { 210 BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client"); 211 close("Buffer overflow"); 212 return; 213 } 214 std::string rawData; 215 if (!id.empty()) 216 { 217 rawData += "id: "; 218 rawData.append(id); 219 rawData += "\n"; 220 } 221 222 rawData += "data: "; 223 for (char character : msg) 224 { 225 rawData += character; 226 if (character == '\n') 227 { 228 rawData += "data: "; 229 } 230 } 231 rawData += "\n\n"; 232 233 size_t copied = boost::asio::buffer_copy( 234 inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData)); 235 inputBuffer.commit(copied); 236 } 237 238 void startTimeout() 239 { 240 std::weak_ptr<Connection> weakSelf = weak_from_this(); 241 timer.expires_after(std::chrono::seconds(30)); 242 timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback, 243 this, weak_from_this())); 244 } 245 246 void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf, 247 const boost::system::error_code& ec) 248 { 249 std::shared_ptr<Connection> self = weakSelf.lock(); 250 if (!self) 251 { 252 BMCWEB_LOG_CRITICAL("{} Failed to capture connection", 253 logPtr(self.get())); 254 return; 255 } 256 257 if (ec == boost::asio::error::operation_aborted) 258 { 259 BMCWEB_LOG_DEBUG("Timer operation aborted"); 260 // Canceled wait means the path succeeded. 261 return; 262 } 263 if (ec) 264 { 265 BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec); 266 } 267 268 BMCWEB_LOG_WARNING("{} Connection timed out, closing", 269 logPtr(self.get())); 270 271 self->close("closing connection"); 272 } 273 274 private: 275 std::array<char, 1> buffer{}; 276 boost::beast::multi_buffer inputBuffer; 277 278 Adaptor adaptor; 279 280 using BodyType = bmcweb::HttpBody; 281 boost::beast::http::response<BodyType> res; 282 std::optional<boost::beast::http::response_serializer<BodyType>> serializer; 283 boost::asio::steady_timer timer; 284 bool doingWrite = false; 285 286 std::function<void(Connection&, const Request&)> openHandler; 287 std::function<void(Connection&)> closeHandler; 288 }; 289 } // namespace sse_socket 290 } // namespace crow 291