1 /* 2 // Copyright (c) 2020 Intel Corporation 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 */ 16 #pragma once 17 18 #include "async_resolve.hpp" 19 #include "http_body.hpp" 20 #include "http_response.hpp" 21 #include "logging.hpp" 22 #include "ssl_key_handler.hpp" 23 24 #include <boost/asio/connect.hpp> 25 #include <boost/asio/io_context.hpp> 26 #include <boost/asio/ip/address.hpp> 27 #include <boost/asio/ip/basic_endpoint.hpp> 28 #include <boost/asio/ip/tcp.hpp> 29 #include <boost/asio/ssl/context.hpp> 30 #include <boost/asio/ssl/error.hpp> 31 #include <boost/asio/steady_timer.hpp> 32 #include <boost/beast/core/flat_buffer.hpp> 33 #include <boost/beast/core/flat_static_buffer.hpp> 34 #include <boost/beast/http/message.hpp> 35 #include <boost/beast/http/parser.hpp> 36 #include <boost/beast/http/read.hpp> 37 #include <boost/beast/http/write.hpp> 38 #include <boost/beast/ssl/ssl_stream.hpp> 39 #include <boost/beast/version.hpp> 40 #include <boost/container/devector.hpp> 41 #include <boost/system/error_code.hpp> 42 #include <boost/url/format.hpp> 43 #include <boost/url/url.hpp> 44 #include <boost/url/url_view_base.hpp> 45 46 #include <cstdlib> 47 #include <functional> 48 #include <iostream> 49 #include <memory> 50 #include <queue> 51 #include <string> 52 53 namespace crow 54 { 55 // With Redfish Aggregation it is assumed we will connect to another 56 // instance of BMCWeb which can handle 100 simultaneous connections. 57 constexpr size_t maxPoolSize = 20; 58 constexpr size_t maxRequestQueueSize = 500; 59 constexpr unsigned int httpReadBodyLimit = 131072; 60 constexpr unsigned int httpReadBufferSize = 4096; 61 62 enum class ConnState 63 { 64 initialized, 65 resolveInProgress, 66 resolveFailed, 67 connectInProgress, 68 connectFailed, 69 connected, 70 handshakeInProgress, 71 handshakeFailed, 72 sendInProgress, 73 sendFailed, 74 recvInProgress, 75 recvFailed, 76 idle, 77 closed, 78 suspended, 79 terminated, 80 abortConnection, 81 sslInitFailed, 82 retry 83 }; 84 85 static inline boost::system::error_code 86 defaultRetryHandler(unsigned int respCode) 87 { 88 // As a default, assume 200X is alright 89 BMCWEB_LOG_DEBUG("Using default check for response code validity"); 90 if ((respCode < 200) || (respCode >= 300)) 91 { 92 return boost::system::errc::make_error_code( 93 boost::system::errc::result_out_of_range); 94 } 95 96 // Return 0 if the response code is valid 97 return boost::system::errc::make_error_code(boost::system::errc::success); 98 }; 99 100 // We need to allow retry information to be set before a message has been 101 // sent and a connection pool has been created 102 struct ConnectionPolicy 103 { 104 uint32_t maxRetryAttempts = 5; 105 106 // the max size of requests in bytes. 0 for unlimited 107 boost::optional<uint64_t> requestByteLimit = httpReadBodyLimit; 108 109 size_t maxConnections = 1; 110 111 std::string retryPolicyAction = "TerminateAfterRetries"; 112 113 std::chrono::seconds retryIntervalSecs = std::chrono::seconds(0); 114 std::function<boost::system::error_code(unsigned int respCode)> 115 invalidResp = defaultRetryHandler; 116 }; 117 118 struct PendingRequest 119 { 120 boost::beast::http::request<bmcweb::HttpBody> req; 121 std::function<void(bool, uint32_t, Response&)> callback; 122 PendingRequest( 123 boost::beast::http::request<bmcweb::HttpBody>&& reqIn, 124 const std::function<void(bool, uint32_t, Response&)>& callbackIn) : 125 req(std::move(reqIn)), 126 callback(callbackIn) 127 {} 128 }; 129 130 namespace http = boost::beast::http; 131 class ConnectionInfo : public std::enable_shared_from_this<ConnectionInfo> 132 { 133 private: 134 ConnState state = ConnState::initialized; 135 uint32_t retryCount = 0; 136 std::string subId; 137 std::shared_ptr<ConnectionPolicy> connPolicy; 138 boost::urls::url host; 139 uint32_t connId; 140 141 // Data buffers 142 http::request<bmcweb::HttpBody> req; 143 using parser_type = http::response_parser<bmcweb::HttpBody>; 144 std::optional<parser_type> parser; 145 boost::beast::flat_static_buffer<httpReadBufferSize> buffer; 146 Response res; 147 148 // Ascync callables 149 std::function<void(bool, uint32_t, Response&)> callback; 150 151 boost::asio::io_context& ioc; 152 153 #ifdef BMCWEB_DBUS_DNS_RESOLVER 154 using Resolver = async_resolve::Resolver; 155 #else 156 using Resolver = boost::asio::ip::tcp::resolver; 157 #endif 158 Resolver resolver; 159 160 boost::asio::ip::tcp::socket conn; 161 std::optional<boost::beast::ssl_stream<boost::asio::ip::tcp::socket&>> 162 sslConn; 163 164 boost::asio::steady_timer timer; 165 166 friend class ConnectionPool; 167 168 void doResolve() 169 { 170 state = ConnState::resolveInProgress; 171 BMCWEB_LOG_DEBUG("Trying to resolve: {}, id: {}", host, connId); 172 173 resolver.async_resolve(host.encoded_host_address(), host.port(), 174 std::bind_front(&ConnectionInfo::afterResolve, 175 this, shared_from_this())); 176 } 177 178 void afterResolve(const std::shared_ptr<ConnectionInfo>& /*self*/, 179 const boost::system::error_code& ec, 180 const Resolver::results_type& endpointList) 181 { 182 if (ec || (endpointList.empty())) 183 { 184 BMCWEB_LOG_ERROR("Resolve failed: {} {}", ec.message(), host); 185 state = ConnState::resolveFailed; 186 waitAndRetry(); 187 return; 188 } 189 BMCWEB_LOG_DEBUG("Resolved {}, id: {}", host, connId); 190 state = ConnState::connectInProgress; 191 192 BMCWEB_LOG_DEBUG("Trying to connect to: {}, id: {}", host, connId); 193 194 timer.expires_after(std::chrono::seconds(30)); 195 timer.async_wait(std::bind_front(onTimeout, weak_from_this())); 196 197 boost::asio::async_connect( 198 conn, endpointList, 199 std::bind_front(&ConnectionInfo::afterConnect, this, 200 shared_from_this())); 201 } 202 203 void afterConnect(const std::shared_ptr<ConnectionInfo>& /*self*/, 204 const boost::beast::error_code& ec, 205 const boost::asio::ip::tcp::endpoint& endpoint) 206 { 207 // The operation already timed out. We don't want do continue down 208 // this branch 209 if (ec && ec == boost::asio::error::operation_aborted) 210 { 211 return; 212 } 213 214 timer.cancel(); 215 if (ec) 216 { 217 BMCWEB_LOG_ERROR("Connect {}:{}, id: {} failed: {}", 218 endpoint.address().to_string(), endpoint.port(), 219 connId, ec.message()); 220 state = ConnState::connectFailed; 221 waitAndRetry(); 222 return; 223 } 224 BMCWEB_LOG_DEBUG("Connected to: {}:{}, id: {}", 225 endpoint.address().to_string(), endpoint.port(), 226 connId); 227 if (sslConn) 228 { 229 doSslHandshake(); 230 return; 231 } 232 state = ConnState::connected; 233 sendMessage(); 234 } 235 236 void doSslHandshake() 237 { 238 if (!sslConn) 239 { 240 return; 241 } 242 state = ConnState::handshakeInProgress; 243 timer.expires_after(std::chrono::seconds(30)); 244 timer.async_wait(std::bind_front(onTimeout, weak_from_this())); 245 sslConn->async_handshake( 246 boost::asio::ssl::stream_base::client, 247 std::bind_front(&ConnectionInfo::afterSslHandshake, this, 248 shared_from_this())); 249 } 250 251 void afterSslHandshake(const std::shared_ptr<ConnectionInfo>& /*self*/, 252 const boost::beast::error_code& ec) 253 { 254 // The operation already timed out. We don't want do continue down 255 // this branch 256 if (ec && ec == boost::asio::error::operation_aborted) 257 { 258 return; 259 } 260 261 timer.cancel(); 262 if (ec) 263 { 264 BMCWEB_LOG_ERROR("SSL Handshake failed - id: {} error: {}", connId, 265 ec.message()); 266 state = ConnState::handshakeFailed; 267 waitAndRetry(); 268 return; 269 } 270 BMCWEB_LOG_DEBUG("SSL Handshake successful - id: {}", connId); 271 state = ConnState::connected; 272 sendMessage(); 273 } 274 275 void sendMessage() 276 { 277 state = ConnState::sendInProgress; 278 279 // Set a timeout on the operation 280 timer.expires_after(std::chrono::seconds(30)); 281 timer.async_wait(std::bind_front(onTimeout, weak_from_this())); 282 283 // Send the HTTP request to the remote host 284 if (sslConn) 285 { 286 boost::beast::http::async_write( 287 *sslConn, req, 288 std::bind_front(&ConnectionInfo::afterWrite, this, 289 shared_from_this())); 290 } 291 else 292 { 293 boost::beast::http::async_write( 294 conn, req, 295 std::bind_front(&ConnectionInfo::afterWrite, this, 296 shared_from_this())); 297 } 298 } 299 300 void afterWrite(const std::shared_ptr<ConnectionInfo>& /*self*/, 301 const boost::beast::error_code& ec, size_t bytesTransferred) 302 { 303 // The operation already timed out. We don't want do continue down 304 // this branch 305 if (ec && ec == boost::asio::error::operation_aborted) 306 { 307 return; 308 } 309 310 timer.cancel(); 311 if (ec) 312 { 313 BMCWEB_LOG_ERROR("sendMessage() failed: {} {}", ec.message(), host); 314 state = ConnState::sendFailed; 315 waitAndRetry(); 316 return; 317 } 318 BMCWEB_LOG_DEBUG("sendMessage() bytes transferred: {}", 319 bytesTransferred); 320 321 recvMessage(); 322 } 323 324 void recvMessage() 325 { 326 state = ConnState::recvInProgress; 327 328 parser_type& thisParser = parser.emplace(std::piecewise_construct, 329 std::make_tuple()); 330 331 thisParser.body_limit(connPolicy->requestByteLimit); 332 333 timer.expires_after(std::chrono::seconds(30)); 334 timer.async_wait(std::bind_front(onTimeout, weak_from_this())); 335 336 // Receive the HTTP response 337 if (sslConn) 338 { 339 boost::beast::http::async_read( 340 *sslConn, buffer, thisParser, 341 std::bind_front(&ConnectionInfo::afterRead, this, 342 shared_from_this())); 343 } 344 else 345 { 346 boost::beast::http::async_read( 347 conn, buffer, thisParser, 348 std::bind_front(&ConnectionInfo::afterRead, this, 349 shared_from_this())); 350 } 351 } 352 353 void afterRead(const std::shared_ptr<ConnectionInfo>& /*self*/, 354 const boost::beast::error_code& ec, 355 const std::size_t& bytesTransferred) 356 { 357 // The operation already timed out. We don't want do continue down 358 // this branch 359 if (ec && ec == boost::asio::error::operation_aborted) 360 { 361 return; 362 } 363 364 timer.cancel(); 365 if (ec && ec != boost::asio::ssl::error::stream_truncated) 366 { 367 BMCWEB_LOG_ERROR("recvMessage() failed: {} from {}", ec.message(), 368 host); 369 state = ConnState::recvFailed; 370 waitAndRetry(); 371 return; 372 } 373 BMCWEB_LOG_DEBUG("recvMessage() bytes transferred: {}", 374 bytesTransferred); 375 if (!parser) 376 { 377 return; 378 } 379 BMCWEB_LOG_DEBUG("recvMessage() data: {}", parser->get().body().str()); 380 381 unsigned int respCode = parser->get().result_int(); 382 BMCWEB_LOG_DEBUG("recvMessage() Header Response Code: {}", respCode); 383 384 // Handle the case of stream_truncated. Some servers close the ssl 385 // connection uncleanly, so check to see if we got a full response 386 // before we handle this as an error. 387 if (!parser->is_done()) 388 { 389 state = ConnState::recvFailed; 390 waitAndRetry(); 391 return; 392 } 393 394 // Make sure the received response code is valid as defined by 395 // the associated retry policy 396 if (connPolicy->invalidResp(respCode)) 397 { 398 // The listener failed to receive the Sent-Event 399 BMCWEB_LOG_ERROR( 400 "recvMessage() Listener Failed to " 401 "receive Sent-Event. Header Response Code: {} from {}", 402 respCode, host); 403 state = ConnState::recvFailed; 404 waitAndRetry(); 405 return; 406 } 407 408 // Send is successful 409 // Reset the counter just in case this was after retrying 410 retryCount = 0; 411 412 // Keep the connection alive if server supports it 413 // Else close the connection 414 BMCWEB_LOG_DEBUG("recvMessage() keepalive : {}", parser->keep_alive()); 415 416 // Copy the response into a Response object so that it can be 417 // processed by the callback function. 418 res.response = parser->release(); 419 callback(parser->keep_alive(), connId, res); 420 res.clear(); 421 } 422 423 static void onTimeout(const std::weak_ptr<ConnectionInfo>& weakSelf, 424 const boost::system::error_code& ec) 425 { 426 if (ec == boost::asio::error::operation_aborted) 427 { 428 BMCWEB_LOG_DEBUG( 429 "async_wait failed since the operation is aborted"); 430 return; 431 } 432 if (ec) 433 { 434 BMCWEB_LOG_ERROR("async_wait failed: {}", ec.message()); 435 // If the timer fails, we need to close the socket anyway, same 436 // as if it expired. 437 } 438 std::shared_ptr<ConnectionInfo> self = weakSelf.lock(); 439 if (self == nullptr) 440 { 441 return; 442 } 443 self->waitAndRetry(); 444 } 445 446 void waitAndRetry() 447 { 448 if ((retryCount >= connPolicy->maxRetryAttempts) || 449 (state == ConnState::sslInitFailed)) 450 { 451 BMCWEB_LOG_ERROR("Maximum number of retries reached. {}", host); 452 BMCWEB_LOG_DEBUG("Retry policy: {}", connPolicy->retryPolicyAction); 453 454 if (connPolicy->retryPolicyAction == "TerminateAfterRetries") 455 { 456 // TODO: delete subscription 457 state = ConnState::terminated; 458 } 459 if (connPolicy->retryPolicyAction == "SuspendRetries") 460 { 461 state = ConnState::suspended; 462 } 463 464 // We want to return a 502 to indicate there was an error with 465 // the external server 466 res.result(boost::beast::http::status::bad_gateway); 467 callback(false, connId, res); 468 res.clear(); 469 470 // Reset the retrycount to zero so that client can try 471 // connecting again if needed 472 retryCount = 0; 473 return; 474 } 475 476 retryCount++; 477 478 BMCWEB_LOG_DEBUG("Attempt retry after {} seconds. RetryCount = {}", 479 connPolicy->retryIntervalSecs.count(), retryCount); 480 timer.expires_after(connPolicy->retryIntervalSecs); 481 timer.async_wait(std::bind_front(&ConnectionInfo::onTimerDone, this, 482 shared_from_this())); 483 } 484 485 void onTimerDone(const std::shared_ptr<ConnectionInfo>& /*self*/, 486 const boost::system::error_code& ec) 487 { 488 if (ec == boost::asio::error::operation_aborted) 489 { 490 BMCWEB_LOG_DEBUG( 491 "async_wait failed since the operation is aborted{}", 492 ec.message()); 493 } 494 else if (ec) 495 { 496 BMCWEB_LOG_ERROR("async_wait failed: {}", ec.message()); 497 // Ignore the error and continue the retry loop to attempt 498 // sending the event as per the retry policy 499 } 500 501 // Let's close the connection and restart from resolve. 502 shutdownConn(true); 503 } 504 505 void restartConnection() 506 { 507 BMCWEB_LOG_DEBUG("{}, id: {} restartConnection", host, 508 std::to_string(connId)); 509 initializeConnection(host.scheme() == "https"); 510 doResolve(); 511 } 512 513 void shutdownConn(bool retry) 514 { 515 boost::beast::error_code ec; 516 conn.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); 517 conn.close(); 518 519 // not_connected happens sometimes so don't bother reporting it. 520 if (ec && ec != boost::beast::errc::not_connected) 521 { 522 BMCWEB_LOG_ERROR("{}, id: {} shutdown failed: {}", host, connId, 523 ec.message()); 524 } 525 else 526 { 527 BMCWEB_LOG_DEBUG("{}, id: {} closed gracefully", host, connId); 528 } 529 530 if (retry) 531 { 532 // Now let's try to resend the data 533 state = ConnState::retry; 534 restartConnection(); 535 } 536 else 537 { 538 state = ConnState::closed; 539 } 540 } 541 542 void doClose(bool retry = false) 543 { 544 if (!sslConn) 545 { 546 shutdownConn(retry); 547 return; 548 } 549 550 sslConn->async_shutdown( 551 std::bind_front(&ConnectionInfo::afterSslShutdown, this, 552 shared_from_this(), retry)); 553 } 554 555 void afterSslShutdown(const std::shared_ptr<ConnectionInfo>& /*self*/, 556 bool retry, const boost::system::error_code& ec) 557 { 558 if (ec) 559 { 560 BMCWEB_LOG_ERROR("{}, id: {} shutdown failed: {}", host, connId, 561 ec.message()); 562 } 563 else 564 { 565 BMCWEB_LOG_DEBUG("{}, id: {} closed gracefully", host, connId); 566 } 567 shutdownConn(retry); 568 } 569 570 void setCipherSuiteTLSext() 571 { 572 if (!sslConn) 573 { 574 return; 575 } 576 577 if (host.host_type() != boost::urls::host_type::name) 578 { 579 // Avoid setting SNI hostname if its IP address 580 return; 581 } 582 // Create a null terminated string for SSL 583 std::string hostname(host.encoded_host_address()); 584 // NOTE: The SSL_set_tlsext_host_name is defined in tlsv1.h header 585 // file but its having old style casting (name is cast to void*). 586 // Since bmcweb compiler treats all old-style-cast as error, its 587 // causing the build failure. So replaced the same macro inline and 588 // did corrected the code by doing static_cast to viod*. This has to 589 // be fixed in openssl library in long run. Set SNI Hostname (many 590 // hosts need this to handshake successfully) 591 if (SSL_ctrl(sslConn->native_handle(), SSL_CTRL_SET_TLSEXT_HOSTNAME, 592 TLSEXT_NAMETYPE_host_name, 593 static_cast<void*>(hostname.data())) == 0) 594 595 { 596 boost::beast::error_code ec{static_cast<int>(::ERR_get_error()), 597 boost::asio::error::get_ssl_category()}; 598 599 BMCWEB_LOG_ERROR("SSL_set_tlsext_host_name {}, id: {} failed: {}", 600 host, connId, ec.message()); 601 // Set state as sslInit failed so that we close the connection 602 // and take appropriate action as per retry configuration. 603 state = ConnState::sslInitFailed; 604 waitAndRetry(); 605 return; 606 } 607 } 608 609 void initializeConnection(bool ssl) 610 { 611 conn = boost::asio::ip::tcp::socket(ioc); 612 if (ssl) 613 { 614 std::optional<boost::asio::ssl::context> sslCtx = 615 ensuressl::getSSLClientContext(); 616 617 if (!sslCtx) 618 { 619 BMCWEB_LOG_ERROR("prepareSSLContext failed - {}, id: {}", host, 620 connId); 621 // Don't retry if failure occurs while preparing SSL context 622 // such as certificate is invalid or set cipher failure or 623 // set host name failure etc... Setting conn state to 624 // sslInitFailed and connection state will be transitioned 625 // to next state depending on retry policy set by 626 // subscription. 627 state = ConnState::sslInitFailed; 628 waitAndRetry(); 629 return; 630 } 631 sslConn.emplace(conn, *sslCtx); 632 setCipherSuiteTLSext(); 633 } 634 } 635 636 public: 637 explicit ConnectionInfo( 638 boost::asio::io_context& iocIn, const std::string& idIn, 639 const std::shared_ptr<ConnectionPolicy>& connPolicyIn, 640 const boost::urls::url_view_base& hostIn, unsigned int connIdIn) : 641 subId(idIn), 642 connPolicy(connPolicyIn), host(hostIn), connId(connIdIn), ioc(iocIn), 643 resolver(iocIn), conn(iocIn), timer(iocIn) 644 { 645 initializeConnection(host.scheme() == "https"); 646 } 647 }; 648 649 class ConnectionPool : public std::enable_shared_from_this<ConnectionPool> 650 { 651 private: 652 boost::asio::io_context& ioc; 653 std::string id; 654 std::shared_ptr<ConnectionPolicy> connPolicy; 655 boost::urls::url destIP; 656 std::vector<std::shared_ptr<ConnectionInfo>> connections; 657 boost::container::devector<PendingRequest> requestQueue; 658 659 friend class HttpClient; 660 661 // Configure a connections's request, callback, and retry info in 662 // preparation to begin sending the request 663 void setConnProps(ConnectionInfo& conn) 664 { 665 if (requestQueue.empty()) 666 { 667 BMCWEB_LOG_ERROR( 668 "setConnProps() should not have been called when requestQueue is empty"); 669 return; 670 } 671 672 PendingRequest& nextReq = requestQueue.front(); 673 conn.req = std::move(nextReq.req); 674 conn.callback = std::move(nextReq.callback); 675 676 BMCWEB_LOG_DEBUG("Setting properties for connection {}, id: {}", 677 conn.host, conn.connId); 678 679 // We can remove the request from the queue at this point 680 requestQueue.pop_front(); 681 } 682 683 // Gets called as part of callback after request is sent 684 // Reuses the connection if there are any requests waiting to be sent 685 // Otherwise closes the connection if it is not a keep-alive 686 void sendNext(bool keepAlive, uint32_t connId) 687 { 688 auto conn = connections[connId]; 689 690 // Allow the connection's handler to be deleted 691 // This is needed because of Redfish Aggregation passing an 692 // AsyncResponse shared_ptr to this callback 693 conn->callback = nullptr; 694 695 // Reuse the connection to send the next request in the queue 696 if (!requestQueue.empty()) 697 { 698 BMCWEB_LOG_DEBUG( 699 "{} requests remaining in queue for {}, reusing connection {}", 700 requestQueue.size(), destIP, connId); 701 702 setConnProps(*conn); 703 704 if (keepAlive) 705 { 706 conn->sendMessage(); 707 } 708 else 709 { 710 // Server is not keep-alive enabled so we need to close the 711 // connection and then start over from resolve 712 conn->doClose(); 713 conn->doResolve(); 714 } 715 return; 716 } 717 718 // No more messages to send so close the connection if necessary 719 if (keepAlive) 720 { 721 conn->state = ConnState::idle; 722 } 723 else 724 { 725 // Abort the connection since server is not keep-alive enabled 726 conn->state = ConnState::abortConnection; 727 conn->doClose(); 728 } 729 } 730 731 void sendData(std::string&& data, const boost::urls::url_view_base& destUri, 732 const boost::beast::http::fields& httpHeader, 733 const boost::beast::http::verb verb, 734 const std::function<void(Response&)>& resHandler) 735 { 736 // Construct the request to be sent 737 boost::beast::http::request<bmcweb::HttpBody> thisReq( 738 verb, destUri.encoded_target(), 11, "", httpHeader); 739 thisReq.set(boost::beast::http::field::host, 740 destUri.encoded_host_address()); 741 thisReq.keep_alive(true); 742 thisReq.body().str() = std::move(data); 743 thisReq.prepare_payload(); 744 auto cb = std::bind_front(&ConnectionPool::afterSendData, 745 weak_from_this(), resHandler); 746 // Reuse an existing connection if one is available 747 for (unsigned int i = 0; i < connections.size(); i++) 748 { 749 auto conn = connections[i]; 750 if ((conn->state == ConnState::idle) || 751 (conn->state == ConnState::initialized) || 752 (conn->state == ConnState::closed)) 753 { 754 conn->req = std::move(thisReq); 755 conn->callback = std::move(cb); 756 std::string commonMsg = std::format("{} from pool {}", i, id); 757 758 if (conn->state == ConnState::idle) 759 { 760 BMCWEB_LOG_DEBUG("Grabbing idle connection {}", commonMsg); 761 conn->sendMessage(); 762 } 763 else 764 { 765 BMCWEB_LOG_DEBUG("Reusing existing connection {}", 766 commonMsg); 767 conn->doResolve(); 768 } 769 return; 770 } 771 } 772 773 // All connections in use so create a new connection or add request 774 // to the queue 775 if (connections.size() < connPolicy->maxConnections) 776 { 777 BMCWEB_LOG_DEBUG("Adding new connection to pool {}", id); 778 auto conn = addConnection(); 779 conn->req = std::move(thisReq); 780 conn->callback = std::move(cb); 781 conn->doResolve(); 782 } 783 else if (requestQueue.size() < maxRequestQueueSize) 784 { 785 BMCWEB_LOG_DEBUG("Max pool size reached. Adding data to queue {}", 786 id); 787 requestQueue.emplace_back(std::move(thisReq), std::move(cb)); 788 } 789 else 790 { 791 // If we can't buffer the request then we should let the 792 // callback handle a 429 Too Many Requests dummy response 793 BMCWEB_LOG_ERROR("{} request queue full. Dropping request.", id); 794 Response dummyRes; 795 dummyRes.result(boost::beast::http::status::too_many_requests); 796 resHandler(dummyRes); 797 } 798 } 799 800 // Callback to be called once the request has been sent 801 static void afterSendData(const std::weak_ptr<ConnectionPool>& weakSelf, 802 const std::function<void(Response&)>& resHandler, 803 bool keepAlive, uint32_t connId, Response& res) 804 { 805 // Allow provided callback to perform additional processing of the 806 // request 807 resHandler(res); 808 809 // If requests remain in the queue then we want to reuse this 810 // connection to send the next request 811 std::shared_ptr<ConnectionPool> self = weakSelf.lock(); 812 if (!self) 813 { 814 BMCWEB_LOG_CRITICAL("{} Failed to capture connection", 815 logPtr(self.get())); 816 return; 817 } 818 819 self->sendNext(keepAlive, connId); 820 } 821 822 std::shared_ptr<ConnectionInfo>& addConnection() 823 { 824 unsigned int newId = static_cast<unsigned int>(connections.size()); 825 826 auto& ret = connections.emplace_back(std::make_shared<ConnectionInfo>( 827 ioc, id, connPolicy, destIP, newId)); 828 829 BMCWEB_LOG_DEBUG("Added connection {} to pool {}", 830 connections.size() - 1, id); 831 832 return ret; 833 } 834 835 public: 836 explicit ConnectionPool( 837 boost::asio::io_context& iocIn, const std::string& idIn, 838 const std::shared_ptr<ConnectionPolicy>& connPolicyIn, 839 const boost::urls::url_view_base& destIPIn) : 840 ioc(iocIn), 841 id(idIn), connPolicy(connPolicyIn), destIP(destIPIn) 842 { 843 BMCWEB_LOG_DEBUG("Initializing connection pool for {}", id); 844 845 // Initialize the pool with a single connection 846 addConnection(); 847 } 848 }; 849 850 class HttpClient 851 { 852 private: 853 std::unordered_map<std::string, std::shared_ptr<ConnectionPool>> 854 connectionPools; 855 boost::asio::io_context& ioc; 856 std::shared_ptr<ConnectionPolicy> connPolicy; 857 858 // Used as a dummy callback by sendData() in order to call 859 // sendDataWithCallback() 860 static void genericResHandler(const Response& res) 861 { 862 BMCWEB_LOG_DEBUG("Response handled with return code: {}", 863 res.resultInt()); 864 } 865 866 public: 867 HttpClient() = delete; 868 explicit HttpClient(boost::asio::io_context& iocIn, 869 const std::shared_ptr<ConnectionPolicy>& connPolicyIn) : 870 ioc(iocIn), 871 connPolicy(connPolicyIn) 872 {} 873 874 HttpClient(const HttpClient&) = delete; 875 HttpClient& operator=(const HttpClient&) = delete; 876 HttpClient(HttpClient&&) = delete; 877 HttpClient& operator=(HttpClient&&) = delete; 878 ~HttpClient() = default; 879 880 // Send a request to destIP where additional processing of the 881 // result is not required 882 void sendData(std::string&& data, const boost::urls::url_view_base& destUri, 883 const boost::beast::http::fields& httpHeader, 884 const boost::beast::http::verb verb) 885 { 886 const std::function<void(Response&)> cb = genericResHandler; 887 sendDataWithCallback(std::move(data), destUri, httpHeader, verb, cb); 888 } 889 890 // Send request to destIP and use the provided callback to 891 // handle the response 892 void sendDataWithCallback(std::string&& data, 893 const boost::urls::url_view_base& destUrl, 894 const boost::beast::http::fields& httpHeader, 895 const boost::beast::http::verb verb, 896 const std::function<void(Response&)>& resHandler) 897 { 898 std::string clientKey = std::format("{}://{}", destUrl.scheme(), 899 destUrl.encoded_host_and_port()); 900 auto pool = connectionPools.try_emplace(clientKey); 901 if (pool.first->second == nullptr) 902 { 903 pool.first->second = std::make_shared<ConnectionPool>( 904 ioc, clientKey, connPolicy, destUrl); 905 } 906 // Send the data using either the existing connection pool or the 907 // newly created connection pool 908 pool.first->second->sendData(std::move(data), destUrl, httpHeader, verb, 909 resHandler); 910 } 911 }; 912 } // namespace crow 913