1 // SPDX-License-Identifier: Apache-2.0 2 // SPDX-FileCopyrightText: Copyright OpenBMC Authors 3 #pragma once 4 #include "boost_formatters.hpp" 5 #include "http_body.hpp" 6 #include "http_request.hpp" 7 #include "io_context_singleton.hpp" 8 #include "logging.hpp" 9 #include "server_sent_event.hpp" 10 11 #include <boost/asio/buffer.hpp> 12 #include <boost/asio/error.hpp> 13 #include <boost/asio/steady_timer.hpp> 14 #include <boost/beast/core/error.hpp> 15 #include <boost/beast/core/multi_buffer.hpp> 16 #include <boost/beast/http/field.hpp> 17 #include <boost/beast/http/serializer.hpp> 18 #include <boost/beast/http/write.hpp> 19 20 #include <array> 21 #include <chrono> 22 #include <cstddef> 23 #include <functional> 24 #include <memory> 25 #include <optional> 26 #include <string> 27 #include <string_view> 28 #include <utility> 29 30 namespace crow 31 { 32 33 namespace sse_socket 34 { 35 36 template <typename Adaptor> 37 class ConnectionImpl : public Connection 38 { 39 public: ConnectionImpl(Adaptor && adaptorIn,std::function<void (Connection &,const Request &)> openHandlerIn,std::function<void (Connection &)> closeHandlerIn)40 ConnectionImpl( 41 Adaptor&& adaptorIn, 42 std::function<void(Connection&, const Request&)> openHandlerIn, 43 std::function<void(Connection&)> closeHandlerIn) : 44 adaptor(std::move(adaptorIn)), timer(getIoContext()), 45 openHandler(std::move(openHandlerIn)), 46 closeHandler(std::move(closeHandlerIn)) 47 48 { 49 BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this)); 50 } 51 52 ConnectionImpl(const ConnectionImpl&) = delete; 53 ConnectionImpl(const ConnectionImpl&&) = delete; 54 ConnectionImpl& operator=(const ConnectionImpl&) = delete; 55 ConnectionImpl& operator=(const ConnectionImpl&&) = delete; 56 ~ConnectionImpl()57 ~ConnectionImpl() override 58 { 59 BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this)); 60 } 61 start(const Request & req)62 void start(const Request& req) 63 { 64 BMCWEB_LOG_DEBUG("Starting SSE connection"); 65 66 res.set(boost::beast::http::field::content_type, "text/event-stream"); 67 boost::beast::http::response_serializer<BodyType>& serial = 68 serializer.emplace(res); 69 70 boost::beast::http::async_write_header( 71 adaptor, serial, 72 std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this, 73 shared_from_this(), req)); 74 } 75 close(const std::string_view msg)76 void close(const std::string_view msg) override 77 { 78 BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg); 79 // send notification to handler for cleanup 80 if (closeHandler) 81 { 82 closeHandler(*this); 83 } 84 BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg); 85 boost::beast::get_lowest_layer(adaptor).close(); 86 } 87 sendSSEHeaderCallback(const std::shared_ptr<Connection> &,const Request & req,const boost::system::error_code & ec,size_t)88 void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/, 89 const Request& req, 90 const boost::system::error_code& ec, 91 size_t /*bytesSent*/) 92 { 93 serializer.reset(); 94 if (ec) 95 { 96 BMCWEB_LOG_ERROR("Error sending header{}", ec); 97 close("async_write_header failed"); 98 return; 99 } 100 BMCWEB_LOG_DEBUG("SSE header sent - Connection established"); 101 if (!openHandler) 102 { 103 BMCWEB_LOG_CRITICAL("No open handler???"); 104 return; 105 } 106 openHandler(*this, req); 107 108 // SSE stream header sent, So let us setup monitor. 109 // Any read data on this stream will be error in case of SSE. 110 adaptor.async_read_some(boost::asio::buffer(buffer), 111 std::bind_front(&ConnectionImpl::afterReadError, 112 this, shared_from_this())); 113 } 114 afterReadError(const std::shared_ptr<Connection> &,const boost::system::error_code & ec,size_t bytesRead)115 void afterReadError(const std::shared_ptr<Connection>& /*self*/, 116 const boost::system::error_code& ec, size_t bytesRead) 117 { 118 BMCWEB_LOG_DEBUG("Read {}", bytesRead); 119 if (ec == boost::asio::error::operation_aborted) 120 { 121 return; 122 } 123 if (ec) 124 { 125 BMCWEB_LOG_ERROR("Read error: {}", ec); 126 } 127 128 close("Close SSE connection"); 129 } 130 doWrite()131 void doWrite() 132 { 133 if (doingWrite) 134 { 135 return; 136 } 137 if (inputBuffer.size() == 0) 138 { 139 BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out"); 140 return; 141 } 142 startTimeout(); 143 doingWrite = true; 144 145 adaptor.async_write_some( 146 inputBuffer.data(), 147 std::bind_front(&ConnectionImpl::doWriteCallback, this, 148 shared_from_this())); 149 } 150 doWriteCallback(const std::shared_ptr<Connection> &,const boost::beast::error_code & ec,size_t bytesTransferred)151 void doWriteCallback(const std::shared_ptr<Connection>& /*self*/, 152 const boost::beast::error_code& ec, 153 size_t bytesTransferred) 154 { 155 timer.cancel(); 156 doingWrite = false; 157 inputBuffer.consume(bytesTransferred); 158 159 if (ec == boost::asio::error::eof) 160 { 161 BMCWEB_LOG_ERROR("async_write_some() SSE stream closed"); 162 close("SSE stream closed"); 163 return; 164 } 165 166 if (ec) 167 { 168 BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message()); 169 close("async_write_some failed"); 170 return; 171 } 172 BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}", 173 bytesTransferred); 174 175 doWrite(); 176 } 177 sendSseEvent(std::string_view id,std::string_view msg)178 void sendSseEvent(std::string_view id, std::string_view msg) override 179 { 180 if (msg.empty()) 181 { 182 BMCWEB_LOG_DEBUG("Empty data, bailing out."); 183 return; 184 } 185 186 dataFormat(id, msg); 187 188 doWrite(); 189 } 190 dataFormat(std::string_view id,std::string_view msg)191 void dataFormat(std::string_view id, std::string_view msg) 192 { 193 constexpr size_t bufferLimit = 10485760U; // 10MB 194 if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit) 195 { 196 BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client"); 197 close("Buffer overflow"); 198 return; 199 } 200 std::string rawData; 201 if (!id.empty()) 202 { 203 rawData += "id: "; 204 rawData.append(id); 205 rawData += "\n"; 206 } 207 208 rawData += "data: "; 209 for (char character : msg) 210 { 211 rawData += character; 212 if (character == '\n') 213 { 214 rawData += "data: "; 215 } 216 } 217 rawData += "\n\n"; 218 219 size_t copied = boost::asio::buffer_copy( 220 inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData)); 221 inputBuffer.commit(copied); 222 } 223 startTimeout()224 void startTimeout() 225 { 226 std::weak_ptr<Connection> weakSelf = weak_from_this(); 227 timer.expires_after(std::chrono::seconds(30)); 228 timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback, 229 this, weak_from_this())); 230 } 231 onTimeoutCallback(const std::weak_ptr<Connection> & weakSelf,const boost::system::error_code & ec)232 void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf, 233 const boost::system::error_code& ec) 234 { 235 std::shared_ptr<Connection> self = weakSelf.lock(); 236 if (!self) 237 { 238 BMCWEB_LOG_CRITICAL("{} Failed to capture connection", 239 logPtr(self.get())); 240 return; 241 } 242 243 if (ec == boost::asio::error::operation_aborted) 244 { 245 BMCWEB_LOG_DEBUG("Timer operation aborted"); 246 // Canceled wait means the path succeeded. 247 return; 248 } 249 if (ec) 250 { 251 BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec); 252 } 253 254 BMCWEB_LOG_WARNING("{} Connection timed out, closing", 255 logPtr(self.get())); 256 257 self->close("closing connection"); 258 } 259 260 private: 261 std::array<char, 1> buffer{}; 262 boost::beast::multi_buffer inputBuffer; 263 264 Adaptor adaptor; 265 266 using BodyType = bmcweb::HttpBody; 267 boost::beast::http::response<BodyType> res; 268 std::optional<boost::beast::http::response_serializer<BodyType>> serializer; 269 boost::asio::steady_timer timer; 270 bool doingWrite = false; 271 272 std::function<void(Connection&, const Request&)> openHandler; 273 std::function<void(Connection&)> closeHandler; 274 }; 275 } // namespace sse_socket 276 } // namespace crow 277