1 #pragma once 2 #include "http_body.hpp" 3 #include "http_request.hpp" 4 #include "http_response.hpp" 5 6 #include <boost/asio/buffer.hpp> 7 #include <boost/asio/steady_timer.hpp> 8 #include <boost/beast/core/multi_buffer.hpp> 9 #include <boost/beast/websocket.hpp> 10 11 #include <array> 12 #include <cstddef> 13 #include <functional> 14 #include <optional> 15 16 namespace crow 17 { 18 19 namespace sse_socket 20 { 21 struct Connection : public std::enable_shared_from_this<Connection> 22 { 23 public: 24 Connection() = default; 25 26 Connection(const Connection&) = delete; 27 Connection(Connection&&) = delete; 28 Connection& operator=(const Connection&) = delete; 29 Connection& operator=(const Connection&&) = delete; 30 virtual ~Connection() = default; 31 32 virtual boost::asio::io_context& getIoContext() = 0; 33 virtual void close(std::string_view msg = "quit") = 0; 34 virtual void sendEvent(std::string_view id, std::string_view msg) = 0; 35 }; 36 37 template <typename Adaptor> 38 class ConnectionImpl : public Connection 39 { 40 public: 41 ConnectionImpl( 42 Adaptor&& adaptorIn, 43 std::function<void(Connection&, const Request&)> openHandlerIn, 44 std::function<void(Connection&)> closeHandlerIn) : 45 adaptor(std::move(adaptorIn)), 46 timer(static_cast<boost::asio::io_context&>( 47 adaptor.get_executor().context())), 48 openHandler(std::move(openHandlerIn)), 49 closeHandler(std::move(closeHandlerIn)) 50 51 { 52 BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this)); 53 } 54 55 ConnectionImpl(const ConnectionImpl&) = delete; 56 ConnectionImpl(const ConnectionImpl&&) = delete; 57 ConnectionImpl& operator=(const ConnectionImpl&) = delete; 58 ConnectionImpl& operator=(const ConnectionImpl&&) = delete; 59 60 ~ConnectionImpl() override 61 { 62 BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this)); 63 } 64 65 boost::asio::io_context& getIoContext() override 66 { 67 return static_cast<boost::asio::io_context&>( 68 adaptor.get_executor().context()); 69 } 70 71 void start(const Request& req) 72 { 73 if (!openHandler) 74 { 75 BMCWEB_LOG_CRITICAL("No open handler???"); 76 return; 77 } 78 openHandler(*this, req); 79 sendSSEHeader(); 80 } 81 82 void close(const std::string_view msg) override 83 { 84 BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg); 85 // send notification to handler for cleanup 86 if (closeHandler) 87 { 88 closeHandler(*this); 89 } 90 BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg); 91 boost::beast::get_lowest_layer(adaptor).close(); 92 } 93 94 void sendSSEHeader() 95 { 96 BMCWEB_LOG_DEBUG("Starting SSE connection"); 97 98 res.set(boost::beast::http::field::content_type, "text/event-stream"); 99 boost::beast::http::response_serializer<BodyType>& serial = 100 serializer.emplace(res); 101 102 boost::beast::http::async_write_header( 103 adaptor, serial, 104 std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this, 105 shared_from_this())); 106 } 107 108 void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/, 109 const boost::system::error_code& ec, 110 size_t /*bytesSent*/) 111 { 112 serializer.reset(); 113 if (ec) 114 { 115 BMCWEB_LOG_ERROR("Error sending header{}", ec); 116 close("async_write_header failed"); 117 return; 118 } 119 BMCWEB_LOG_DEBUG("SSE header sent - Connection established"); 120 121 // SSE stream header sent, So let us setup monitor. 122 // Any read data on this stream will be error in case of SSE. 123 adaptor.async_read_some(boost::asio::buffer(buffer), 124 std::bind_front(&ConnectionImpl::afterReadError, 125 this, shared_from_this())); 126 } 127 128 void afterReadError(const std::shared_ptr<Connection>& /*self*/, 129 const boost::system::error_code& ec, size_t bytesRead) 130 { 131 BMCWEB_LOG_DEBUG("Read {}", bytesRead); 132 if (ec == boost::asio::error::operation_aborted) 133 { 134 return; 135 } 136 if (ec) 137 { 138 BMCWEB_LOG_ERROR("Read error: {}", ec); 139 } 140 141 close("Close SSE connection"); 142 } 143 144 void doWrite() 145 { 146 if (doingWrite) 147 { 148 return; 149 } 150 if (inputBuffer.size() == 0) 151 { 152 BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out"); 153 return; 154 } 155 startTimeout(); 156 doingWrite = true; 157 158 adaptor.async_write_some( 159 inputBuffer.data(), 160 std::bind_front(&ConnectionImpl::doWriteCallback, this, 161 shared_from_this())); 162 } 163 164 void doWriteCallback(const std::shared_ptr<Connection>& /*self*/, 165 const boost::beast::error_code& ec, 166 size_t bytesTransferred) 167 { 168 timer.cancel(); 169 doingWrite = false; 170 inputBuffer.consume(bytesTransferred); 171 172 if (ec == boost::asio::error::eof) 173 { 174 BMCWEB_LOG_ERROR("async_write_some() SSE stream closed"); 175 close("SSE stream closed"); 176 return; 177 } 178 179 if (ec) 180 { 181 BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message()); 182 close("async_write_some failed"); 183 return; 184 } 185 BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}", 186 bytesTransferred); 187 188 doWrite(); 189 } 190 191 void sendEvent(std::string_view id, std::string_view msg) override 192 { 193 if (msg.empty()) 194 { 195 BMCWEB_LOG_DEBUG("Empty data, bailing out."); 196 return; 197 } 198 199 dataFormat(id, msg); 200 201 doWrite(); 202 } 203 204 void dataFormat(std::string_view id, std::string_view msg) 205 { 206 constexpr size_t bufferLimit = 10485760U; // 10MB 207 if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit) 208 { 209 BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client"); 210 close("Buffer overflow"); 211 return; 212 } 213 std::string rawData; 214 if (!id.empty()) 215 { 216 rawData += "id: "; 217 rawData.append(id); 218 rawData += "\n"; 219 } 220 221 rawData += "data: "; 222 for (char character : msg) 223 { 224 rawData += character; 225 if (character == '\n') 226 { 227 rawData += "data: "; 228 } 229 } 230 rawData += "\n\n"; 231 232 size_t copied = boost::asio::buffer_copy( 233 inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData)); 234 inputBuffer.commit(copied); 235 } 236 237 void startTimeout() 238 { 239 std::weak_ptr<Connection> weakSelf = weak_from_this(); 240 timer.expires_after(std::chrono::seconds(30)); 241 timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback, 242 this, weak_from_this())); 243 } 244 245 void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf, 246 const boost::system::error_code& ec) 247 { 248 std::shared_ptr<Connection> self = weakSelf.lock(); 249 if (!self) 250 { 251 BMCWEB_LOG_CRITICAL("{} Failed to capture connection", 252 logPtr(self.get())); 253 return; 254 } 255 256 if (ec == boost::asio::error::operation_aborted) 257 { 258 BMCWEB_LOG_DEBUG("Timer operation aborted"); 259 // Canceled wait means the path succeeded. 260 return; 261 } 262 if (ec) 263 { 264 BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec); 265 } 266 267 BMCWEB_LOG_WARNING("{} Connection timed out, closing", 268 logPtr(self.get())); 269 270 self->close("closing connection"); 271 } 272 273 private: 274 std::array<char, 1> buffer{}; 275 boost::beast::multi_buffer inputBuffer; 276 277 Adaptor adaptor; 278 279 using BodyType = bmcweb::HttpBody; 280 boost::beast::http::response<BodyType> res; 281 std::optional<boost::beast::http::response_serializer<BodyType>> serializer; 282 boost::asio::steady_timer timer; 283 bool doingWrite = false; 284 285 std::function<void(Connection&, const Request&)> openHandler; 286 std::function<void(Connection&)> closeHandler; 287 }; 288 } // namespace sse_socket 289 } // namespace crow 290