1 #pragma once 2 #include "dbus_singleton.hpp" 3 #include "http_request.hpp" 4 #include "http_response.hpp" 5 6 #include <boost/asio/buffer.hpp> 7 #include <boost/asio/steady_timer.hpp> 8 #include <boost/beast/core/multi_buffer.hpp> 9 #include <boost/beast/http/buffer_body.hpp> 10 #include <boost/beast/websocket.hpp> 11 12 #include <array> 13 #include <functional> 14 15 #ifdef BMCWEB_ENABLE_SSL 16 #include <boost/beast/websocket/ssl.hpp> 17 #endif 18 19 namespace crow 20 { 21 22 namespace sse_socket 23 { 24 struct Connection : std::enable_shared_from_this<Connection> 25 { 26 public: 27 Connection() = default; 28 29 Connection(const Connection&) = delete; 30 Connection(Connection&&) = delete; 31 Connection& operator=(const Connection&) = delete; 32 Connection& operator=(const Connection&&) = delete; 33 virtual ~Connection() = default; 34 35 virtual boost::asio::io_context& getIoContext() = 0; 36 virtual void close(std::string_view msg = "quit") = 0; 37 virtual void sendEvent(std::string_view id, std::string_view msg) = 0; 38 }; 39 40 template <typename Adaptor> 41 class ConnectionImpl : public Connection 42 { 43 public: 44 ConnectionImpl(Adaptor&& adaptorIn, 45 std::function<void(Connection&)> openHandlerIn, 46 std::function<void(Connection&)> closeHandlerIn) : 47 adaptor(std::move(adaptorIn)), 48 timer(ioc), openHandler(std::move(openHandlerIn)), 49 closeHandler(std::move(closeHandlerIn)) 50 { 51 BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this)); 52 } 53 54 ConnectionImpl(const ConnectionImpl&) = delete; 55 ConnectionImpl(const ConnectionImpl&&) = delete; 56 ConnectionImpl& operator=(const ConnectionImpl&) = delete; 57 ConnectionImpl& operator=(const ConnectionImpl&&) = delete; 58 59 ~ConnectionImpl() override 60 { 61 BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this)); 62 } 63 64 boost::asio::io_context& getIoContext() override 65 { 66 return static_cast<boost::asio::io_context&>( 67 adaptor.get_executor().context()); 68 } 69 70 void start() 71 { 72 if (!openHandler) 73 { 74 BMCWEB_LOG_CRITICAL("No open handler???"); 75 return; 76 } 77 openHandler(*this); 78 } 79 80 void close(const std::string_view msg) override 81 { 82 // send notification to handler for cleanup 83 if (closeHandler) 84 { 85 closeHandler(*this); 86 } 87 BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg); 88 boost::beast::get_lowest_layer(adaptor).close(); 89 } 90 91 void sendSSEHeader() 92 { 93 BMCWEB_LOG_DEBUG("Starting SSE connection"); 94 using BodyType = boost::beast::http::buffer_body; 95 boost::beast::http::response<BodyType> res( 96 boost::beast::http::status::ok, 11, BodyType{}); 97 res.set(boost::beast::http::field::content_type, "text/event-stream"); 98 res.body().more = true; 99 boost::beast::http::response_serializer<BodyType>& serial = 100 serializer.emplace(std::move(res)); 101 102 boost::beast::http::async_write_header( 103 adaptor, serial, 104 std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this, 105 shared_from_this())); 106 } 107 108 void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/, 109 const boost::system::error_code& ec, 110 size_t /*bytesSent*/) 111 { 112 serializer.reset(); 113 if (ec) 114 { 115 BMCWEB_LOG_ERROR("Error sending header{}", ec); 116 close("async_write_header failed"); 117 return; 118 } 119 BMCWEB_LOG_DEBUG("SSE header sent - Connection established"); 120 121 serializer.reset(); 122 123 // SSE stream header sent, So let us setup monitor. 124 // Any read data on this stream will be error in case of SSE. 125 126 adaptor.async_wait(boost::asio::ip::tcp::socket::wait_error, 127 std::bind_front(&ConnectionImpl::afterReadError, 128 this, shared_from_this())); 129 } 130 131 void afterReadError(const std::shared_ptr<Connection>& /*self*/, 132 const boost::system::error_code& ec) 133 { 134 if (ec == boost::asio::error::operation_aborted) 135 { 136 return; 137 } 138 if (ec) 139 { 140 BMCWEB_LOG_ERROR("Read error: {}", ec); 141 } 142 143 close("Close SSE connection"); 144 } 145 146 void doWrite() 147 { 148 if (doingWrite) 149 { 150 return; 151 } 152 if (inputBuffer.size() == 0) 153 { 154 BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out"); 155 return; 156 } 157 startTimeout(); 158 doingWrite = true; 159 160 adaptor.async_write_some( 161 inputBuffer.data(), 162 std::bind_front(&ConnectionImpl::doWriteCallback, this, 163 weak_from_this())); 164 } 165 166 void doWriteCallback(const std::weak_ptr<Connection>& weak, 167 const boost::beast::error_code& ec, 168 size_t bytesTransferred) 169 { 170 auto self = weak.lock(); 171 if (self == nullptr) 172 { 173 return; 174 } 175 timer.cancel(); 176 doingWrite = false; 177 inputBuffer.consume(bytesTransferred); 178 179 if (ec == boost::asio::error::eof) 180 { 181 BMCWEB_LOG_ERROR("async_write_some() SSE stream closed"); 182 close("SSE stream closed"); 183 return; 184 } 185 186 if (ec) 187 { 188 BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message()); 189 close("async_write_some failed"); 190 return; 191 } 192 BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}", 193 bytesTransferred); 194 195 doWrite(); 196 } 197 198 void sendEvent(std::string_view id, std::string_view msg) override 199 { 200 if (msg.empty()) 201 { 202 BMCWEB_LOG_DEBUG("Empty data, bailing out."); 203 return; 204 } 205 206 dataFormat(id, msg); 207 208 doWrite(); 209 } 210 211 void dataFormat(std::string_view id, std::string_view msg) 212 { 213 std::string rawData; 214 if (!id.empty()) 215 { 216 rawData += "id: "; 217 rawData.append(id); 218 rawData += "\n"; 219 } 220 221 rawData += "data: "; 222 for (char character : msg) 223 { 224 rawData += character; 225 if (character == '\n') 226 { 227 rawData += "data: "; 228 } 229 } 230 rawData += "\n\n"; 231 232 boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()), 233 boost::asio::buffer(rawData)); 234 inputBuffer.commit(rawData.size()); 235 } 236 237 void startTimeout() 238 { 239 std::weak_ptr<Connection> weakSelf = weak_from_this(); 240 timer.expires_after(std::chrono::seconds(30)); 241 timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback, 242 this, weak_from_this())); 243 } 244 245 void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf, 246 const boost::system::error_code& ec) 247 { 248 std::shared_ptr<Connection> self = weakSelf.lock(); 249 if (!self) 250 { 251 BMCWEB_LOG_CRITICAL("{} Failed to capture connection", 252 logPtr(self.get())); 253 return; 254 } 255 256 if (ec == boost::asio::error::operation_aborted) 257 { 258 BMCWEB_LOG_DEBUG("operation aborted"); 259 // Canceled wait means the path succeeded. 260 return; 261 } 262 if (ec) 263 { 264 BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec); 265 } 266 267 BMCWEB_LOG_WARNING("{}Connection timed out, closing", 268 logPtr(self.get())); 269 270 self->close("closing connection"); 271 } 272 273 private: 274 Adaptor adaptor; 275 276 boost::beast::multi_buffer inputBuffer; 277 278 std::optional<boost::beast::http::response_serializer< 279 boost::beast::http::buffer_body>> 280 serializer; 281 boost::asio::io_context& ioc = 282 crow::connections::systemBus->get_io_context(); 283 boost::asio::steady_timer timer; 284 bool doingWrite = false; 285 286 std::function<void(Connection&)> openHandler; 287 std::function<void(Connection&)> closeHandler; 288 }; 289 } // namespace sse_socket 290 } // namespace crow 291