1 #pragma once 2 #include "async_resp.hpp" 3 #include "http_request.hpp" 4 5 #include <boost/asio/buffer.hpp> 6 #include <boost/beast/core/multi_buffer.hpp> 7 #include <boost/beast/websocket.hpp> 8 9 #include <array> 10 #include <functional> 11 12 #ifdef BMCWEB_ENABLE_SSL 13 #include <boost/beast/websocket/ssl.hpp> 14 #endif 15 16 namespace crow 17 { 18 namespace websocket 19 { 20 21 enum class MessageType 22 { 23 Binary, 24 Text, 25 }; 26 27 struct Connection : std::enable_shared_from_this<Connection> 28 { 29 public: 30 Connection() = default; 31 32 Connection(const Connection&) = delete; 33 Connection(Connection&&) = delete; 34 Connection& operator=(const Connection&) = delete; 35 Connection& operator=(const Connection&&) = delete; 36 37 virtual void sendBinary(std::string_view msg) = 0; 38 virtual void sendBinary(std::string&& msg) = 0; 39 virtual void sendEx(MessageType type, std::string_view msg, 40 std::function<void()>&& onDone) = 0; 41 virtual void sendText(std::string_view msg) = 0; 42 virtual void sendText(std::string&& msg) = 0; 43 virtual void close(std::string_view msg = "quit") = 0; 44 virtual void deferRead() = 0; 45 virtual void resumeRead() = 0; 46 virtual boost::asio::io_context& getIoContext() = 0; 47 virtual ~Connection() = default; 48 virtual boost::urls::url_view url() = 0; 49 }; 50 51 template <typename Adaptor> 52 class ConnectionImpl : public Connection 53 { 54 using self_t = ConnectionImpl<Adaptor>; 55 56 public: 57 ConnectionImpl( 58 const boost::urls::url_view& urlViewIn, 59 const std::shared_ptr<persistent_data::UserSession>& sessionIn, 60 Adaptor adaptorIn, std::function<void(Connection&)> openHandlerIn, 61 std::function<void(Connection&, const std::string&, bool)> 62 messageHandlerIn, 63 std::function<void(crow::websocket::Connection&, std::string_view, 64 crow::websocket::MessageType type, 65 std::function<void()>&& whenComplete)> 66 messageExHandlerIn, 67 std::function<void(Connection&, const std::string&)> closeHandlerIn, 68 std::function<void(Connection&)> errorHandlerIn) : 69 uri(urlViewIn), 70 ws(std::move(adaptorIn)), inBuffer(inString, 131088), 71 openHandler(std::move(openHandlerIn)), 72 messageHandler(std::move(messageHandlerIn)), 73 messageExHandler(std::move(messageExHandlerIn)), 74 closeHandler(std::move(closeHandlerIn)), 75 errorHandler(std::move(errorHandlerIn)), session(sessionIn) 76 { 77 /* Turn on the timeouts on websocket stream to server role */ 78 ws.set_option(boost::beast::websocket::stream_base::timeout::suggested( 79 boost::beast::role_type::server)); 80 BMCWEB_LOG_DEBUG("Creating new connection {}", logPtr(this)); 81 } 82 83 boost::asio::io_context& getIoContext() override 84 { 85 return static_cast<boost::asio::io_context&>( 86 ws.get_executor().context()); 87 } 88 89 void start(const crow::Request& req) 90 { 91 BMCWEB_LOG_DEBUG("starting connection {}", logPtr(this)); 92 93 using bf = boost::beast::http::field; 94 std::string protocolHeader = req.req[bf::sec_websocket_protocol]; 95 96 ws.set_option(boost::beast::websocket::stream_base::decorator( 97 [session{session}, 98 protocolHeader](boost::beast::websocket::response_type& m) { 99 100 #ifndef BMCWEB_INSECURE_DISABLE_CSRF_PREVENTION 101 if (session != nullptr) 102 { 103 // use protocol for csrf checking 104 if (session->cookieAuth && 105 !crow::utility::constantTimeStringCompare( 106 protocolHeader, session->csrfToken)) 107 { 108 BMCWEB_LOG_ERROR("Websocket CSRF error"); 109 m.result(boost::beast::http::status::unauthorized); 110 return; 111 } 112 } 113 #endif 114 if (!protocolHeader.empty()) 115 { 116 m.insert(bf::sec_websocket_protocol, protocolHeader); 117 } 118 119 m.insert(bf::strict_transport_security, "max-age=31536000; " 120 "includeSubdomains; " 121 "preload"); 122 m.insert(bf::pragma, "no-cache"); 123 m.insert(bf::cache_control, "no-Store,no-Cache"); 124 m.insert("Content-Security-Policy", "default-src 'self'"); 125 m.insert("X-XSS-Protection", "1; " 126 "mode=block"); 127 m.insert("X-Content-Type-Options", "nosniff"); 128 })); 129 130 // Make a pointer to keep the req alive while we accept it. 131 using Body = boost::beast::http::request<bmcweb::FileBody>; 132 std::unique_ptr<Body> mobile = std::make_unique<Body>(req.req); 133 Body* ptr = mobile.get(); 134 // Perform the websocket upgrade 135 ws.async_accept(*ptr, 136 std::bind_front(&self_t::acceptDone, this, 137 shared_from_this(), std::move(mobile))); 138 } 139 140 void sendBinary(std::string_view msg) override 141 { 142 ws.binary(true); 143 outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), 144 boost::asio::buffer(msg))); 145 doWrite(); 146 } 147 148 void sendEx(MessageType type, std::string_view msg, 149 std::function<void()>&& onDone) override 150 { 151 if (doingWrite) 152 { 153 BMCWEB_LOG_CRITICAL( 154 "Cannot mix sendEx usage with sendBinary or sendText"); 155 onDone(); 156 return; 157 } 158 ws.binary(type == MessageType::Binary); 159 160 ws.async_write(boost::asio::buffer(msg), 161 [weak(weak_from_this()), onDone{std::move(onDone)}]( 162 const boost::beast::error_code& ec, size_t) { 163 std::shared_ptr<Connection> self = weak.lock(); 164 if (!self) 165 { 166 BMCWEB_LOG_ERROR("Connection went away"); 167 return; 168 } 169 170 // Call the done handler regardless of whether we 171 // errored, but before we close things out 172 onDone(); 173 174 if (ec) 175 { 176 BMCWEB_LOG_ERROR("Error in ws.async_write {}", ec); 177 self->close("write error"); 178 } 179 }); 180 } 181 182 void sendBinary(std::string&& msg) override 183 { 184 ws.binary(true); 185 outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), 186 boost::asio::buffer(msg))); 187 doWrite(); 188 } 189 190 void sendText(std::string_view msg) override 191 { 192 ws.text(true); 193 outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), 194 boost::asio::buffer(msg))); 195 doWrite(); 196 } 197 198 void sendText(std::string&& msg) override 199 { 200 ws.text(true); 201 outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), 202 boost::asio::buffer(msg))); 203 doWrite(); 204 } 205 206 void close(std::string_view msg) override 207 { 208 ws.async_close( 209 {boost::beast::websocket::close_code::normal, msg}, 210 [self(shared_from_this())](const boost::system::error_code& ec) { 211 if (ec == boost::asio::error::operation_aborted) 212 { 213 return; 214 } 215 if (ec) 216 { 217 BMCWEB_LOG_ERROR("Error closing websocket {}", ec); 218 return; 219 } 220 }); 221 } 222 223 boost::urls::url_view url() override 224 { 225 return uri; 226 } 227 228 void acceptDone(const std::shared_ptr<Connection>& /*self*/, 229 const std::unique_ptr< 230 boost::beast::http::request<bmcweb::FileBody>>& /*req*/, 231 const boost::system::error_code& ec) 232 { 233 if (ec) 234 { 235 BMCWEB_LOG_ERROR("Error in ws.async_accept {}", ec); 236 return; 237 } 238 BMCWEB_LOG_DEBUG("Websocket accepted connection"); 239 240 if (openHandler) 241 { 242 openHandler(*this); 243 } 244 doRead(); 245 } 246 247 void deferRead() override 248 { 249 readingDefered = true; 250 251 // If we're not actively reading, we need to take ownership of 252 // ourselves for a small portion of time, do that, and clear when we 253 // resume. 254 selfOwned = shared_from_this(); 255 } 256 257 void resumeRead() override 258 { 259 readingDefered = false; 260 doRead(); 261 262 // No longer need to keep ourselves alive now that read is active. 263 selfOwned.reset(); 264 } 265 266 void doRead() 267 { 268 if (readingDefered) 269 { 270 return; 271 } 272 ws.async_read(inBuffer, [this, self(shared_from_this())]( 273 const boost::beast::error_code& ec, 274 size_t bytesRead) { 275 if (ec) 276 { 277 if (ec != boost::beast::websocket::error::closed) 278 { 279 BMCWEB_LOG_ERROR("doRead error {}", ec); 280 } 281 if (closeHandler) 282 { 283 std::string reason{ws.reason().reason.c_str()}; 284 closeHandler(*this, reason); 285 } 286 return; 287 } 288 289 handleMessage(bytesRead); 290 }); 291 } 292 void doWrite() 293 { 294 // If we're already doing a write, ignore the request, it will be picked 295 // up when the current write is complete 296 if (doingWrite) 297 { 298 return; 299 } 300 301 if (outBuffer.size() == 0) 302 { 303 // Done for now 304 return; 305 } 306 doingWrite = true; 307 ws.async_write(outBuffer.data(), [this, self(shared_from_this())]( 308 const boost::beast::error_code& ec, 309 size_t bytesSent) { 310 doingWrite = false; 311 outBuffer.consume(bytesSent); 312 if (ec == boost::beast::websocket::error::closed) 313 { 314 // Do nothing here. doRead handler will call the 315 // closeHandler. 316 close("Write error"); 317 return; 318 } 319 if (ec) 320 { 321 BMCWEB_LOG_ERROR("Error in ws.async_write {}", ec); 322 return; 323 } 324 doWrite(); 325 }); 326 } 327 328 private: 329 void handleMessage(size_t bytesRead) 330 { 331 if (messageExHandler) 332 { 333 // Note, because of the interactions with the read buffers, 334 // this message handler overrides the normal message handler 335 messageExHandler(*this, inString, MessageType::Binary, 336 [this, self(shared_from_this()), bytesRead]() { 337 if (self == nullptr) 338 { 339 return; 340 } 341 342 inBuffer.consume(bytesRead); 343 inString.clear(); 344 345 doRead(); 346 }); 347 return; 348 } 349 350 if (messageHandler) 351 { 352 messageHandler(*this, inString, ws.got_text()); 353 } 354 inBuffer.consume(bytesRead); 355 inString.clear(); 356 doRead(); 357 } 358 359 boost::urls::url uri; 360 361 boost::beast::websocket::stream<Adaptor, false> ws; 362 363 bool readingDefered = false; 364 std::string inString; 365 boost::asio::dynamic_string_buffer<std::string::value_type, 366 std::string::traits_type, 367 std::string::allocator_type> 368 inBuffer; 369 370 boost::beast::multi_buffer outBuffer; 371 bool doingWrite = false; 372 373 std::function<void(Connection&)> openHandler; 374 std::function<void(Connection&, const std::string&, bool)> messageHandler; 375 std::function<void(crow::websocket::Connection&, std::string_view, 376 crow::websocket::MessageType type, 377 std::function<void()>&& whenComplete)> 378 messageExHandler; 379 std::function<void(Connection&, const std::string&)> closeHandler; 380 std::function<void(Connection&)> errorHandler; 381 std::shared_ptr<persistent_data::UserSession> session; 382 383 std::shared_ptr<Connection> selfOwned; 384 }; 385 } // namespace websocket 386 } // namespace crow 387