1 #pragma once 2 #include "async_resp.hpp" 3 #include "http_request.hpp" 4 5 #include <boost/asio/buffer.hpp> 6 #include <boost/beast/core/multi_buffer.hpp> 7 #include <boost/beast/websocket.hpp> 8 9 #include <array> 10 #include <functional> 11 12 #ifdef BMCWEB_ENABLE_SSL 13 #include <boost/beast/websocket/ssl.hpp> 14 #endif 15 16 namespace crow 17 { 18 namespace websocket 19 { 20 21 enum class MessageType 22 { 23 Binary, 24 Text, 25 }; 26 27 struct Connection : std::enable_shared_from_this<Connection> 28 { 29 public: 30 explicit Connection(const crow::Request& reqIn) : req(reqIn.req) 31 {} 32 33 Connection(const Connection&) = delete; 34 Connection(Connection&&) = delete; 35 Connection& operator=(const Connection&) = delete; 36 Connection& operator=(const Connection&&) = delete; 37 38 virtual void sendBinary(std::string_view msg) = 0; 39 virtual void sendBinary(std::string&& msg) = 0; 40 virtual void sendEx(MessageType type, std::string_view msg, 41 std::function<void()>&& onDone) = 0; 42 virtual void sendText(std::string_view msg) = 0; 43 virtual void sendText(std::string&& msg) = 0; 44 virtual void close(std::string_view msg = "quit") = 0; 45 virtual void deferRead() = 0; 46 virtual void resumeRead() = 0; 47 virtual boost::asio::io_context& getIoContext() = 0; 48 virtual ~Connection() = default; 49 50 boost::beast::http::request<boost::beast::http::string_body> req; 51 }; 52 53 template <typename Adaptor> 54 class ConnectionImpl : public Connection 55 { 56 public: 57 ConnectionImpl( 58 const crow::Request& reqIn, Adaptor adaptorIn, 59 std::function<void(Connection&)> openHandlerIn, 60 std::function<void(Connection&, const std::string&, bool)> 61 messageHandlerIn, 62 std::function<void(crow::websocket::Connection&, std::string_view, 63 crow::websocket::MessageType type, 64 std::function<void()>&& whenComplete)> 65 messageExHandlerIn, 66 std::function<void(Connection&, const std::string&)> closeHandlerIn, 67 std::function<void(Connection&)> errorHandlerIn) : 68 Connection(reqIn), 69 ws(std::move(adaptorIn)), inBuffer(inString, 131088), 70 openHandler(std::move(openHandlerIn)), 71 messageHandler(std::move(messageHandlerIn)), 72 messageExHandler(std::move(messageExHandlerIn)), 73 closeHandler(std::move(closeHandlerIn)), 74 errorHandler(std::move(errorHandlerIn)), session(reqIn.session) 75 { 76 /* Turn on the timeouts on websocket stream to server role */ 77 ws.set_option(boost::beast::websocket::stream_base::timeout::suggested( 78 boost::beast::role_type::server)); 79 BMCWEB_LOG_DEBUG << "Creating new connection " << this; 80 } 81 82 boost::asio::io_context& getIoContext() override 83 { 84 return static_cast<boost::asio::io_context&>( 85 ws.get_executor().context()); 86 } 87 88 void start() 89 { 90 BMCWEB_LOG_DEBUG << "starting connection " << this; 91 92 using bf = boost::beast::http::field; 93 94 std::string_view protocol = req[bf::sec_websocket_protocol]; 95 96 ws.set_option(boost::beast::websocket::stream_base::decorator( 97 [session{session}, protocol{std::string(protocol)}]( 98 boost::beast::websocket::response_type& m) { 99 100 #ifndef BMCWEB_INSECURE_DISABLE_CSRF_PREVENTION 101 if (session != nullptr) 102 { 103 // use protocol for csrf checking 104 if (!crow::utility::constantTimeStringCompare( 105 protocol, session->csrfToken)) 106 { 107 BMCWEB_LOG_ERROR << "Websocket CSRF error"; 108 m.result(boost::beast::http::status::unauthorized); 109 return; 110 } 111 } 112 #endif 113 if (!protocol.empty()) 114 { 115 m.insert(bf::sec_websocket_protocol, protocol); 116 } 117 118 m.insert(bf::strict_transport_security, "max-age=31536000; " 119 "includeSubdomains; " 120 "preload"); 121 m.insert(bf::pragma, "no-cache"); 122 m.insert(bf::cache_control, "no-Store,no-Cache"); 123 m.insert("Content-Security-Policy", "default-src 'self'"); 124 m.insert("X-XSS-Protection", "1; " 125 "mode=block"); 126 m.insert("X-Content-Type-Options", "nosniff"); 127 })); 128 129 // Perform the websocket upgrade 130 ws.async_accept(req, [this, self(shared_from_this())]( 131 const boost::system::error_code& ec) { 132 if (ec) 133 { 134 BMCWEB_LOG_ERROR << "Error in ws.async_accept " << ec; 135 return; 136 } 137 acceptDone(); 138 }); 139 } 140 141 void sendBinary(std::string_view msg) override 142 { 143 ws.binary(true); 144 outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), 145 boost::asio::buffer(msg))); 146 doWrite(); 147 } 148 149 void sendEx(MessageType type, std::string_view msg, 150 std::function<void()>&& onDone) override 151 { 152 if (doingWrite) 153 { 154 BMCWEB_LOG_CRITICAL 155 << "Cannot mix sendEx usage with sendBinary or sendText"; 156 onDone(); 157 return; 158 } 159 ws.binary(type == MessageType::Binary); 160 161 ws.async_write(boost::asio::buffer(msg), 162 [weak(weak_from_this()), onDone{std::move(onDone)}]( 163 const boost::beast::error_code& ec, size_t) { 164 std::shared_ptr<Connection> self = weak.lock(); 165 166 // Call the done handler regardless of whether we 167 // errored, but before we close things out 168 onDone(); 169 170 if (ec) 171 { 172 BMCWEB_LOG_ERROR << "Error in ws.async_write " << ec; 173 self->close("write error"); 174 } 175 }); 176 } 177 178 void sendBinary(std::string&& msg) override 179 { 180 ws.binary(true); 181 outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), 182 boost::asio::buffer(msg))); 183 doWrite(); 184 } 185 186 void sendText(std::string_view msg) override 187 { 188 ws.text(true); 189 outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), 190 boost::asio::buffer(msg))); 191 doWrite(); 192 } 193 194 void sendText(std::string&& msg) override 195 { 196 ws.text(true); 197 outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), 198 boost::asio::buffer(msg))); 199 doWrite(); 200 } 201 202 void close(std::string_view msg) override 203 { 204 ws.async_close( 205 {boost::beast::websocket::close_code::normal, msg}, 206 [self(shared_from_this())](const boost::system::error_code& ec) { 207 if (ec == boost::asio::error::operation_aborted) 208 { 209 return; 210 } 211 if (ec) 212 { 213 BMCWEB_LOG_ERROR << "Error closing websocket " << ec; 214 return; 215 } 216 }); 217 } 218 219 void acceptDone() 220 { 221 BMCWEB_LOG_DEBUG << "Websocket accepted connection"; 222 223 if (openHandler) 224 { 225 openHandler(*this); 226 } 227 doRead(); 228 } 229 230 void deferRead() override 231 { 232 readingDefered = true; 233 234 // If we're not actively reading, we need to take ownership of 235 // ourselves for a small portion of time, do that, and clear when we 236 // resume. 237 selfOwned = shared_from_this(); 238 } 239 240 void resumeRead() override 241 { 242 readingDefered = false; 243 doRead(); 244 245 // No longer need to keep ourselves alive now that read is active. 246 selfOwned.reset(); 247 } 248 249 void doRead() 250 { 251 if (readingDefered) 252 { 253 return; 254 } 255 ws.async_read(inBuffer, [this, self(shared_from_this())]( 256 const boost::beast::error_code& ec, 257 size_t bytesRead) { 258 if (ec) 259 { 260 if (ec != boost::beast::websocket::error::closed) 261 { 262 BMCWEB_LOG_ERROR << "doRead error " << ec; 263 } 264 if (closeHandler) 265 { 266 std::string reason{ws.reason().reason.c_str()}; 267 closeHandler(*this, reason); 268 } 269 return; 270 } 271 272 handleMessage(bytesRead); 273 }); 274 } 275 void doWrite() 276 { 277 // If we're already doing a write, ignore the request, it will be picked 278 // up when the current write is complete 279 if (doingWrite) 280 { 281 return; 282 } 283 284 if (outBuffer.size() == 0) 285 { 286 // Done for now 287 return; 288 } 289 doingWrite = true; 290 ws.async_write(outBuffer.data(), [this, self(shared_from_this())]( 291 const boost::beast::error_code& ec, 292 size_t bytesSent) { 293 doingWrite = false; 294 outBuffer.consume(bytesSent); 295 if (ec == boost::beast::websocket::error::closed) 296 { 297 // Do nothing here. doRead handler will call the 298 // closeHandler. 299 close("Write error"); 300 return; 301 } 302 if (ec) 303 { 304 BMCWEB_LOG_ERROR << "Error in ws.async_write " << ec; 305 return; 306 } 307 doWrite(); 308 }); 309 } 310 311 private: 312 void handleMessage(size_t bytesRead) 313 { 314 if (messageExHandler) 315 { 316 // Note, because of the interactions with the read buffers, 317 // this message handler overrides the normal message handler 318 messageExHandler(*this, inString, MessageType::Binary, 319 [this, self(shared_from_this()), bytesRead]() { 320 if (self == nullptr) 321 { 322 return; 323 } 324 325 inBuffer.consume(bytesRead); 326 inString.clear(); 327 328 doRead(); 329 }); 330 return; 331 } 332 333 if (messageHandler) 334 { 335 messageHandler(*this, inString, ws.got_text()); 336 } 337 inBuffer.consume(bytesRead); 338 inString.clear(); 339 doRead(); 340 } 341 342 boost::beast::websocket::stream<Adaptor, false> ws; 343 344 bool readingDefered = false; 345 std::string inString; 346 boost::asio::dynamic_string_buffer<std::string::value_type, 347 std::string::traits_type, 348 std::string::allocator_type> 349 inBuffer; 350 351 boost::beast::multi_buffer outBuffer; 352 bool doingWrite = false; 353 354 std::function<void(Connection&)> openHandler; 355 std::function<void(Connection&, const std::string&, bool)> messageHandler; 356 std::function<void(crow::websocket::Connection&, std::string_view, 357 crow::websocket::MessageType type, 358 std::function<void()>&& whenComplete)> 359 messageExHandler; 360 std::function<void(Connection&, const std::string&)> closeHandler; 361 std::function<void(Connection&)> errorHandler; 362 std::shared_ptr<persistent_data::UserSession> session; 363 364 std::shared_ptr<Connection> selfOwned; 365 }; 366 } // namespace websocket 367 } // namespace crow 368