1 /* 2 // Copyright (c) 2020 Intel Corporation 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 */ 16 #pragma once 17 #include <boost/asio/ip/address.hpp> 18 #include <boost/asio/ip/basic_endpoint.hpp> 19 #include <boost/asio/steady_timer.hpp> 20 #include <boost/beast/core/flat_buffer.hpp> 21 #include <boost/beast/core/tcp_stream.hpp> 22 #include <boost/beast/http/message.hpp> 23 #include <boost/beast/version.hpp> 24 #include <include/async_resolve.hpp> 25 26 #include <cstdlib> 27 #include <functional> 28 #include <iostream> 29 #include <memory> 30 #include <queue> 31 #include <string> 32 33 namespace crow 34 { 35 36 static constexpr uint8_t maxRequestQueueSize = 50; 37 38 enum class ConnState 39 { 40 initialized, 41 resolveInProgress, 42 resolveFailed, 43 connectInProgress, 44 connectFailed, 45 connected, 46 sendInProgress, 47 sendFailed, 48 recvFailed, 49 idle, 50 suspended, 51 closed, 52 terminated 53 }; 54 55 class HttpClient : public std::enable_shared_from_this<HttpClient> 56 { 57 private: 58 crow::async_resolve::Resolver resolver; 59 boost::beast::tcp_stream conn; 60 boost::asio::steady_timer timer; 61 boost::beast::flat_buffer buffer; 62 boost::beast::http::request<boost::beast::http::string_body> req; 63 boost::beast::http::response<boost::beast::http::string_body> res; 64 std::vector<std::pair<std::string, std::string>> headers; 65 std::queue<std::string> requestDataQueue; 66 ConnState state; 67 std::string subId; 68 std::string host; 69 std::string port; 70 std::string uri; 71 uint32_t retryCount; 72 uint32_t maxRetryAttempts; 73 uint32_t retryIntervalSecs; 74 std::string retryPolicyAction; 75 bool runningTimer; 76 77 void doResolve() 78 { 79 if (state == ConnState::resolveInProgress) 80 { 81 return; 82 } 83 state = ConnState::resolveInProgress; 84 85 BMCWEB_LOG_DEBUG << "Trying to resolve: " << host << ":" << port; 86 87 auto respHandler = 88 [self(shared_from_this())]( 89 const boost::beast::error_code ec, 90 const std::vector<boost::asio::ip::tcp::endpoint>& 91 endpointList) { 92 if (ec) 93 { 94 BMCWEB_LOG_ERROR << "Resolve failed: " << ec.message(); 95 self->state = ConnState::resolveFailed; 96 self->checkQueue(); 97 return; 98 } 99 BMCWEB_LOG_DEBUG << "Resolved"; 100 self->doConnect(endpointList); 101 }; 102 resolver.asyncResolve(host, port, std::move(respHandler)); 103 } 104 105 void doConnect( 106 const std::vector<boost::asio::ip::tcp::endpoint>& endpointList) 107 { 108 if (state == ConnState::connectInProgress) 109 { 110 return; 111 } 112 state = ConnState::connectInProgress; 113 114 BMCWEB_LOG_DEBUG << "Trying to connect to: " << host << ":" << port; 115 116 conn.expires_after(std::chrono::seconds(30)); 117 conn.async_connect( 118 endpointList, [self(shared_from_this())]( 119 const boost::beast::error_code ec, 120 const boost::asio::ip::tcp::endpoint& endpoint) { 121 if (ec) 122 { 123 BMCWEB_LOG_ERROR << "Connect " << endpoint 124 << " failed: " << ec.message(); 125 self->state = ConnState::connectFailed; 126 self->checkQueue(); 127 return; 128 } 129 self->state = ConnState::connected; 130 BMCWEB_LOG_DEBUG << "Connected to: " << endpoint; 131 132 self->checkQueue(); 133 }); 134 } 135 136 void sendMessage(const std::string& data) 137 { 138 if (state == ConnState::sendInProgress) 139 { 140 return; 141 } 142 state = ConnState::sendInProgress; 143 144 BMCWEB_LOG_DEBUG << __FUNCTION__ << "(): " << host << ":" << port; 145 146 req.version(static_cast<int>(11)); // HTTP 1.1 147 req.target(uri); 148 req.method(boost::beast::http::verb::post); 149 150 // Set headers 151 for (const auto& [key, value] : headers) 152 { 153 req.set(key, value); 154 } 155 req.set(boost::beast::http::field::host, host); 156 req.keep_alive(true); 157 158 req.body() = data; 159 req.prepare_payload(); 160 161 // Set a timeout on the operation 162 conn.expires_after(std::chrono::seconds(30)); 163 164 // Send the HTTP request to the remote host 165 boost::beast::http::async_write( 166 conn, req, 167 [self(shared_from_this())](const boost::beast::error_code& ec, 168 const std::size_t& bytesTransferred) { 169 if (ec) 170 { 171 BMCWEB_LOG_ERROR << "sendMessage() failed: " 172 << ec.message(); 173 self->state = ConnState::sendFailed; 174 self->checkQueue(); 175 return; 176 } 177 BMCWEB_LOG_DEBUG << "sendMessage() bytes transferred: " 178 << bytesTransferred; 179 boost::ignore_unused(bytesTransferred); 180 181 self->recvMessage(); 182 }); 183 } 184 185 void recvMessage() 186 { 187 // Receive the HTTP response 188 boost::beast::http::async_read( 189 conn, buffer, res, 190 [self(shared_from_this())](const boost::beast::error_code& ec, 191 const std::size_t& bytesTransferred) { 192 if (ec) 193 { 194 BMCWEB_LOG_ERROR << "recvMessage() failed: " 195 << ec.message(); 196 self->state = ConnState::recvFailed; 197 self->checkQueue(); 198 return; 199 } 200 BMCWEB_LOG_DEBUG << "recvMessage() bytes transferred: " 201 << bytesTransferred; 202 boost::ignore_unused(bytesTransferred); 203 204 // Discard received data. We are not interested. 205 BMCWEB_LOG_DEBUG << "recvMessage() data: " << self->res; 206 207 // Send is successful, Lets remove data from queue 208 // check for next request data in queue. 209 self->requestDataQueue.pop(); 210 self->state = ConnState::idle; 211 self->checkQueue(); 212 }); 213 } 214 215 void doClose() 216 { 217 boost::beast::error_code ec; 218 conn.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); 219 220 state = ConnState::closed; 221 // not_connected happens sometimes so don't bother reporting it. 222 if (ec && ec != boost::beast::errc::not_connected) 223 { 224 BMCWEB_LOG_ERROR << "shutdown failed: " << ec.message(); 225 return; 226 } 227 BMCWEB_LOG_DEBUG << "Connection closed gracefully"; 228 } 229 230 void checkQueue(const bool newRecord = false) 231 { 232 if (requestDataQueue.empty()) 233 { 234 // TODO: Having issue in keeping connection alive. So lets close if 235 // nothing to be transferred. 236 doClose(); 237 238 BMCWEB_LOG_DEBUG << "requestDataQueue is empty\n"; 239 return; 240 } 241 242 if (retryCount >= maxRetryAttempts) 243 { 244 BMCWEB_LOG_ERROR << "Maximum number of retries is reached."; 245 246 // Clear queue. 247 while (!requestDataQueue.empty()) 248 { 249 requestDataQueue.pop(); 250 } 251 252 BMCWEB_LOG_DEBUG << "Retry policy is set to " << retryPolicyAction; 253 if (retryPolicyAction == "TerminateAfterRetries") 254 { 255 // TODO: delete subscription 256 state = ConnState::terminated; 257 return; 258 } 259 if (retryPolicyAction == "SuspendRetries") 260 { 261 state = ConnState::suspended; 262 return; 263 } 264 // keep retrying, reset count and continue. 265 retryCount = 0; 266 } 267 268 if ((state == ConnState::connectFailed) || 269 (state == ConnState::sendFailed) || 270 (state == ConnState::recvFailed)) 271 { 272 if (newRecord) 273 { 274 // We are already running async wait and retry. 275 // Since record is added to queue, it gets the 276 // turn in FIFO. 277 return; 278 } 279 280 if (runningTimer) 281 { 282 BMCWEB_LOG_DEBUG << "Retry timer is already running."; 283 return; 284 } 285 runningTimer = true; 286 287 retryCount++; 288 289 BMCWEB_LOG_DEBUG << "Attempt retry after " << retryIntervalSecs 290 << " seconds. RetryCount = " << retryCount; 291 timer.expires_after(std::chrono::seconds(retryIntervalSecs)); 292 timer.async_wait( 293 [self = shared_from_this()](const boost::system::error_code&) { 294 self->runningTimer = false; 295 self->connStateCheck(); 296 }); 297 return; 298 } 299 // reset retry count. 300 retryCount = 0; 301 connStateCheck(); 302 303 return; 304 } 305 306 void connStateCheck() 307 { 308 switch (state) 309 { 310 case ConnState::resolveInProgress: 311 case ConnState::connectInProgress: 312 case ConnState::sendInProgress: 313 case ConnState::suspended: 314 case ConnState::terminated: 315 // do nothing 316 break; 317 case ConnState::initialized: 318 case ConnState::closed: 319 case ConnState::connectFailed: 320 case ConnState::sendFailed: 321 case ConnState::recvFailed: 322 case ConnState::resolveFailed: 323 { 324 doResolve(); 325 break; 326 } 327 case ConnState::connected: 328 case ConnState::idle: 329 { 330 std::string data = requestDataQueue.front(); 331 sendMessage(data); 332 break; 333 } 334 } 335 } 336 337 public: 338 explicit HttpClient(boost::asio::io_context& ioc, const std::string& id, 339 const std::string& destIP, const std::string& destPort, 340 const std::string& destUri) : 341 conn(ioc), 342 timer(ioc), subId(id), host(destIP), port(destPort), uri(destUri), 343 retryCount(0), maxRetryAttempts(5), retryIntervalSecs(0), 344 retryPolicyAction("TerminateAfterRetries"), runningTimer(false) 345 { 346 state = ConnState::initialized; 347 } 348 349 void sendData(const std::string& data) 350 { 351 if (state == ConnState::suspended) 352 { 353 return; 354 } 355 356 if (requestDataQueue.size() <= maxRequestQueueSize) 357 { 358 requestDataQueue.push(data); 359 checkQueue(true); 360 } 361 else 362 { 363 BMCWEB_LOG_ERROR << "Request queue is full. So ignoring data."; 364 } 365 366 return; 367 } 368 369 void setHeaders( 370 const std::vector<std::pair<std::string, std::string>>& httpHeaders) 371 { 372 headers = httpHeaders; 373 } 374 375 void setRetryConfig(const uint32_t retryAttempts, 376 const uint32_t retryTimeoutInterval) 377 { 378 maxRetryAttempts = retryAttempts; 379 retryIntervalSecs = retryTimeoutInterval; 380 } 381 382 void setRetryPolicy(const std::string& retryPolicy) 383 { 384 retryPolicyAction = retryPolicy; 385 } 386 }; 387 388 } // namespace crow 389