1 /* 2 // Copyright (c) 2020 Intel Corporation 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 */ 16 #pragma once 17 #include <boost/asio/strand.hpp> 18 #include <boost/beast/core.hpp> 19 #include <boost/beast/http.hpp> 20 #include <boost/beast/version.hpp> 21 22 #include <cstdlib> 23 #include <functional> 24 #include <iostream> 25 #include <memory> 26 #include <queue> 27 #include <string> 28 29 namespace crow 30 { 31 32 static constexpr uint8_t maxRequestQueueSize = 50; 33 34 enum class ConnState 35 { 36 initialized, 37 connectInProgress, 38 connectFailed, 39 connected, 40 sendInProgress, 41 sendFailed, 42 recvFailed, 43 idle, 44 suspended, 45 closed, 46 terminated 47 }; 48 49 class HttpClient : public std::enable_shared_from_this<HttpClient> 50 { 51 private: 52 boost::beast::tcp_stream conn; 53 boost::asio::steady_timer timer; 54 boost::beast::flat_buffer buffer; 55 boost::beast::http::request<boost::beast::http::string_body> req; 56 boost::beast::http::response<boost::beast::http::string_body> res; 57 boost::asio::ip::tcp::resolver::results_type endpoint; 58 std::vector<std::pair<std::string, std::string>> headers; 59 std::queue<std::string> requestDataQueue; 60 ConnState state; 61 std::string subId; 62 std::string host; 63 std::string port; 64 std::string uri; 65 uint32_t retryCount; 66 uint32_t maxRetryAttempts; 67 uint32_t retryIntervalSecs; 68 std::string retryPolicyAction; 69 bool runningTimer; 70 71 void doConnect() 72 { 73 if (state == ConnState::connectInProgress) 74 { 75 return; 76 } 77 state = ConnState::connectInProgress; 78 79 BMCWEB_LOG_DEBUG << "Trying to connect to: " << host << ":" << port; 80 // Set a timeout on the operation 81 conn.expires_after(std::chrono::seconds(30)); 82 conn.async_connect(endpoint, [self(shared_from_this())]( 83 const boost::beast::error_code& ec, 84 const boost::asio::ip::tcp::resolver:: 85 results_type::endpoint_type& ep) { 86 if (ec) 87 { 88 BMCWEB_LOG_ERROR << "Connect " << ep 89 << " failed: " << ec.message(); 90 self->state = ConnState::connectFailed; 91 self->checkQueue(); 92 return; 93 } 94 self->state = ConnState::connected; 95 BMCWEB_LOG_DEBUG << "Connected to: " << ep; 96 97 self->checkQueue(); 98 }); 99 } 100 101 void sendMessage(const std::string& data) 102 { 103 if (state == ConnState::sendInProgress) 104 { 105 return; 106 } 107 state = ConnState::sendInProgress; 108 109 BMCWEB_LOG_DEBUG << __FUNCTION__ << "(): " << host << ":" << port; 110 111 req.version(static_cast<int>(11)); // HTTP 1.1 112 req.target(uri); 113 req.method(boost::beast::http::verb::post); 114 115 // Set headers 116 for (const auto& [key, value] : headers) 117 { 118 req.set(key, value); 119 } 120 req.set(boost::beast::http::field::host, host); 121 req.keep_alive(true); 122 123 req.body() = data; 124 req.prepare_payload(); 125 126 // Set a timeout on the operation 127 conn.expires_after(std::chrono::seconds(30)); 128 129 // Send the HTTP request to the remote host 130 boost::beast::http::async_write( 131 conn, req, 132 [self(shared_from_this())](const boost::beast::error_code& ec, 133 const std::size_t& bytesTransferred) { 134 if (ec) 135 { 136 BMCWEB_LOG_ERROR << "sendMessage() failed: " 137 << ec.message(); 138 self->state = ConnState::sendFailed; 139 self->checkQueue(); 140 return; 141 } 142 BMCWEB_LOG_DEBUG << "sendMessage() bytes transferred: " 143 << bytesTransferred; 144 boost::ignore_unused(bytesTransferred); 145 146 self->recvMessage(); 147 }); 148 } 149 150 void recvMessage() 151 { 152 // Receive the HTTP response 153 boost::beast::http::async_read( 154 conn, buffer, res, 155 [self(shared_from_this())](const boost::beast::error_code& ec, 156 const std::size_t& bytesTransferred) { 157 if (ec) 158 { 159 BMCWEB_LOG_ERROR << "recvMessage() failed: " 160 << ec.message(); 161 self->state = ConnState::recvFailed; 162 self->checkQueue(); 163 return; 164 } 165 BMCWEB_LOG_DEBUG << "recvMessage() bytes transferred: " 166 << bytesTransferred; 167 boost::ignore_unused(bytesTransferred); 168 169 // Discard received data. We are not interested. 170 BMCWEB_LOG_DEBUG << "recvMessage() data: " << self->res; 171 172 // Send is successful, Lets remove data from queue 173 // check for next request data in queue. 174 self->requestDataQueue.pop(); 175 self->state = ConnState::idle; 176 self->checkQueue(); 177 }); 178 } 179 180 void doClose() 181 { 182 boost::beast::error_code ec; 183 conn.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); 184 185 state = ConnState::closed; 186 // not_connected happens sometimes so don't bother reporting it. 187 if (ec && ec != boost::beast::errc::not_connected) 188 { 189 BMCWEB_LOG_ERROR << "shutdown failed: " << ec.message(); 190 return; 191 } 192 BMCWEB_LOG_DEBUG << "Connection closed gracefully"; 193 } 194 195 void checkQueue(const bool newRecord = false) 196 { 197 if (requestDataQueue.empty()) 198 { 199 // TODO: Having issue in keeping connection alive. So lets close if 200 // nothing to be transferred. 201 doClose(); 202 203 BMCWEB_LOG_DEBUG << "requestDataQueue is empty\n"; 204 return; 205 } 206 207 if (retryCount >= maxRetryAttempts) 208 { 209 BMCWEB_LOG_ERROR << "Maximum number of retries is reached."; 210 211 // Clear queue. 212 while (!requestDataQueue.empty()) 213 { 214 requestDataQueue.pop(); 215 } 216 217 BMCWEB_LOG_DEBUG << "Retry policy is set to " << retryPolicyAction; 218 if (retryPolicyAction == "TerminateAfterRetries") 219 { 220 // TODO: delete subscription 221 state = ConnState::terminated; 222 return; 223 } 224 else if (retryPolicyAction == "SuspendRetries") 225 { 226 state = ConnState::suspended; 227 return; 228 } 229 else 230 { 231 // keep retrying, reset count and continue. 232 retryCount = 0; 233 } 234 } 235 236 if ((state == ConnState::connectFailed) || 237 (state == ConnState::sendFailed) || 238 (state == ConnState::recvFailed)) 239 { 240 if (newRecord) 241 { 242 // We are already running async wait and retry. 243 // Since record is added to queue, it gets the 244 // turn in FIFO. 245 return; 246 } 247 248 if (runningTimer) 249 { 250 BMCWEB_LOG_DEBUG << "Retry timer is already running."; 251 return; 252 } 253 runningTimer = true; 254 255 retryCount++; 256 257 BMCWEB_LOG_DEBUG << "Attempt retry after " << retryIntervalSecs 258 << " seconds. RetryCount = " << retryCount; 259 timer.expires_after(std::chrono::seconds(retryIntervalSecs)); 260 timer.async_wait( 261 [self = shared_from_this()](const boost::system::error_code&) { 262 self->runningTimer = false; 263 self->connStateCheck(); 264 }); 265 return; 266 } 267 else 268 { 269 // reset retry count. 270 retryCount = 0; 271 } 272 connStateCheck(); 273 274 return; 275 } 276 277 void connStateCheck() 278 { 279 switch (state) 280 { 281 case ConnState::connectInProgress: 282 case ConnState::sendInProgress: 283 case ConnState::suspended: 284 case ConnState::terminated: 285 // do nothing 286 break; 287 case ConnState::initialized: 288 case ConnState::closed: 289 case ConnState::connectFailed: 290 case ConnState::sendFailed: 291 case ConnState::recvFailed: 292 { 293 // After establishing the connection, checkQueue() will 294 // get called and it will attempt to send data. 295 doConnect(); 296 break; 297 } 298 case ConnState::connected: 299 case ConnState::idle: 300 { 301 std::string data = requestDataQueue.front(); 302 sendMessage(data); 303 break; 304 } 305 } 306 } 307 308 public: 309 explicit HttpClient(boost::asio::io_context& ioc, const std::string& id, 310 const std::string& destIP, const std::string& destPort, 311 const std::string& destUri) : 312 conn(ioc), 313 timer(ioc), subId(id), host(destIP), port(destPort), uri(destUri), 314 retryCount(0), maxRetryAttempts(5), 315 retryPolicyAction("TerminateAfterRetries"), runningTimer(false) 316 { 317 boost::asio::ip::tcp::resolver resolver(ioc); 318 endpoint = resolver.resolve(host, port); 319 state = ConnState::initialized; 320 } 321 322 void sendData(const std::string& data) 323 { 324 if (state == ConnState::suspended) 325 { 326 return; 327 } 328 329 if (requestDataQueue.size() <= maxRequestQueueSize) 330 { 331 requestDataQueue.push(data); 332 checkQueue(true); 333 } 334 else 335 { 336 BMCWEB_LOG_ERROR << "Request queue is full. So ignoring data."; 337 } 338 339 return; 340 } 341 342 void setHeaders( 343 const std::vector<std::pair<std::string, std::string>>& httpHeaders) 344 { 345 headers = httpHeaders; 346 } 347 348 void setRetryConfig(const uint32_t retryAttempts, 349 const uint32_t retryTimeoutInterval) 350 { 351 maxRetryAttempts = retryAttempts; 352 retryIntervalSecs = retryTimeoutInterval; 353 } 354 355 void setRetryPolicy(const std::string& retryPolicy) 356 { 357 retryPolicyAction = retryPolicy; 358 } 359 }; 360 361 } // namespace crow 362