1 #pragma once 2 #include "async_resp.hpp" 3 #include "http_request.hpp" 4 5 #include <boost/asio/buffer.hpp> 6 #include <boost/beast/core/multi_buffer.hpp> 7 #include <boost/beast/websocket.hpp> 8 9 #include <array> 10 #include <functional> 11 12 #ifdef BMCWEB_ENABLE_SSL 13 #include <boost/beast/websocket/ssl.hpp> 14 #endif 15 16 namespace crow 17 { 18 namespace websocket 19 { 20 21 enum class MessageType 22 { 23 Binary, 24 Text, 25 }; 26 27 struct Connection : std::enable_shared_from_this<Connection> 28 { 29 public: 30 explicit Connection(const crow::Request& reqIn) : req(reqIn.req) 31 {} 32 33 Connection(const Connection&) = delete; 34 Connection(Connection&&) = delete; 35 Connection& operator=(const Connection&) = delete; 36 Connection& operator=(const Connection&&) = delete; 37 38 virtual void sendBinary(std::string_view msg) = 0; 39 virtual void sendBinary(std::string&& msg) = 0; 40 virtual void sendEx(MessageType type, std::string_view msg, 41 std::function<void()>&& onDone) = 0; 42 virtual void sendText(std::string_view msg) = 0; 43 virtual void sendText(std::string&& msg) = 0; 44 virtual void close(std::string_view msg = "quit") = 0; 45 virtual void deferRead() = 0; 46 virtual void resumeRead() = 0; 47 virtual boost::asio::io_context& getIoContext() = 0; 48 virtual ~Connection() = default; 49 50 boost::beast::http::request<boost::beast::http::string_body> req; 51 }; 52 53 template <typename Adaptor> 54 class ConnectionImpl : public Connection 55 { 56 public: 57 ConnectionImpl( 58 const crow::Request& reqIn, Adaptor adaptorIn, 59 std::function<void(Connection&)> openHandlerIn, 60 std::function<void(Connection&, const std::string&, bool)> 61 messageHandlerIn, 62 std::function<void(crow::websocket::Connection&, std::string_view, 63 crow::websocket::MessageType type, 64 std::function<void()>&& whenComplete)> 65 messageExHandlerIn, 66 std::function<void(Connection&, const std::string&)> closeHandlerIn, 67 std::function<void(Connection&)> errorHandlerIn) : 68 Connection(reqIn), 69 ws(std::move(adaptorIn)), inBuffer(inString, 131088), 70 openHandler(std::move(openHandlerIn)), 71 messageHandler(std::move(messageHandlerIn)), 72 messageExHandler(std::move(messageExHandlerIn)), 73 closeHandler(std::move(closeHandlerIn)), 74 errorHandler(std::move(errorHandlerIn)), session(reqIn.session) 75 { 76 /* Turn on the timeouts on websocket stream to server role */ 77 ws.set_option(boost::beast::websocket::stream_base::timeout::suggested( 78 boost::beast::role_type::server)); 79 BMCWEB_LOG_DEBUG << "Creating new connection " << this; 80 } 81 82 boost::asio::io_context& getIoContext() override 83 { 84 return static_cast<boost::asio::io_context&>( 85 ws.get_executor().context()); 86 } 87 88 void start() 89 { 90 BMCWEB_LOG_DEBUG << "starting connection " << this; 91 92 using bf = boost::beast::http::field; 93 94 std::string_view protocol = req[bf::sec_websocket_protocol]; 95 96 ws.set_option(boost::beast::websocket::stream_base::decorator( 97 [session{session}, protocol{std::string(protocol)}]( 98 boost::beast::websocket::response_type& m) { 99 100 #ifndef BMCWEB_INSECURE_DISABLE_CSRF_PREVENTION 101 if (session != nullptr) 102 { 103 // use protocol for csrf checking 104 if (session->cookieAuth && 105 !crow::utility::constantTimeStringCompare( 106 protocol, session->csrfToken)) 107 { 108 BMCWEB_LOG_ERROR << "Websocket CSRF error"; 109 m.result(boost::beast::http::status::unauthorized); 110 return; 111 } 112 } 113 #endif 114 if (!protocol.empty()) 115 { 116 m.insert(bf::sec_websocket_protocol, protocol); 117 } 118 119 m.insert(bf::strict_transport_security, "max-age=31536000; " 120 "includeSubdomains; " 121 "preload"); 122 m.insert(bf::pragma, "no-cache"); 123 m.insert(bf::cache_control, "no-Store,no-Cache"); 124 m.insert("Content-Security-Policy", "default-src 'self'"); 125 m.insert("X-XSS-Protection", "1; " 126 "mode=block"); 127 m.insert("X-Content-Type-Options", "nosniff"); 128 })); 129 130 // Perform the websocket upgrade 131 ws.async_accept(req, [this, self(shared_from_this())]( 132 const boost::system::error_code& ec) { 133 if (ec) 134 { 135 BMCWEB_LOG_ERROR << "Error in ws.async_accept " << ec; 136 return; 137 } 138 acceptDone(); 139 }); 140 } 141 142 void sendBinary(std::string_view msg) override 143 { 144 ws.binary(true); 145 outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), 146 boost::asio::buffer(msg))); 147 doWrite(); 148 } 149 150 void sendEx(MessageType type, std::string_view msg, 151 std::function<void()>&& onDone) override 152 { 153 if (doingWrite) 154 { 155 BMCWEB_LOG_CRITICAL 156 << "Cannot mix sendEx usage with sendBinary or sendText"; 157 onDone(); 158 return; 159 } 160 ws.binary(type == MessageType::Binary); 161 162 ws.async_write(boost::asio::buffer(msg), 163 [weak(weak_from_this()), onDone{std::move(onDone)}]( 164 const boost::beast::error_code& ec, size_t) { 165 std::shared_ptr<Connection> self = weak.lock(); 166 167 // Call the done handler regardless of whether we 168 // errored, but before we close things out 169 onDone(); 170 171 if (ec) 172 { 173 BMCWEB_LOG_ERROR << "Error in ws.async_write " << ec; 174 self->close("write error"); 175 } 176 }); 177 } 178 179 void sendBinary(std::string&& msg) override 180 { 181 ws.binary(true); 182 outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), 183 boost::asio::buffer(msg))); 184 doWrite(); 185 } 186 187 void sendText(std::string_view msg) override 188 { 189 ws.text(true); 190 outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), 191 boost::asio::buffer(msg))); 192 doWrite(); 193 } 194 195 void sendText(std::string&& msg) override 196 { 197 ws.text(true); 198 outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), 199 boost::asio::buffer(msg))); 200 doWrite(); 201 } 202 203 void close(std::string_view msg) override 204 { 205 ws.async_close( 206 {boost::beast::websocket::close_code::normal, msg}, 207 [self(shared_from_this())](const boost::system::error_code& ec) { 208 if (ec == boost::asio::error::operation_aborted) 209 { 210 return; 211 } 212 if (ec) 213 { 214 BMCWEB_LOG_ERROR << "Error closing websocket " << ec; 215 return; 216 } 217 }); 218 } 219 220 void acceptDone() 221 { 222 BMCWEB_LOG_DEBUG << "Websocket accepted connection"; 223 224 if (openHandler) 225 { 226 openHandler(*this); 227 } 228 doRead(); 229 } 230 231 void deferRead() override 232 { 233 readingDefered = true; 234 235 // If we're not actively reading, we need to take ownership of 236 // ourselves for a small portion of time, do that, and clear when we 237 // resume. 238 selfOwned = shared_from_this(); 239 } 240 241 void resumeRead() override 242 { 243 readingDefered = false; 244 doRead(); 245 246 // No longer need to keep ourselves alive now that read is active. 247 selfOwned.reset(); 248 } 249 250 void doRead() 251 { 252 if (readingDefered) 253 { 254 return; 255 } 256 ws.async_read(inBuffer, [this, self(shared_from_this())]( 257 const boost::beast::error_code& ec, 258 size_t bytesRead) { 259 if (ec) 260 { 261 if (ec != boost::beast::websocket::error::closed) 262 { 263 BMCWEB_LOG_ERROR << "doRead error " << ec; 264 } 265 if (closeHandler) 266 { 267 std::string reason{ws.reason().reason.c_str()}; 268 closeHandler(*this, reason); 269 } 270 return; 271 } 272 273 handleMessage(bytesRead); 274 }); 275 } 276 void doWrite() 277 { 278 // If we're already doing a write, ignore the request, it will be picked 279 // up when the current write is complete 280 if (doingWrite) 281 { 282 return; 283 } 284 285 if (outBuffer.size() == 0) 286 { 287 // Done for now 288 return; 289 } 290 doingWrite = true; 291 ws.async_write(outBuffer.data(), [this, self(shared_from_this())]( 292 const boost::beast::error_code& ec, 293 size_t bytesSent) { 294 doingWrite = false; 295 outBuffer.consume(bytesSent); 296 if (ec == boost::beast::websocket::error::closed) 297 { 298 // Do nothing here. doRead handler will call the 299 // closeHandler. 300 close("Write error"); 301 return; 302 } 303 if (ec) 304 { 305 BMCWEB_LOG_ERROR << "Error in ws.async_write " << ec; 306 return; 307 } 308 doWrite(); 309 }); 310 } 311 312 private: 313 void handleMessage(size_t bytesRead) 314 { 315 if (messageExHandler) 316 { 317 // Note, because of the interactions with the read buffers, 318 // this message handler overrides the normal message handler 319 messageExHandler(*this, inString, MessageType::Binary, 320 [this, self(shared_from_this()), bytesRead]() { 321 if (self == nullptr) 322 { 323 return; 324 } 325 326 inBuffer.consume(bytesRead); 327 inString.clear(); 328 329 doRead(); 330 }); 331 return; 332 } 333 334 if (messageHandler) 335 { 336 messageHandler(*this, inString, ws.got_text()); 337 } 338 inBuffer.consume(bytesRead); 339 inString.clear(); 340 doRead(); 341 } 342 343 boost::beast::websocket::stream<Adaptor, false> ws; 344 345 bool readingDefered = false; 346 std::string inString; 347 boost::asio::dynamic_string_buffer<std::string::value_type, 348 std::string::traits_type, 349 std::string::allocator_type> 350 inBuffer; 351 352 boost::beast::multi_buffer outBuffer; 353 bool doingWrite = false; 354 355 std::function<void(Connection&)> openHandler; 356 std::function<void(Connection&, const std::string&, bool)> messageHandler; 357 std::function<void(crow::websocket::Connection&, std::string_view, 358 crow::websocket::MessageType type, 359 std::function<void()>&& whenComplete)> 360 messageExHandler; 361 std::function<void(Connection&, const std::string&)> closeHandler; 362 std::function<void(Connection&)> errorHandler; 363 std::shared_ptr<persistent_data::UserSession> session; 364 365 std::shared_ptr<Connection> selfOwned; 366 }; 367 } // namespace websocket 368 } // namespace crow 369