1 #pragma once 2 #include "async_resp.hpp" 3 #include "http_request.hpp" 4 5 #include <boost/asio/buffer.hpp> 6 #include <boost/beast/core/multi_buffer.hpp> 7 #include <boost/beast/websocket.hpp> 8 9 #include <array> 10 #include <functional> 11 12 #ifdef BMCWEB_ENABLE_SSL 13 #include <boost/beast/websocket/ssl.hpp> 14 #endif 15 16 namespace crow 17 { 18 namespace websocket 19 { 20 21 enum class MessageType 22 { 23 Binary, 24 Text, 25 }; 26 27 struct Connection : std::enable_shared_from_this<Connection> 28 { 29 public: 30 Connection() = default; 31 32 Connection(const Connection&) = delete; 33 Connection(Connection&&) = delete; 34 Connection& operator=(const Connection&) = delete; 35 Connection& operator=(const Connection&&) = delete; 36 37 virtual void sendBinary(std::string_view msg) = 0; 38 virtual void sendBinary(std::string&& msg) = 0; 39 virtual void sendEx(MessageType type, std::string_view msg, 40 std::function<void()>&& onDone) = 0; 41 virtual void sendText(std::string_view msg) = 0; 42 virtual void sendText(std::string&& msg) = 0; 43 virtual void close(std::string_view msg = "quit") = 0; 44 virtual void deferRead() = 0; 45 virtual void resumeRead() = 0; 46 virtual boost::asio::io_context& getIoContext() = 0; 47 virtual ~Connection() = default; 48 virtual boost::urls::url_view url() = 0; 49 }; 50 51 template <typename Adaptor> 52 class ConnectionImpl : public Connection 53 { 54 using self_t = ConnectionImpl<Adaptor>; 55 56 public: 57 ConnectionImpl( 58 const boost::urls::url_view& urlViewIn, 59 const std::shared_ptr<persistent_data::UserSession>& sessionIn, 60 Adaptor adaptorIn, std::function<void(Connection&)> openHandlerIn, 61 std::function<void(Connection&, const std::string&, bool)> 62 messageHandlerIn, 63 std::function<void(crow::websocket::Connection&, std::string_view, 64 crow::websocket::MessageType type, 65 std::function<void()>&& whenComplete)> 66 messageExHandlerIn, 67 std::function<void(Connection&, const std::string&)> closeHandlerIn, 68 std::function<void(Connection&)> errorHandlerIn) : 69 uri(urlViewIn), 70 ws(std::move(adaptorIn)), inBuffer(inString, 131088), 71 openHandler(std::move(openHandlerIn)), 72 messageHandler(std::move(messageHandlerIn)), 73 messageExHandler(std::move(messageExHandlerIn)), 74 closeHandler(std::move(closeHandlerIn)), 75 errorHandler(std::move(errorHandlerIn)), session(sessionIn) 76 { 77 /* Turn on the timeouts on websocket stream to server role */ 78 ws.set_option(boost::beast::websocket::stream_base::timeout::suggested( 79 boost::beast::role_type::server)); 80 BMCWEB_LOG_DEBUG("Creating new connection {}", logPtr(this)); 81 } 82 83 boost::asio::io_context& getIoContext() override 84 { 85 return static_cast<boost::asio::io_context&>( 86 ws.get_executor().context()); 87 } 88 89 void start(const crow::Request& req) 90 { 91 BMCWEB_LOG_DEBUG("starting connection {}", logPtr(this)); 92 93 using bf = boost::beast::http::field; 94 std::string protocolHeader = req.req[bf::sec_websocket_protocol]; 95 96 ws.set_option(boost::beast::websocket::stream_base::decorator( 97 [session{session}, 98 protocolHeader](boost::beast::websocket::response_type& m) { 99 100 #ifndef BMCWEB_INSECURE_DISABLE_CSRF_PREVENTION 101 if (session != nullptr) 102 { 103 // use protocol for csrf checking 104 if (session->cookieAuth && 105 !crow::utility::constantTimeStringCompare( 106 protocolHeader, session->csrfToken)) 107 { 108 BMCWEB_LOG_ERROR("Websocket CSRF error"); 109 m.result(boost::beast::http::status::unauthorized); 110 return; 111 } 112 } 113 #endif 114 if (!protocolHeader.empty()) 115 { 116 m.insert(bf::sec_websocket_protocol, protocolHeader); 117 } 118 119 m.insert(bf::strict_transport_security, "max-age=31536000; " 120 "includeSubdomains; " 121 "preload"); 122 m.insert(bf::pragma, "no-cache"); 123 m.insert(bf::cache_control, "no-Store,no-Cache"); 124 m.insert("Content-Security-Policy", "default-src 'self'"); 125 m.insert("X-XSS-Protection", "1; " 126 "mode=block"); 127 m.insert("X-Content-Type-Options", "nosniff"); 128 })); 129 130 // Make a pointer to keep the req alive while we accept it. 131 using Body = 132 boost::beast::http::request<boost::beast::http::string_body>; 133 std::unique_ptr<Body> mobile = std::make_unique<Body>(req.req); 134 Body* ptr = mobile.get(); 135 // Perform the websocket upgrade 136 ws.async_accept(*ptr, 137 std::bind_front(&self_t::acceptDone, this, 138 shared_from_this(), std::move(mobile))); 139 } 140 141 void sendBinary(std::string_view msg) override 142 { 143 ws.binary(true); 144 outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), 145 boost::asio::buffer(msg))); 146 doWrite(); 147 } 148 149 void sendEx(MessageType type, std::string_view msg, 150 std::function<void()>&& onDone) override 151 { 152 if (doingWrite) 153 { 154 BMCWEB_LOG_CRITICAL( 155 "Cannot mix sendEx usage with sendBinary or sendText"); 156 onDone(); 157 return; 158 } 159 ws.binary(type == MessageType::Binary); 160 161 ws.async_write(boost::asio::buffer(msg), 162 [weak(weak_from_this()), onDone{std::move(onDone)}]( 163 const boost::beast::error_code& ec, size_t) { 164 std::shared_ptr<Connection> self = weak.lock(); 165 166 // Call the done handler regardless of whether we 167 // errored, but before we close things out 168 onDone(); 169 170 if (ec) 171 { 172 BMCWEB_LOG_ERROR("Error in ws.async_write {}", ec); 173 self->close("write error"); 174 } 175 }); 176 } 177 178 void sendBinary(std::string&& msg) override 179 { 180 ws.binary(true); 181 outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), 182 boost::asio::buffer(msg))); 183 doWrite(); 184 } 185 186 void sendText(std::string_view msg) override 187 { 188 ws.text(true); 189 outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), 190 boost::asio::buffer(msg))); 191 doWrite(); 192 } 193 194 void sendText(std::string&& msg) override 195 { 196 ws.text(true); 197 outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), 198 boost::asio::buffer(msg))); 199 doWrite(); 200 } 201 202 void close(std::string_view msg) override 203 { 204 ws.async_close( 205 {boost::beast::websocket::close_code::normal, msg}, 206 [self(shared_from_this())](const boost::system::error_code& ec) { 207 if (ec == boost::asio::error::operation_aborted) 208 { 209 return; 210 } 211 if (ec) 212 { 213 BMCWEB_LOG_ERROR("Error closing websocket {}", ec); 214 return; 215 } 216 }); 217 } 218 219 boost::urls::url_view url() override 220 { 221 return uri; 222 } 223 224 void acceptDone(const std::shared_ptr<Connection>& /*self*/, 225 const std::unique_ptr<boost::beast::http::request< 226 boost::beast::http::string_body>>& /*req*/, 227 const boost::system::error_code& ec) 228 { 229 if (ec) 230 { 231 BMCWEB_LOG_ERROR("Error in ws.async_accept {}", ec); 232 return; 233 } 234 BMCWEB_LOG_DEBUG("Websocket accepted connection"); 235 236 if (openHandler) 237 { 238 openHandler(*this); 239 } 240 doRead(); 241 } 242 243 void deferRead() override 244 { 245 readingDefered = true; 246 247 // If we're not actively reading, we need to take ownership of 248 // ourselves for a small portion of time, do that, and clear when we 249 // resume. 250 selfOwned = shared_from_this(); 251 } 252 253 void resumeRead() override 254 { 255 readingDefered = false; 256 doRead(); 257 258 // No longer need to keep ourselves alive now that read is active. 259 selfOwned.reset(); 260 } 261 262 void doRead() 263 { 264 if (readingDefered) 265 { 266 return; 267 } 268 ws.async_read(inBuffer, [this, self(shared_from_this())]( 269 const boost::beast::error_code& ec, 270 size_t bytesRead) { 271 if (ec) 272 { 273 if (ec != boost::beast::websocket::error::closed) 274 { 275 BMCWEB_LOG_ERROR("doRead error {}", ec); 276 } 277 if (closeHandler) 278 { 279 std::string reason{ws.reason().reason.c_str()}; 280 closeHandler(*this, reason); 281 } 282 return; 283 } 284 285 handleMessage(bytesRead); 286 }); 287 } 288 void doWrite() 289 { 290 // If we're already doing a write, ignore the request, it will be picked 291 // up when the current write is complete 292 if (doingWrite) 293 { 294 return; 295 } 296 297 if (outBuffer.size() == 0) 298 { 299 // Done for now 300 return; 301 } 302 doingWrite = true; 303 ws.async_write(outBuffer.data(), [this, self(shared_from_this())]( 304 const boost::beast::error_code& ec, 305 size_t bytesSent) { 306 doingWrite = false; 307 outBuffer.consume(bytesSent); 308 if (ec == boost::beast::websocket::error::closed) 309 { 310 // Do nothing here. doRead handler will call the 311 // closeHandler. 312 close("Write error"); 313 return; 314 } 315 if (ec) 316 { 317 BMCWEB_LOG_ERROR("Error in ws.async_write {}", ec); 318 return; 319 } 320 doWrite(); 321 }); 322 } 323 324 private: 325 void handleMessage(size_t bytesRead) 326 { 327 if (messageExHandler) 328 { 329 // Note, because of the interactions with the read buffers, 330 // this message handler overrides the normal message handler 331 messageExHandler(*this, inString, MessageType::Binary, 332 [this, self(shared_from_this()), bytesRead]() { 333 if (self == nullptr) 334 { 335 return; 336 } 337 338 inBuffer.consume(bytesRead); 339 inString.clear(); 340 341 doRead(); 342 }); 343 return; 344 } 345 346 if (messageHandler) 347 { 348 messageHandler(*this, inString, ws.got_text()); 349 } 350 inBuffer.consume(bytesRead); 351 inString.clear(); 352 doRead(); 353 } 354 355 boost::urls::url uri; 356 357 boost::beast::websocket::stream<Adaptor, false> ws; 358 359 bool readingDefered = false; 360 std::string inString; 361 boost::asio::dynamic_string_buffer<std::string::value_type, 362 std::string::traits_type, 363 std::string::allocator_type> 364 inBuffer; 365 366 boost::beast::multi_buffer outBuffer; 367 bool doingWrite = false; 368 369 std::function<void(Connection&)> openHandler; 370 std::function<void(Connection&, const std::string&, bool)> messageHandler; 371 std::function<void(crow::websocket::Connection&, std::string_view, 372 crow::websocket::MessageType type, 373 std::function<void()>&& whenComplete)> 374 messageExHandler; 375 std::function<void(Connection&, const std::string&)> closeHandler; 376 std::function<void(Connection&)> errorHandler; 377 std::shared_ptr<persistent_data::UserSession> session; 378 379 std::shared_ptr<Connection> selfOwned; 380 }; 381 } // namespace websocket 382 } // namespace crow 383