/* // Copyright (c) 2020 Intel Corporation // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. */ #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace crow { static constexpr uint8_t maxRequestQueueSize = 50; static constexpr unsigned int httpReadBodyLimit = 8192; enum class ConnState { initialized, resolveInProgress, resolveFailed, connectInProgress, connectFailed, connected, sendInProgress, sendFailed, recvInProgress, recvFailed, idle, closeInProgress, closed, suspended, terminated, abortConnection, retry }; class HttpClient : public std::enable_shared_from_this { private: crow::async_resolve::Resolver resolver; boost::beast::tcp_stream conn; boost::asio::steady_timer timer; boost::beast::flat_static_buffer buffer; boost::beast::http::request req; std::optional< boost::beast::http::response_parser> parser; boost::circular_buffer_space_optimized requestDataQueue{}; ConnState state = ConnState::initialized; std::string subId; std::string host; std::string port; uint32_t retryCount = 0; uint32_t maxRetryAttempts = 5; uint32_t retryIntervalSecs = 0; std::string retryPolicyAction = "TerminateAfterRetries"; bool runningTimer = false; void doResolve() { state = ConnState::resolveInProgress; BMCWEB_LOG_DEBUG << "Trying to resolve: " << host << ":" << port; auto respHandler = [self(shared_from_this())]( const boost::beast::error_code ec, const std::vector& endpointList) { if (ec || (endpointList.size() == 0)) { BMCWEB_LOG_ERROR << "Resolve failed: " << ec.message(); self->state = ConnState::resolveFailed; self->handleConnState(); return; } BMCWEB_LOG_DEBUG << "Resolved"; self->doConnect(endpointList); }; resolver.asyncResolve(host, port, std::move(respHandler)); } void doConnect( const std::vector& endpointList) { state = ConnState::connectInProgress; BMCWEB_LOG_DEBUG << "Trying to connect to: " << host << ":" << port; conn.expires_after(std::chrono::seconds(30)); conn.async_connect( endpointList, [self(shared_from_this())]( const boost::beast::error_code ec, const boost::asio::ip::tcp::endpoint& endpoint) { if (ec) { BMCWEB_LOG_ERROR << "Connect " << endpoint << " failed: " << ec.message(); self->state = ConnState::connectFailed; self->handleConnState(); return; } BMCWEB_LOG_DEBUG << "Connected to: " << endpoint; self->state = ConnState::connected; self->handleConnState(); }); } void sendMessage(const std::string& data) { state = ConnState::sendInProgress; req.body() = data; req.prepare_payload(); // Set a timeout on the operation conn.expires_after(std::chrono::seconds(30)); // Send the HTTP request to the remote host boost::beast::http::async_write( conn, req, [self(shared_from_this())](const boost::beast::error_code& ec, const std::size_t& bytesTransferred) { if (ec) { BMCWEB_LOG_ERROR << "sendMessage() failed: " << ec.message(); self->state = ConnState::sendFailed; self->handleConnState(); return; } BMCWEB_LOG_DEBUG << "sendMessage() bytes transferred: " << bytesTransferred; boost::ignore_unused(bytesTransferred); self->recvMessage(); }); } void recvMessage() { state = ConnState::recvInProgress; parser.emplace(std::piecewise_construct, std::make_tuple()); parser->body_limit(httpReadBodyLimit); // Check only for the response header parser->skip(true); // Receive the HTTP response boost::beast::http::async_read( conn, buffer, *parser, [self(shared_from_this())](const boost::beast::error_code& ec, const std::size_t& bytesTransferred) { if (ec) { BMCWEB_LOG_ERROR << "recvMessage() failed: " << ec.message(); self->state = ConnState::recvFailed; self->handleConnState(); return; } BMCWEB_LOG_DEBUG << "recvMessage() bytes transferred: " << bytesTransferred; BMCWEB_LOG_DEBUG << "recvMessage() data: " << self->parser->get(); unsigned int respCode = self->parser->get().result_int(); BMCWEB_LOG_DEBUG << "recvMessage() Header Response Code: " << respCode; // 2XX response is considered to be successful if ((respCode < 200) || (respCode >= 300)) { // The listener failed to receive the Sent-Event BMCWEB_LOG_ERROR << "recvMessage() Listener Failed to " "receive Sent-Event"; self->state = ConnState::recvFailed; self->handleConnState(); return; } // Send is successful, Lets remove data from queue // check for next request data in queue. if (!self->requestDataQueue.empty()) { self->requestDataQueue.pop_front(); } self->state = ConnState::idle; // Keep the connection alive if server supports it // Else close the connection BMCWEB_LOG_DEBUG << "recvMessage() keepalive : " << self->parser->keep_alive(); if (!self->parser->keep_alive()) { // Abort the connection since server is not keep-alive // enabled self->state = ConnState::abortConnection; } self->handleConnState(); }); } void doClose() { state = ConnState::closeInProgress; boost::beast::error_code ec; conn.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); conn.close(); // not_connected happens sometimes so don't bother reporting it. if (ec && ec != boost::beast::errc::not_connected) { BMCWEB_LOG_ERROR << "shutdown failed: " << ec.message(); return; } BMCWEB_LOG_DEBUG << "Connection closed gracefully"; if ((state != ConnState::suspended) && (state != ConnState::terminated)) { state = ConnState::closed; handleConnState(); } } void waitAndRetry() { if (retryCount >= maxRetryAttempts) { BMCWEB_LOG_ERROR << "Maximum number of retries reached."; // Clear queue. while (!requestDataQueue.empty()) { requestDataQueue.pop_front(); } BMCWEB_LOG_DEBUG << "Retry policy: " << retryPolicyAction; if (retryPolicyAction == "TerminateAfterRetries") { // TODO: delete subscription state = ConnState::terminated; } if (retryPolicyAction == "SuspendRetries") { state = ConnState::suspended; } // Reset the retrycount to zero so that client can try connecting // again if needed retryCount = 0; handleConnState(); return; } if (runningTimer) { BMCWEB_LOG_DEBUG << "Retry timer is already running."; return; } runningTimer = true; retryCount++; BMCWEB_LOG_DEBUG << "Attempt retry after " << retryIntervalSecs << " seconds. RetryCount = " << retryCount; timer.expires_after(std::chrono::seconds(retryIntervalSecs)); timer.async_wait( [self = shared_from_this()](const boost::system::error_code ec) { if (ec == boost::asio::error::operation_aborted) { BMCWEB_LOG_DEBUG << "async_wait failed since the operation is aborted" << ec.message(); } else if (ec) { BMCWEB_LOG_ERROR << "async_wait failed: " << ec.message(); // Ignore the error and continue the retry loop to attempt // sending the event as per the retry policy } self->runningTimer = false; // Lets close connection and start from resolve. self->doClose(); }); return; } void handleConnState() { switch (state) { case ConnState::resolveInProgress: case ConnState::connectInProgress: case ConnState::sendInProgress: case ConnState::recvInProgress: case ConnState::closeInProgress: { BMCWEB_LOG_DEBUG << "Async operation is already in progress"; break; } case ConnState::initialized: case ConnState::closed: { if (requestDataQueue.empty()) { BMCWEB_LOG_DEBUG << "requestDataQueue is empty"; return; } doResolve(); break; } case ConnState::suspended: case ConnState::terminated: { doClose(); break; } case ConnState::resolveFailed: case ConnState::connectFailed: case ConnState::sendFailed: case ConnState::recvFailed: case ConnState::retry: { // In case of failures during connect and handshake // the retry policy will be applied waitAndRetry(); break; } case ConnState::connected: case ConnState::idle: { // State idle means, previous attempt is successful // State connected means, client connection is established // successfully if (requestDataQueue.empty()) { BMCWEB_LOG_DEBUG << "requestDataQueue is empty"; return; } std::string data = requestDataQueue.front(); sendMessage(data); break; } case ConnState::abortConnection: { // Server did not want to keep alive the session doClose(); break; } default: break; } } public: explicit HttpClient(boost::asio::io_context& ioc, const std::string& id, const std::string& destIP, const std::string& destPort, const std::string& destUri, const boost::beast::http::fields& httpHeader) : conn(ioc), timer(ioc), req(boost::beast::http::verb::post, destUri, 11, "", httpHeader), subId(id), host(destIP), port(destPort) { req.set(boost::beast::http::field::host, host); req.keep_alive(true); } void sendData(const std::string& data) { if ((state == ConnState::suspended) || (state == ConnState::terminated)) { return; } if (requestDataQueue.size() <= maxRequestQueueSize) { requestDataQueue.push_back(data); handleConnState(); } else { BMCWEB_LOG_ERROR << "Request queue is full. So ignoring data."; } return; } void setRetryConfig(const uint32_t retryAttempts, const uint32_t retryTimeoutInterval) { maxRetryAttempts = retryAttempts; retryIntervalSecs = retryTimeoutInterval; } void setRetryPolicy(const std::string& retryPolicy) { retryPolicyAction = retryPolicy; } }; } // namespace crow