1 /* 2 // Copyright (c) 2020 Intel Corporation 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 */ 16 #pragma once 17 #include <boost/asio/steady_timer.hpp> 18 #include <boost/beast/core/flat_buffer.hpp> 19 #include <boost/beast/core/tcp_stream.hpp> 20 #include <boost/beast/http/message.hpp> 21 #include <boost/beast/version.hpp> 22 23 #include <cstdlib> 24 #include <functional> 25 #include <iostream> 26 #include <memory> 27 #include <queue> 28 #include <string> 29 30 namespace crow 31 { 32 33 static constexpr uint8_t maxRequestQueueSize = 50; 34 35 enum class ConnState 36 { 37 initialized, 38 connectInProgress, 39 connectFailed, 40 connected, 41 sendInProgress, 42 sendFailed, 43 recvFailed, 44 idle, 45 suspended, 46 closed, 47 terminated 48 }; 49 50 class HttpClient : public std::enable_shared_from_this<HttpClient> 51 { 52 private: 53 boost::beast::tcp_stream conn; 54 boost::asio::steady_timer timer; 55 boost::beast::flat_buffer buffer; 56 boost::beast::http::request<boost::beast::http::string_body> req; 57 boost::beast::http::response<boost::beast::http::string_body> res; 58 boost::asio::ip::tcp::resolver::results_type endpoint; 59 std::vector<std::pair<std::string, std::string>> headers; 60 std::queue<std::string> requestDataQueue; 61 ConnState state; 62 std::string subId; 63 std::string host; 64 std::string port; 65 std::string uri; 66 uint32_t retryCount; 67 uint32_t maxRetryAttempts; 68 uint32_t retryIntervalSecs; 69 std::string retryPolicyAction; 70 bool runningTimer; 71 72 void doConnect() 73 { 74 if (state == ConnState::connectInProgress) 75 { 76 return; 77 } 78 state = ConnState::connectInProgress; 79 80 BMCWEB_LOG_DEBUG << "Trying to connect to: " << host << ":" << port; 81 // Set a timeout on the operation 82 conn.expires_after(std::chrono::seconds(30)); 83 conn.async_connect(endpoint, [self(shared_from_this())]( 84 const boost::beast::error_code& ec, 85 const boost::asio::ip::tcp::resolver:: 86 results_type::endpoint_type& ep) { 87 if (ec) 88 { 89 BMCWEB_LOG_ERROR << "Connect " << ep 90 << " failed: " << ec.message(); 91 self->state = ConnState::connectFailed; 92 self->checkQueue(); 93 return; 94 } 95 self->state = ConnState::connected; 96 BMCWEB_LOG_DEBUG << "Connected to: " << ep; 97 98 self->checkQueue(); 99 }); 100 } 101 102 void sendMessage(const std::string& data) 103 { 104 if (state == ConnState::sendInProgress) 105 { 106 return; 107 } 108 state = ConnState::sendInProgress; 109 110 BMCWEB_LOG_DEBUG << __FUNCTION__ << "(): " << host << ":" << port; 111 112 req.version(static_cast<int>(11)); // HTTP 1.1 113 req.target(uri); 114 req.method(boost::beast::http::verb::post); 115 116 // Set headers 117 for (const auto& [key, value] : headers) 118 { 119 req.set(key, value); 120 } 121 req.set(boost::beast::http::field::host, host); 122 req.keep_alive(true); 123 124 req.body() = data; 125 req.prepare_payload(); 126 127 // Set a timeout on the operation 128 conn.expires_after(std::chrono::seconds(30)); 129 130 // Send the HTTP request to the remote host 131 boost::beast::http::async_write( 132 conn, req, 133 [self(shared_from_this())](const boost::beast::error_code& ec, 134 const std::size_t& bytesTransferred) { 135 if (ec) 136 { 137 BMCWEB_LOG_ERROR << "sendMessage() failed: " 138 << ec.message(); 139 self->state = ConnState::sendFailed; 140 self->checkQueue(); 141 return; 142 } 143 BMCWEB_LOG_DEBUG << "sendMessage() bytes transferred: " 144 << bytesTransferred; 145 boost::ignore_unused(bytesTransferred); 146 147 self->recvMessage(); 148 }); 149 } 150 151 void recvMessage() 152 { 153 // Receive the HTTP response 154 boost::beast::http::async_read( 155 conn, buffer, res, 156 [self(shared_from_this())](const boost::beast::error_code& ec, 157 const std::size_t& bytesTransferred) { 158 if (ec) 159 { 160 BMCWEB_LOG_ERROR << "recvMessage() failed: " 161 << ec.message(); 162 self->state = ConnState::recvFailed; 163 self->checkQueue(); 164 return; 165 } 166 BMCWEB_LOG_DEBUG << "recvMessage() bytes transferred: " 167 << bytesTransferred; 168 boost::ignore_unused(bytesTransferred); 169 170 // Discard received data. We are not interested. 171 BMCWEB_LOG_DEBUG << "recvMessage() data: " << self->res; 172 173 // Send is successful, Lets remove data from queue 174 // check for next request data in queue. 175 self->requestDataQueue.pop(); 176 self->state = ConnState::idle; 177 self->checkQueue(); 178 }); 179 } 180 181 void doClose() 182 { 183 boost::beast::error_code ec; 184 conn.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); 185 186 state = ConnState::closed; 187 // not_connected happens sometimes so don't bother reporting it. 188 if (ec && ec != boost::beast::errc::not_connected) 189 { 190 BMCWEB_LOG_ERROR << "shutdown failed: " << ec.message(); 191 return; 192 } 193 BMCWEB_LOG_DEBUG << "Connection closed gracefully"; 194 } 195 196 void checkQueue(const bool newRecord = false) 197 { 198 if (requestDataQueue.empty()) 199 { 200 // TODO: Having issue in keeping connection alive. So lets close if 201 // nothing to be transferred. 202 doClose(); 203 204 BMCWEB_LOG_DEBUG << "requestDataQueue is empty\n"; 205 return; 206 } 207 208 if (retryCount >= maxRetryAttempts) 209 { 210 BMCWEB_LOG_ERROR << "Maximum number of retries is reached."; 211 212 // Clear queue. 213 while (!requestDataQueue.empty()) 214 { 215 requestDataQueue.pop(); 216 } 217 218 BMCWEB_LOG_DEBUG << "Retry policy is set to " << retryPolicyAction; 219 if (retryPolicyAction == "TerminateAfterRetries") 220 { 221 // TODO: delete subscription 222 state = ConnState::terminated; 223 return; 224 } 225 if (retryPolicyAction == "SuspendRetries") 226 { 227 state = ConnState::suspended; 228 return; 229 } 230 // keep retrying, reset count and continue. 231 retryCount = 0; 232 } 233 234 if ((state == ConnState::connectFailed) || 235 (state == ConnState::sendFailed) || 236 (state == ConnState::recvFailed)) 237 { 238 if (newRecord) 239 { 240 // We are already running async wait and retry. 241 // Since record is added to queue, it gets the 242 // turn in FIFO. 243 return; 244 } 245 246 if (runningTimer) 247 { 248 BMCWEB_LOG_DEBUG << "Retry timer is already running."; 249 return; 250 } 251 runningTimer = true; 252 253 retryCount++; 254 255 BMCWEB_LOG_DEBUG << "Attempt retry after " << retryIntervalSecs 256 << " seconds. RetryCount = " << retryCount; 257 timer.expires_after(std::chrono::seconds(retryIntervalSecs)); 258 timer.async_wait( 259 [self = shared_from_this()](const boost::system::error_code&) { 260 self->runningTimer = false; 261 self->connStateCheck(); 262 }); 263 return; 264 } 265 // reset retry count. 266 retryCount = 0; 267 connStateCheck(); 268 269 return; 270 } 271 272 void connStateCheck() 273 { 274 switch (state) 275 { 276 case ConnState::connectInProgress: 277 case ConnState::sendInProgress: 278 case ConnState::suspended: 279 case ConnState::terminated: 280 // do nothing 281 break; 282 case ConnState::initialized: 283 case ConnState::closed: 284 case ConnState::connectFailed: 285 case ConnState::sendFailed: 286 case ConnState::recvFailed: 287 { 288 // After establishing the connection, checkQueue() will 289 // get called and it will attempt to send data. 290 doConnect(); 291 break; 292 } 293 case ConnState::connected: 294 case ConnState::idle: 295 { 296 std::string data = requestDataQueue.front(); 297 sendMessage(data); 298 break; 299 } 300 } 301 } 302 303 public: 304 explicit HttpClient(boost::asio::io_context& ioc, const std::string& id, 305 const std::string& destIP, const std::string& destPort, 306 const std::string& destUri) : 307 conn(ioc), 308 timer(ioc), subId(id), host(destIP), port(destPort), uri(destUri), 309 retryCount(0), maxRetryAttempts(5), 310 retryPolicyAction("TerminateAfterRetries"), runningTimer(false) 311 { 312 boost::asio::ip::tcp::resolver resolver(ioc); 313 endpoint = resolver.resolve(host, port); 314 state = ConnState::initialized; 315 } 316 317 void sendData(const std::string& data) 318 { 319 if (state == ConnState::suspended) 320 { 321 return; 322 } 323 324 if (requestDataQueue.size() <= maxRequestQueueSize) 325 { 326 requestDataQueue.push(data); 327 checkQueue(true); 328 } 329 else 330 { 331 BMCWEB_LOG_ERROR << "Request queue is full. So ignoring data."; 332 } 333 334 return; 335 } 336 337 void setHeaders( 338 const std::vector<std::pair<std::string, std::string>>& httpHeaders) 339 { 340 headers = httpHeaders; 341 } 342 343 void setRetryConfig(const uint32_t retryAttempts, 344 const uint32_t retryTimeoutInterval) 345 { 346 maxRetryAttempts = retryAttempts; 347 retryIntervalSecs = retryTimeoutInterval; 348 } 349 350 void setRetryPolicy(const std::string& retryPolicy) 351 { 352 retryPolicyAction = retryPolicy; 353 } 354 }; 355 356 } // namespace crow 357