/* // Copyright (c) 2020 Intel Corporation // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. */ #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace crow { // It is assumed that the BMC should be able to handle 4 parallel connections constexpr uint8_t maxPoolSize = 4; constexpr uint8_t maxRequestQueueSize = 50; constexpr unsigned int httpReadBodyLimit = 16384; constexpr unsigned int httpReadBufferSize = 4096; enum class ConnState { initialized, resolveInProgress, resolveFailed, connectInProgress, connectFailed, connected, sendInProgress, sendFailed, recvInProgress, recvFailed, idle, closeInProgress, closed, suspended, terminated, abortConnection, retry }; static inline boost::system::error_code defaultRetryHandler(unsigned int respCode) { // As a default, assume 200X is alright BMCWEB_LOG_DEBUG << "Using default check for response code validity"; if ((respCode < 200) || (respCode >= 300)) { return boost::system::errc::make_error_code( boost::system::errc::result_out_of_range); } // Return 0 if the response code is valid return boost::system::errc::make_error_code(boost::system::errc::success); }; // We need to allow retry information to be set before a message has been sent // and a connection pool has been created struct RetryPolicyData { uint32_t maxRetryAttempts = 5; std::chrono::seconds retryIntervalSecs = std::chrono::seconds(0); std::string retryPolicyAction = "TerminateAfterRetries"; std::function invalidResp = defaultRetryHandler; }; struct PendingRequest { boost::beast::http::request req; std::function callback; RetryPolicyData retryPolicy; PendingRequest( boost::beast::http::request&& reqIn, const std::function& callbackIn, const RetryPolicyData& retryPolicyIn) : req(std::move(reqIn)), callback(callbackIn), retryPolicy(retryPolicyIn) {} }; class ConnectionInfo : public std::enable_shared_from_this { private: ConnState state = ConnState::initialized; uint32_t retryCount = 0; bool runningTimer = false; std::string subId; std::string host; uint16_t port; uint32_t connId; // Retry policy information // This should be updated before each message is sent RetryPolicyData retryPolicy; // Data buffers boost::beast::http::request req; std::optional< boost::beast::http::response_parser> parser; boost::beast::flat_static_buffer buffer; Response res; // Ascync callables std::function callback; crow::async_resolve::Resolver resolver; boost::beast::tcp_stream conn; boost::asio::steady_timer timer; friend class ConnectionPool; void doResolve() { state = ConnState::resolveInProgress; BMCWEB_LOG_DEBUG << "Trying to resolve: " << host << ":" << std::to_string(port) << ", id: " << std::to_string(connId); auto respHandler = [self(shared_from_this())]( const boost::beast::error_code ec, const std::vector& endpointList) { if (ec || (endpointList.empty())) { BMCWEB_LOG_ERROR << "Resolve failed: " << ec.message(); self->state = ConnState::resolveFailed; self->waitAndRetry(); return; } BMCWEB_LOG_DEBUG << "Resolved " << self->host << ":" << std::to_string(self->port) << ", id: " << std::to_string(self->connId); self->doConnect(endpointList); }; resolver.asyncResolve(host, port, std::move(respHandler)); } void doConnect( const std::vector& endpointList) { state = ConnState::connectInProgress; BMCWEB_LOG_DEBUG << "Trying to connect to: " << host << ":" << std::to_string(port) << ", id: " << std::to_string(connId); conn.expires_after(std::chrono::seconds(30)); conn.async_connect(endpointList, [self(shared_from_this())]( const boost::beast::error_code ec, const boost::asio::ip::tcp::endpoint& endpoint) { if (ec) { BMCWEB_LOG_ERROR << "Connect " << endpoint.address().to_string() << ":" << std::to_string(endpoint.port()) << ", id: " << std::to_string(self->connId) << " failed: " << ec.message(); self->state = ConnState::connectFailed; self->waitAndRetry(); return; } BMCWEB_LOG_DEBUG << "Connected to: " << endpoint.address().to_string() << ":" << std::to_string(endpoint.port()) << ", id: " << std::to_string(self->connId); self->state = ConnState::connected; self->sendMessage(); }); } void sendMessage() { state = ConnState::sendInProgress; // Set a timeout on the operation conn.expires_after(std::chrono::seconds(30)); // Send the HTTP request to the remote host boost::beast::http::async_write( conn, req, [self(shared_from_this())](const boost::beast::error_code& ec, const std::size_t& bytesTransferred) { if (ec) { BMCWEB_LOG_ERROR << "sendMessage() failed: " << ec.message(); self->state = ConnState::sendFailed; self->waitAndRetry(); return; } BMCWEB_LOG_DEBUG << "sendMessage() bytes transferred: " << bytesTransferred; boost::ignore_unused(bytesTransferred); self->recvMessage(); }); } void recvMessage() { state = ConnState::recvInProgress; parser.emplace(std::piecewise_construct, std::make_tuple()); parser->body_limit(httpReadBodyLimit); // Receive the HTTP response boost::beast::http::async_read( conn, buffer, *parser, [self(shared_from_this())](const boost::beast::error_code& ec, const std::size_t& bytesTransferred) { if (ec) { BMCWEB_LOG_ERROR << "recvMessage() failed: " << ec.message(); self->state = ConnState::recvFailed; self->waitAndRetry(); return; } BMCWEB_LOG_DEBUG << "recvMessage() bytes transferred: " << bytesTransferred; BMCWEB_LOG_DEBUG << "recvMessage() data: " << self->parser->get().body(); unsigned int respCode = self->parser->get().result_int(); BMCWEB_LOG_DEBUG << "recvMessage() Header Response Code: " << respCode; // Make sure the received response code is valid as defined by // the associated retry policy if (self->retryPolicy.invalidResp(respCode)) { // The listener failed to receive the Sent-Event BMCWEB_LOG_ERROR << "recvMessage() Listener Failed to " "receive Sent-Event. Header Response Code: " << respCode; self->state = ConnState::recvFailed; self->waitAndRetry(); return; } // Send is successful // Reset the counter just in case this was after retrying self->retryCount = 0; // Keep the connection alive if server supports it // Else close the connection BMCWEB_LOG_DEBUG << "recvMessage() keepalive : " << self->parser->keep_alive(); // Copy the response into a Response object so that it can be // processed by the callback function. self->res.clear(); self->res.stringResponse = self->parser->release(); self->callback(self->parser->keep_alive(), self->connId, self->res); }); } void waitAndRetry() { if (retryCount >= retryPolicy.maxRetryAttempts) { BMCWEB_LOG_ERROR << "Maximum number of retries reached."; BMCWEB_LOG_DEBUG << "Retry policy: " << retryPolicy.retryPolicyAction; // We want to return a 502 to indicate there was an error with the // external server res.clear(); redfish::messages::operationFailed(res); if (retryPolicy.retryPolicyAction == "TerminateAfterRetries") { // TODO: delete subscription state = ConnState::terminated; callback(false, connId, res); } if (retryPolicy.retryPolicyAction == "SuspendRetries") { state = ConnState::suspended; callback(false, connId, res); } // Reset the retrycount to zero so that client can try connecting // again if needed retryCount = 0; return; } if (runningTimer) { BMCWEB_LOG_DEBUG << "Retry timer is already running."; return; } runningTimer = true; retryCount++; BMCWEB_LOG_DEBUG << "Attempt retry after " << std::to_string( retryPolicy.retryIntervalSecs.count()) << " seconds. RetryCount = " << retryCount; timer.expires_after(retryPolicy.retryIntervalSecs); timer.async_wait( [self(shared_from_this())](const boost::system::error_code ec) { if (ec == boost::asio::error::operation_aborted) { BMCWEB_LOG_DEBUG << "async_wait failed since the operation is aborted" << ec.message(); } else if (ec) { BMCWEB_LOG_ERROR << "async_wait failed: " << ec.message(); // Ignore the error and continue the retry loop to attempt // sending the event as per the retry policy } self->runningTimer = false; // Let's close the connection and restart from resolve. self->doCloseAndRetry(); }); } void doClose() { state = ConnState::closeInProgress; boost::beast::error_code ec; conn.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); conn.close(); // not_connected happens sometimes so don't bother reporting it. if (ec && ec != boost::beast::errc::not_connected) { BMCWEB_LOG_ERROR << host << ":" << std::to_string(port) << ", id: " << std::to_string(connId) << "shutdown failed: " << ec.message(); return; } BMCWEB_LOG_DEBUG << host << ":" << std::to_string(port) << ", id: " << std::to_string(connId) << " closed gracefully"; state = ConnState::closed; } void doCloseAndRetry() { state = ConnState::closeInProgress; boost::beast::error_code ec; conn.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); conn.close(); // not_connected happens sometimes so don't bother reporting it. if (ec && ec != boost::beast::errc::not_connected) { BMCWEB_LOG_ERROR << host << ":" << std::to_string(port) << ", id: " << std::to_string(connId) << "shutdown failed: " << ec.message(); return; } BMCWEB_LOG_DEBUG << host << ":" << std::to_string(port) << ", id: " << std::to_string(connId) << " closed gracefully"; // Now let's try to resend the data state = ConnState::retry; doResolve(); } public: explicit ConnectionInfo(boost::asio::io_context& ioc, const std::string& idIn, const std::string& destIP, const uint16_t destPort, const unsigned int connIdIn) : subId(idIn), host(destIP), port(destPort), connId(connIdIn), conn(ioc), timer(ioc) {} }; class ConnectionPool : public std::enable_shared_from_this { private: boost::asio::io_context& ioc; const std::string id; const std::string destIP; const uint16_t destPort; std::vector> connections; boost::container::devector requestQueue; friend class HttpClient; // Configure a connections's request, callback, and retry info in // preparation to begin sending the request void setConnProps(ConnectionInfo& conn) { if (requestQueue.empty()) { BMCWEB_LOG_ERROR << "setConnProps() should not have been called when requestQueue is empty"; return; } auto nextReq = requestQueue.front(); conn.retryPolicy = std::move(nextReq.retryPolicy); conn.req = std::move(nextReq.req); conn.callback = std::move(nextReq.callback); BMCWEB_LOG_DEBUG << "Setting properties for connection " << conn.host << ":" << std::to_string(conn.port) << ", id: " << std::to_string(conn.connId); // We can remove the request from the queue at this point requestQueue.pop_front(); } // Configures a connection to use the specific retry policy. inline void setConnRetryPolicy(ConnectionInfo& conn, const RetryPolicyData& retryPolicy) { BMCWEB_LOG_DEBUG << destIP << ":" << std::to_string(destPort) << ", id: " << std::to_string(conn.connId); conn.retryPolicy = retryPolicy; } // Gets called as part of callback after request is sent // Reuses the connection if there are any requests waiting to be sent // Otherwise closes the connection if it is not a keep-alive void sendNext(bool keepAlive, uint32_t connId) { auto conn = connections[connId]; // Reuse the connection to send the next request in the queue if (!requestQueue.empty()) { BMCWEB_LOG_DEBUG << std::to_string(requestQueue.size()) << " requests remaining in queue for " << destIP << ":" << std::to_string(destPort) << ", reusing connnection " << std::to_string(connId); setConnProps(*conn); if (keepAlive) { conn->sendMessage(); } else { // Server is not keep-alive enabled so we need to close the // connection and then start over from resolve conn->doClose(); conn->doResolve(); } return; } // No more messages to send so close the connection if necessary if (keepAlive) { conn->state = ConnState::idle; } else { // Abort the connection since server is not keep-alive enabled conn->state = ConnState::abortConnection; conn->doClose(); } } void sendData(std::string& data, const std::string& destUri, const boost::beast::http::fields& httpHeader, const boost::beast::http::verb verb, const RetryPolicyData& retryPolicy, const std::function& resHandler) { std::weak_ptr weakSelf = weak_from_this(); // Callback to be called once the request has been sent auto cb = [weakSelf, resHandler](bool keepAlive, uint32_t connId, Response& res) { // Allow provided callback to perform additional processing of the // request resHandler(res); // If requests remain in the queue then we want to reuse this // connection to send the next request std::shared_ptr self = weakSelf.lock(); if (!self) { BMCWEB_LOG_CRITICAL << self << " Failed to capture connection"; return; } self->sendNext(keepAlive, connId); }; // Construct the request to be sent boost::beast::http::request thisReq( verb, destUri, 11, "", httpHeader); thisReq.set(boost::beast::http::field::host, destIP); thisReq.keep_alive(true); thisReq.body() = std::move(data); thisReq.prepare_payload(); // Reuse an existing connection if one is available for (unsigned int i = 0; i < connections.size(); i++) { auto conn = connections[i]; if ((conn->state == ConnState::idle) || (conn->state == ConnState::initialized) || (conn->state == ConnState::closed)) { conn->req = std::move(thisReq); conn->callback = std::move(cb); setConnRetryPolicy(*conn, retryPolicy); std::string commonMsg = std::to_string(i) + " from pool " + destIP + ":" + std::to_string(destPort); if (conn->state == ConnState::idle) { BMCWEB_LOG_DEBUG << "Grabbing idle connection " << commonMsg; conn->sendMessage(); } else { BMCWEB_LOG_DEBUG << "Reusing existing connection " << commonMsg; conn->doResolve(); } return; } } // All connections in use so create a new connection or add request to // the queue if (connections.size() < maxPoolSize) { BMCWEB_LOG_DEBUG << "Adding new connection to pool " << destIP << ":" << std::to_string(destPort); auto conn = addConnection(); conn->req = std::move(thisReq); conn->callback = std::move(cb); setConnRetryPolicy(*conn, retryPolicy); conn->doResolve(); } else if (requestQueue.size() < maxRequestQueueSize) { BMCWEB_LOG_ERROR << "Max pool size reached. Adding data to queue."; requestQueue.emplace_back(std::move(thisReq), std::move(cb), retryPolicy); } else { BMCWEB_LOG_ERROR << destIP << ":" << std::to_string(destPort) << " request queue full. Dropping request."; } } std::shared_ptr& addConnection() { unsigned int newId = static_cast(connections.size()); auto& ret = connections.emplace_back( std::make_shared(ioc, id, destIP, destPort, newId)); BMCWEB_LOG_DEBUG << "Added connection " << std::to_string(connections.size() - 1) << " to pool " << destIP << ":" << std::to_string(destPort); return ret; } public: explicit ConnectionPool(boost::asio::io_context& iocIn, const std::string& idIn, const std::string& destIPIn, const uint16_t destPortIn) : ioc(iocIn), id(idIn), destIP(destIPIn), destPort(destPortIn) { BMCWEB_LOG_DEBUG << "Initializing connection pool for " << destIP << ":" << std::to_string(destPort); // Initialize the pool with a single connection addConnection(); } }; class HttpClient { private: std::unordered_map> connectionPools; boost::asio::io_context& ioc = crow::connections::systemBus->get_io_context(); std::unordered_map retryInfo; HttpClient() = default; // Used as a dummy callback by sendData() in order to call // sendDataWithCallback() static void genericResHandler(Response& res) { BMCWEB_LOG_DEBUG << "Response handled with return code: " << std::to_string(res.resultInt()); } public: HttpClient(const HttpClient&) = delete; HttpClient& operator=(const HttpClient&) = delete; HttpClient(HttpClient&&) = delete; HttpClient& operator=(HttpClient&&) = delete; ~HttpClient() = default; static HttpClient& getInstance() { static HttpClient handler; return handler; } // Send a request to destIP:destPort where additional processing of the // result is not required void sendData(std::string& data, const std::string& id, const std::string& destIP, const uint16_t destPort, const std::string& destUri, const boost::beast::http::fields& httpHeader, const boost::beast::http::verb verb, const std::string& retryPolicyName) { std::function cb = genericResHandler; sendDataWithCallback(data, id, destIP, destPort, destUri, httpHeader, verb, retryPolicyName, cb); } // Send request to destIP:destPort and use the provided callback to // handle the response void sendDataWithCallback(std::string& data, const std::string& id, const std::string& destIP, const uint16_t destPort, const std::string& destUri, const boost::beast::http::fields& httpHeader, const boost::beast::http::verb verb, const std::string& retryPolicyName, const std::function& resHandler) { std::string clientKey = destIP + ":" + std::to_string(destPort); // Use nullptr to avoid creating a ConnectionPool each time auto result = connectionPools.try_emplace(clientKey, nullptr); if (result.second) { // Now actually create the ConnectionPool shared_ptr since it does // not already exist result.first->second = std::make_shared(ioc, id, destIP, destPort); BMCWEB_LOG_DEBUG << "Created connection pool for " << clientKey; } else { BMCWEB_LOG_DEBUG << "Using existing connection pool for " << clientKey; } // Get the associated retry policy auto policy = retryInfo.try_emplace(retryPolicyName); if (policy.second) { BMCWEB_LOG_DEBUG << "Creating retry policy \"" << retryPolicyName << "\" with default values"; } // Send the data using either the existing connection pool or the newly // created connection pool result.first->second->sendData(data, destUri, httpHeader, verb, policy.first->second, resHandler); } void setRetryConfig( const uint32_t retryAttempts, const uint32_t retryTimeoutInterval, const std::function& invalidResp, const std::string& retryPolicyName) { // We need to create the retry policy if one does not already exist for // the given retryPolicyName auto result = retryInfo.try_emplace(retryPolicyName); if (result.second) { BMCWEB_LOG_DEBUG << "setRetryConfig(): Creating new retry policy \"" << retryPolicyName << "\""; } else { BMCWEB_LOG_DEBUG << "setRetryConfig(): Updating retry info for \"" << retryPolicyName << "\""; } result.first->second.maxRetryAttempts = retryAttempts; result.first->second.retryIntervalSecs = std::chrono::seconds(retryTimeoutInterval); result.first->second.invalidResp = invalidResp; } void setRetryPolicy(const std::string& retryPolicy, const std::string& retryPolicyName) { // We need to create the retry policy if one does not already exist for // the given retryPolicyName auto result = retryInfo.try_emplace(retryPolicyName); if (result.second) { BMCWEB_LOG_DEBUG << "setRetryPolicy(): Creating new retry policy \"" << retryPolicyName << "\""; } else { BMCWEB_LOG_DEBUG << "setRetryPolicy(): Updating retry policy for \"" << retryPolicyName << "\""; } result.first->second.retryPolicyAction = retryPolicy; } }; } // namespace crow