1 /* 2 // Copyright (c) 2020 Intel Corporation 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 */ 16 #pragma once 17 #include <boost/asio/strand.hpp> 18 #include <boost/beast/core.hpp> 19 #include <boost/beast/http.hpp> 20 #include <boost/beast/version.hpp> 21 22 #include <cstdlib> 23 #include <functional> 24 #include <iostream> 25 #include <memory> 26 #include <queue> 27 #include <string> 28 29 namespace crow 30 { 31 32 static constexpr uint8_t maxRequestQueueSize = 50; 33 34 enum class ConnState 35 { 36 initialized, 37 connectInProgress, 38 connectFailed, 39 connected, 40 sendInProgress, 41 sendFailed, 42 recvFailed, 43 idle, 44 suspended, 45 closed 46 }; 47 48 class HttpClient : public std::enable_shared_from_this<HttpClient> 49 { 50 private: 51 boost::beast::tcp_stream conn; 52 boost::beast::flat_buffer buffer; 53 boost::beast::http::request<boost::beast::http::string_body> req; 54 boost::beast::http::response<boost::beast::http::string_body> res; 55 boost::asio::ip::tcp::resolver::results_type endpoint; 56 std::vector<std::pair<std::string, std::string>> headers; 57 std::queue<std::string> requestDataQueue; 58 ConnState state; 59 std::string host; 60 std::string port; 61 std::string uri; 62 int retryCount; 63 int maxRetryAttempts; 64 65 void doConnect() 66 { 67 if (state == ConnState::connectInProgress) 68 { 69 return; 70 } 71 state = ConnState::connectInProgress; 72 73 BMCWEB_LOG_DEBUG << "Trying to connect to: " << host << ":" << port; 74 // Set a timeout on the operation 75 conn.expires_after(std::chrono::seconds(30)); 76 conn.async_connect(endpoint, [self(shared_from_this())]( 77 const boost::beast::error_code& ec, 78 const boost::asio::ip::tcp::resolver:: 79 results_type::endpoint_type& ep) { 80 if (ec) 81 { 82 BMCWEB_LOG_ERROR << "Connect " << ep 83 << " failed: " << ec.message(); 84 self->state = ConnState::connectFailed; 85 self->checkQueue(); 86 return; 87 } 88 self->state = ConnState::connected; 89 BMCWEB_LOG_DEBUG << "Connected to: " << ep; 90 91 self->checkQueue(); 92 }); 93 } 94 95 void sendMessage(const std::string& data) 96 { 97 if (state == ConnState::sendInProgress) 98 { 99 return; 100 } 101 state = ConnState::sendInProgress; 102 103 BMCWEB_LOG_DEBUG << __FUNCTION__ << "(): " << host << ":" << port; 104 105 req.version(static_cast<int>(11)); // HTTP 1.1 106 req.target(uri); 107 req.method(boost::beast::http::verb::post); 108 109 // Set headers 110 for (const auto& [key, value] : headers) 111 { 112 req.set(key, value); 113 } 114 req.set(boost::beast::http::field::host, host); 115 req.keep_alive(true); 116 117 req.body() = data; 118 req.prepare_payload(); 119 120 // Set a timeout on the operation 121 conn.expires_after(std::chrono::seconds(30)); 122 123 // Send the HTTP request to the remote host 124 boost::beast::http::async_write( 125 conn, req, 126 [self(shared_from_this())](const boost::beast::error_code& ec, 127 const std::size_t& bytesTransferred) { 128 if (ec) 129 { 130 BMCWEB_LOG_ERROR << "sendMessage() failed: " 131 << ec.message(); 132 self->state = ConnState::sendFailed; 133 self->checkQueue(); 134 return; 135 } 136 BMCWEB_LOG_DEBUG << "sendMessage() bytes transferred: " 137 << bytesTransferred; 138 boost::ignore_unused(bytesTransferred); 139 140 self->recvMessage(); 141 }); 142 } 143 144 void recvMessage() 145 { 146 // Receive the HTTP response 147 boost::beast::http::async_read( 148 conn, buffer, res, 149 [self(shared_from_this())](const boost::beast::error_code& ec, 150 const std::size_t& bytesTransferred) { 151 if (ec) 152 { 153 BMCWEB_LOG_ERROR << "recvMessage() failed: " 154 << ec.message(); 155 self->state = ConnState::recvFailed; 156 self->checkQueue(); 157 return; 158 } 159 BMCWEB_LOG_DEBUG << "recvMessage() bytes transferred: " 160 << bytesTransferred; 161 boost::ignore_unused(bytesTransferred); 162 163 // Discard received data. We are not interested. 164 BMCWEB_LOG_DEBUG << "recvMessage() data: " << self->res; 165 166 // Send is successful, Lets remove data from queue 167 // check for next request data in queue. 168 self->requestDataQueue.pop(); 169 self->state = ConnState::idle; 170 self->checkQueue(); 171 }); 172 } 173 174 void doClose() 175 { 176 boost::beast::error_code ec; 177 conn.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); 178 179 state = ConnState::closed; 180 // not_connected happens sometimes so don't bother reporting it. 181 if (ec && ec != boost::beast::errc::not_connected) 182 { 183 BMCWEB_LOG_ERROR << "shutdown failed: " << ec.message(); 184 return; 185 } 186 BMCWEB_LOG_DEBUG << "Connection closed gracefully"; 187 } 188 189 void checkQueue(const bool newRecord = false) 190 { 191 if (requestDataQueue.empty()) 192 { 193 // TODO: Having issue in keeping connection alive. So lets close if 194 // nothing to be trasferred. 195 doClose(); 196 197 BMCWEB_LOG_DEBUG << "requestDataQueue is empty\n"; 198 return; 199 } 200 201 if (retryCount >= maxRetryAttempts) 202 { 203 BMCWEB_LOG_ERROR << "Maximum number of retries is reached."; 204 205 // Clear queue. 206 while (!requestDataQueue.empty()) 207 { 208 requestDataQueue.pop(); 209 } 210 211 // TODO: Take 'DeliveryRetryPolicy' action. 212 // For now, doing 'SuspendRetries' action. 213 state = ConnState::suspended; 214 return; 215 } 216 217 if ((state == ConnState::connectFailed) || 218 (state == ConnState::sendFailed) || 219 (state == ConnState::recvFailed)) 220 { 221 if (newRecord) 222 { 223 // We are already running async wait and retry. 224 // Since record is added to queue, it gets the 225 // turn in FIFO. 226 return; 227 } 228 229 retryCount++; 230 // TODO: Perform async wait for retryTimeoutInterval before proceed. 231 } 232 else 233 { 234 // reset retry count. 235 retryCount = 0; 236 } 237 238 switch (state) 239 { 240 case ConnState::connectInProgress: 241 case ConnState::sendInProgress: 242 case ConnState::suspended: 243 // do nothing 244 break; 245 case ConnState::initialized: 246 case ConnState::closed: 247 case ConnState::connectFailed: 248 case ConnState::sendFailed: 249 case ConnState::recvFailed: 250 { 251 // After establishing the connection, checkQueue() will 252 // get called and it will attempt to send data. 253 doConnect(); 254 break; 255 } 256 case ConnState::connected: 257 case ConnState::idle: 258 { 259 std::string data = requestDataQueue.front(); 260 sendMessage(data); 261 break; 262 } 263 default: 264 break; 265 } 266 267 return; 268 } 269 270 public: 271 explicit HttpClient(boost::asio::io_context& ioc, const std::string& destIP, 272 const std::string& destPort, 273 const std::string& destUri) : 274 conn(ioc), 275 host(destIP), port(destPort), uri(destUri) 276 { 277 boost::asio::ip::tcp::resolver resolver(ioc); 278 endpoint = resolver.resolve(host, port); 279 state = ConnState::initialized; 280 retryCount = 0; 281 maxRetryAttempts = 5; 282 } 283 284 void sendData(const std::string& data) 285 { 286 if (state == ConnState::suspended) 287 { 288 return; 289 } 290 291 if (requestDataQueue.size() <= maxRequestQueueSize) 292 { 293 requestDataQueue.push(data); 294 checkQueue(true); 295 } 296 else 297 { 298 BMCWEB_LOG_ERROR << "Request queue is full. So ignoring data."; 299 } 300 301 return; 302 } 303 304 void setHeaders( 305 const std::vector<std::pair<std::string, std::string>>& httpHeaders) 306 { 307 headers = httpHeaders; 308 } 309 }; 310 311 } // namespace crow 312