1 #pragma once 2 #include "boost_formatters.hpp" 3 #include "http_body.hpp" 4 #include "http_request.hpp" 5 #include "http_response.hpp" 6 7 #include <boost/asio/buffer.hpp> 8 #include <boost/asio/steady_timer.hpp> 9 #include <boost/beast/core/multi_buffer.hpp> 10 #include <boost/beast/websocket.hpp> 11 12 #include <array> 13 #include <cstddef> 14 #include <functional> 15 #include <optional> 16 17 namespace crow 18 { 19 20 namespace sse_socket 21 { 22 struct Connection : public std::enable_shared_from_this<Connection> 23 { 24 public: 25 Connection() = default; 26 27 Connection(const Connection&) = delete; 28 Connection(Connection&&) = delete; 29 Connection& operator=(const Connection&) = delete; 30 Connection& operator=(const Connection&&) = delete; 31 virtual ~Connection() = default; 32 33 virtual boost::asio::io_context& getIoContext() = 0; 34 virtual void close(std::string_view msg = "quit") = 0; 35 virtual void sendSseEvent(std::string_view id, std::string_view msg) = 0; 36 }; 37 38 template <typename Adaptor> 39 class ConnectionImpl : public Connection 40 { 41 public: 42 ConnectionImpl( 43 Adaptor&& adaptorIn, 44 std::function<void(Connection&, const Request&)> openHandlerIn, 45 std::function<void(Connection&)> closeHandlerIn) : 46 adaptor(std::move(adaptorIn)), 47 timer(static_cast<boost::asio::io_context&>( 48 adaptor.get_executor().context())), 49 openHandler(std::move(openHandlerIn)), 50 closeHandler(std::move(closeHandlerIn)) 51 52 { 53 BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this)); 54 } 55 56 ConnectionImpl(const ConnectionImpl&) = delete; 57 ConnectionImpl(const ConnectionImpl&&) = delete; 58 ConnectionImpl& operator=(const ConnectionImpl&) = delete; 59 ConnectionImpl& operator=(const ConnectionImpl&&) = delete; 60 61 ~ConnectionImpl() override 62 { 63 BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this)); 64 } 65 66 boost::asio::io_context& getIoContext() override 67 { 68 return static_cast<boost::asio::io_context&>( 69 adaptor.get_executor().context()); 70 } 71 72 void start(const Request& req) 73 { 74 BMCWEB_LOG_DEBUG("Starting SSE connection"); 75 76 res.set(boost::beast::http::field::content_type, "text/event-stream"); 77 boost::beast::http::response_serializer<BodyType>& serial = 78 serializer.emplace(res); 79 80 boost::beast::http::async_write_header( 81 adaptor, serial, 82 std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this, 83 shared_from_this(), req)); 84 } 85 86 void close(const std::string_view msg) override 87 { 88 BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg); 89 // send notification to handler for cleanup 90 if (closeHandler) 91 { 92 closeHandler(*this); 93 } 94 BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg); 95 boost::beast::get_lowest_layer(adaptor).close(); 96 } 97 98 void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/, 99 const Request& req, 100 const boost::system::error_code& ec, 101 size_t /*bytesSent*/) 102 { 103 serializer.reset(); 104 if (ec) 105 { 106 BMCWEB_LOG_ERROR("Error sending header{}", ec); 107 close("async_write_header failed"); 108 return; 109 } 110 BMCWEB_LOG_DEBUG("SSE header sent - Connection established"); 111 if (!openHandler) 112 { 113 BMCWEB_LOG_CRITICAL("No open handler???"); 114 return; 115 } 116 openHandler(*this, req); 117 118 // SSE stream header sent, So let us setup monitor. 119 // Any read data on this stream will be error in case of SSE. 120 adaptor.async_read_some(boost::asio::buffer(buffer), 121 std::bind_front(&ConnectionImpl::afterReadError, 122 this, shared_from_this())); 123 } 124 125 void afterReadError(const std::shared_ptr<Connection>& /*self*/, 126 const boost::system::error_code& ec, size_t bytesRead) 127 { 128 BMCWEB_LOG_DEBUG("Read {}", bytesRead); 129 if (ec == boost::asio::error::operation_aborted) 130 { 131 return; 132 } 133 if (ec) 134 { 135 BMCWEB_LOG_ERROR("Read error: {}", ec); 136 } 137 138 close("Close SSE connection"); 139 } 140 141 void doWrite() 142 { 143 if (doingWrite) 144 { 145 return; 146 } 147 if (inputBuffer.size() == 0) 148 { 149 BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out"); 150 return; 151 } 152 startTimeout(); 153 doingWrite = true; 154 155 adaptor.async_write_some( 156 inputBuffer.data(), 157 std::bind_front(&ConnectionImpl::doWriteCallback, this, 158 shared_from_this())); 159 } 160 161 void doWriteCallback(const std::shared_ptr<Connection>& /*self*/, 162 const boost::beast::error_code& ec, 163 size_t bytesTransferred) 164 { 165 timer.cancel(); 166 doingWrite = false; 167 inputBuffer.consume(bytesTransferred); 168 169 if (ec == boost::asio::error::eof) 170 { 171 BMCWEB_LOG_ERROR("async_write_some() SSE stream closed"); 172 close("SSE stream closed"); 173 return; 174 } 175 176 if (ec) 177 { 178 BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message()); 179 close("async_write_some failed"); 180 return; 181 } 182 BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}", 183 bytesTransferred); 184 185 doWrite(); 186 } 187 188 void sendSseEvent(std::string_view id, std::string_view msg) override 189 { 190 if (msg.empty()) 191 { 192 BMCWEB_LOG_DEBUG("Empty data, bailing out."); 193 return; 194 } 195 196 dataFormat(id, msg); 197 198 doWrite(); 199 } 200 201 void dataFormat(std::string_view id, std::string_view msg) 202 { 203 constexpr size_t bufferLimit = 10485760U; // 10MB 204 if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit) 205 { 206 BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client"); 207 close("Buffer overflow"); 208 return; 209 } 210 std::string rawData; 211 if (!id.empty()) 212 { 213 rawData += "id: "; 214 rawData.append(id); 215 rawData += "\n"; 216 } 217 218 rawData += "data: "; 219 for (char character : msg) 220 { 221 rawData += character; 222 if (character == '\n') 223 { 224 rawData += "data: "; 225 } 226 } 227 rawData += "\n\n"; 228 229 size_t copied = boost::asio::buffer_copy( 230 inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData)); 231 inputBuffer.commit(copied); 232 } 233 234 void startTimeout() 235 { 236 std::weak_ptr<Connection> weakSelf = weak_from_this(); 237 timer.expires_after(std::chrono::seconds(30)); 238 timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback, 239 this, weak_from_this())); 240 } 241 242 void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf, 243 const boost::system::error_code& ec) 244 { 245 std::shared_ptr<Connection> self = weakSelf.lock(); 246 if (!self) 247 { 248 BMCWEB_LOG_CRITICAL("{} Failed to capture connection", 249 logPtr(self.get())); 250 return; 251 } 252 253 if (ec == boost::asio::error::operation_aborted) 254 { 255 BMCWEB_LOG_DEBUG("Timer operation aborted"); 256 // Canceled wait means the path succeeded. 257 return; 258 } 259 if (ec) 260 { 261 BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec); 262 } 263 264 BMCWEB_LOG_WARNING("{} Connection timed out, closing", 265 logPtr(self.get())); 266 267 self->close("closing connection"); 268 } 269 270 private: 271 std::array<char, 1> buffer{}; 272 boost::beast::multi_buffer inputBuffer; 273 274 Adaptor adaptor; 275 276 using BodyType = bmcweb::HttpBody; 277 boost::beast::http::response<BodyType> res; 278 std::optional<boost::beast::http::response_serializer<BodyType>> serializer; 279 boost::asio::steady_timer timer; 280 bool doingWrite = false; 281 282 std::function<void(Connection&, const Request&)> openHandler; 283 std::function<void(Connection&)> closeHandler; 284 }; 285 } // namespace sse_socket 286 } // namespace crow 287