1 #pragma once 2 #include "http_request.hpp" 3 #include "http_response.hpp" 4 5 #include <boost/asio/buffer.hpp> 6 #include <boost/asio/steady_timer.hpp> 7 #include <boost/beast/core/multi_buffer.hpp> 8 #include <boost/beast/websocket.hpp> 9 10 #include <array> 11 #include <cstddef> 12 #include <functional> 13 #include <optional> 14 15 namespace crow 16 { 17 18 namespace sse_socket 19 { 20 struct Connection : std::enable_shared_from_this<Connection> 21 { 22 public: 23 Connection() = default; 24 25 Connection(const Connection&) = delete; 26 Connection(Connection&&) = delete; 27 Connection& operator=(const Connection&) = delete; 28 Connection& operator=(const Connection&&) = delete; 29 virtual ~Connection() = default; 30 31 virtual boost::asio::io_context& getIoContext() = 0; 32 virtual void close(std::string_view msg = "quit") = 0; 33 virtual void sendEvent(std::string_view id, std::string_view msg) = 0; 34 }; 35 36 template <typename Adaptor> 37 class ConnectionImpl : public Connection 38 { 39 public: 40 ConnectionImpl(boost::asio::io_context& ioIn, Adaptor&& adaptorIn, 41 std::function<void(Connection&)> openHandlerIn, 42 std::function<void(Connection&)> closeHandlerIn) : 43 adaptor(std::move(adaptorIn)), 44 ioc(ioIn), timer(ioc), openHandler(std::move(openHandlerIn)), 45 closeHandler(std::move(closeHandlerIn)) 46 47 { 48 BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this)); 49 } 50 51 ConnectionImpl(const ConnectionImpl&) = delete; 52 ConnectionImpl(const ConnectionImpl&&) = delete; 53 ConnectionImpl& operator=(const ConnectionImpl&) = delete; 54 ConnectionImpl& operator=(const ConnectionImpl&&) = delete; 55 56 ~ConnectionImpl() override 57 { 58 BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this)); 59 } 60 61 boost::asio::io_context& getIoContext() override 62 { 63 return static_cast<boost::asio::io_context&>( 64 adaptor.get_executor().context()); 65 } 66 67 void start() 68 { 69 if (!openHandler) 70 { 71 BMCWEB_LOG_CRITICAL("No open handler???"); 72 return; 73 } 74 openHandler(*this); 75 sendSSEHeader(); 76 } 77 78 void close(const std::string_view msg) override 79 { 80 BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg); 81 // send notification to handler for cleanup 82 if (closeHandler) 83 { 84 closeHandler(*this); 85 } 86 BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg); 87 boost::beast::get_lowest_layer(adaptor).close(); 88 } 89 90 void sendSSEHeader() 91 { 92 BMCWEB_LOG_DEBUG("Starting SSE connection"); 93 94 res.set(boost::beast::http::field::content_type, "text/event-stream"); 95 boost::beast::http::response_serializer<BodyType>& serial = 96 serializer.emplace(res); 97 98 boost::beast::http::async_write_header( 99 adaptor, serial, 100 std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this, 101 shared_from_this())); 102 } 103 104 void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/, 105 const boost::system::error_code& ec, 106 size_t /*bytesSent*/) 107 { 108 serializer.reset(); 109 if (ec) 110 { 111 BMCWEB_LOG_ERROR("Error sending header{}", ec); 112 close("async_write_header failed"); 113 return; 114 } 115 BMCWEB_LOG_DEBUG("SSE header sent - Connection established"); 116 117 // SSE stream header sent, So let us setup monitor. 118 // Any read data on this stream will be error in case of SSE. 119 adaptor.async_read_some(boost::asio::buffer(buffer), 120 std::bind_front(&ConnectionImpl::afterReadError, 121 this, shared_from_this())); 122 } 123 124 void afterReadError(const std::shared_ptr<Connection>& /*self*/, 125 const boost::system::error_code& ec, size_t bytesRead) 126 { 127 BMCWEB_LOG_DEBUG("Read {}", bytesRead); 128 if (ec == boost::asio::error::operation_aborted) 129 { 130 return; 131 } 132 if (ec) 133 { 134 BMCWEB_LOG_ERROR("Read error: {}", ec); 135 } 136 137 close("Close SSE connection"); 138 } 139 140 void doWrite() 141 { 142 if (doingWrite) 143 { 144 return; 145 } 146 if (inputBuffer.size() == 0) 147 { 148 BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out"); 149 return; 150 } 151 startTimeout(); 152 doingWrite = true; 153 154 adaptor.async_write_some( 155 inputBuffer.data(), 156 std::bind_front(&ConnectionImpl::doWriteCallback, this, 157 shared_from_this())); 158 } 159 160 void doWriteCallback(const std::shared_ptr<Connection>& /*self*/, 161 const boost::beast::error_code& ec, 162 size_t bytesTransferred) 163 { 164 timer.cancel(); 165 doingWrite = false; 166 inputBuffer.consume(bytesTransferred); 167 168 if (ec == boost::asio::error::eof) 169 { 170 BMCWEB_LOG_ERROR("async_write_some() SSE stream closed"); 171 close("SSE stream closed"); 172 return; 173 } 174 175 if (ec) 176 { 177 BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message()); 178 close("async_write_some failed"); 179 return; 180 } 181 BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}", 182 bytesTransferred); 183 184 doWrite(); 185 } 186 187 void sendEvent(std::string_view id, std::string_view msg) override 188 { 189 if (msg.empty()) 190 { 191 BMCWEB_LOG_DEBUG("Empty data, bailing out."); 192 return; 193 } 194 195 dataFormat(id, msg); 196 197 doWrite(); 198 } 199 200 void dataFormat(std::string_view id, std::string_view msg) 201 { 202 constexpr size_t bufferLimit = 10485760U; // 10MB 203 if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit) 204 { 205 BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client"); 206 close("Buffer overflow"); 207 return; 208 } 209 std::string rawData; 210 if (!id.empty()) 211 { 212 rawData += "id: "; 213 rawData.append(id); 214 rawData += "\n"; 215 } 216 217 rawData += "data: "; 218 for (char character : msg) 219 { 220 rawData += character; 221 if (character == '\n') 222 { 223 rawData += "data: "; 224 } 225 } 226 rawData += "\n\n"; 227 228 boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()), 229 boost::asio::buffer(rawData)); 230 inputBuffer.commit(rawData.size()); 231 } 232 233 void startTimeout() 234 { 235 std::weak_ptr<Connection> weakSelf = weak_from_this(); 236 timer.expires_after(std::chrono::seconds(30)); 237 timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback, 238 this, weak_from_this())); 239 } 240 241 void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf, 242 const boost::system::error_code& ec) 243 { 244 std::shared_ptr<Connection> self = weakSelf.lock(); 245 if (!self) 246 { 247 BMCWEB_LOG_CRITICAL("{} Failed to capture connection", 248 logPtr(self.get())); 249 return; 250 } 251 252 if (ec == boost::asio::error::operation_aborted) 253 { 254 BMCWEB_LOG_DEBUG("Timer operation aborted"); 255 // Canceled wait means the path succeeded. 256 return; 257 } 258 if (ec) 259 { 260 BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec); 261 } 262 263 BMCWEB_LOG_WARNING("{} Connection timed out, closing", 264 logPtr(self.get())); 265 266 self->close("closing connection"); 267 } 268 269 private: 270 std::array<char, 1> buffer{}; 271 boost::beast::multi_buffer inputBuffer; 272 273 Adaptor adaptor; 274 275 using BodyType = bmcweb::FileBody; 276 boost::beast::http::response<BodyType> res; 277 std::optional<boost::beast::http::response_serializer<BodyType>> serializer; 278 boost::asio::io_context& ioc; 279 boost::asio::steady_timer timer; 280 bool doingWrite = false; 281 282 std::function<void(Connection&)> openHandler; 283 std::function<void(Connection&)> closeHandler; 284 }; 285 } // namespace sse_socket 286 } // namespace crow 287