xref: /openbmc/bmcweb/redfish-core/src/subscription.cpp (revision 2ca561945f9c518e4916cd23c194f89816fdbeee)
1 // SPDX-License-Identifier: Apache-2.0
2 // SPDX-FileCopyrightText: Copyright OpenBMC Authors
3 // SPDX-FileCopyrightText: Copyright 2020 Intel Corporation
4 #include "subscription.hpp"
5 
6 #include "dbus_singleton.hpp"
7 #include "event_log.hpp"
8 #include "event_logs_object_type.hpp"
9 #include "event_matches_filter.hpp"
10 #include "event_service_store.hpp"
11 #include "filter_expr_executor.hpp"
12 #include "heartbeat_messages.hpp"
13 #include "http_client.hpp"
14 #include "http_response.hpp"
15 #include "logging.hpp"
16 #include "server_sent_event.hpp"
17 #include "ssl_key_handler.hpp"
18 #include "telemetry_readings.hpp"
19 #include "utils/time_utils.hpp"
20 
21 #include <boost/asio/error.hpp>
22 #include <boost/asio/io_context.hpp>
23 #include <boost/asio/steady_timer.hpp>
24 #include <boost/beast/http/field.hpp>
25 #include <boost/beast/http/verb.hpp>
26 #include <boost/system/errc.hpp>
27 #include <boost/url/format.hpp>
28 #include <boost/url/url_view_base.hpp>
29 #include <nlohmann/json.hpp>
30 
31 #include <algorithm>
32 #include <chrono>
33 #include <cstdint>
34 #include <format>
35 #include <functional>
36 #include <memory>
37 #include <span>
38 #include <string>
39 #include <string_view>
40 #include <utility>
41 #include <vector>
42 
43 namespace redfish
44 {
45 
Subscription(std::shared_ptr<persistent_data::UserSubscription> userSubIn,const boost::urls::url_view_base & url,boost::asio::io_context & ioc)46 Subscription::Subscription(
47     std::shared_ptr<persistent_data::UserSubscription> userSubIn,
48     const boost::urls::url_view_base& url, boost::asio::io_context& ioc) :
49     userSub{std::move(userSubIn)},
50     policy(std::make_shared<crow::ConnectionPolicy>()), hbTimer(ioc)
51 {
52     userSub->destinationUrl = url;
53     client.emplace(ioc, policy);
54     // Subscription constructor
55     policy->invalidResp = retryRespHandler;
56 }
57 
Subscription(crow::sse_socket::Connection & connIn)58 Subscription::Subscription(crow::sse_socket::Connection& connIn) :
59     userSub{std::make_shared<persistent_data::UserSubscription>()},
60     sseConn(&connIn), hbTimer(crow::connections::systemBus->get_io_context())
61 {}
62 
63 // callback for subscription sendData
resHandler(const std::shared_ptr<Subscription> &,const crow::Response & res)64 void Subscription::resHandler(const std::shared_ptr<Subscription>& /*self*/,
65                               const crow::Response& res)
66 {
67     BMCWEB_LOG_DEBUG("Response handled with return code: {}", res.resultInt());
68 
69     if (!client)
70     {
71         BMCWEB_LOG_ERROR(
72             "Http client wasn't filled but http client callback was called.");
73         return;
74     }
75 
76     if (userSub->retryPolicy != "TerminateAfterRetries")
77     {
78         return;
79     }
80     if (client->isTerminated())
81     {
82         hbTimer.cancel();
83         if (deleter)
84         {
85             BMCWEB_LOG_INFO("Subscription {} is deleted after MaxRetryAttempts",
86                             userSub->id);
87             deleter();
88         }
89     }
90 }
91 
sendHeartbeatEvent()92 void Subscription::sendHeartbeatEvent()
93 {
94     // send the heartbeat message
95     nlohmann::json eventMessage = messages::redfishServiceFunctional();
96     eventMessage["EventTimestamp"] = time_utils::getDateTimeOffsetNow().first;
97     eventMessage["OriginOfCondition"] =
98         std::format("/redfish/v1/EventService/Subscriptions/{}", userSub->id);
99     eventMessage["MemberId"] = "0";
100 
101     nlohmann::json::array_t eventRecord;
102     eventRecord.emplace_back(std::move(eventMessage));
103 
104     nlohmann::json msgJson;
105     msgJson["@odata.type"] = "#Event.v1_4_0.Event";
106     msgJson["Name"] = "Heartbeat";
107     msgJson["Events"] = std::move(eventRecord);
108 
109     std::string strMsg =
110         msgJson.dump(2, ' ', true, nlohmann::json::error_handler_t::replace);
111 
112     // Note, eventId here is always zero, because this is a a per subscription
113     // event and doesn't have an "ID"
114     uint64_t eventId = 0;
115     sendEventToSubscriber(eventId, std::move(strMsg));
116 }
117 
scheduleNextHeartbeatEvent()118 void Subscription::scheduleNextHeartbeatEvent()
119 {
120     hbTimer.expires_after(std::chrono::minutes(userSub->hbIntervalMinutes));
121     hbTimer.async_wait(
122         std::bind_front(&Subscription::onHbTimeout, this, weak_from_this()));
123 }
124 
heartbeatParametersChanged()125 void Subscription::heartbeatParametersChanged()
126 {
127     hbTimer.cancel();
128 
129     if (userSub->sendHeartbeat)
130     {
131         scheduleNextHeartbeatEvent();
132     }
133 }
134 
onHbTimeout(const std::weak_ptr<Subscription> & weakSelf,const boost::system::error_code & ec)135 void Subscription::onHbTimeout(const std::weak_ptr<Subscription>& weakSelf,
136                                const boost::system::error_code& ec)
137 {
138     if (ec == boost::asio::error::operation_aborted)
139     {
140         BMCWEB_LOG_DEBUG("heartbeat timer async_wait is aborted");
141         return;
142     }
143     if (ec == boost::system::errc::operation_canceled)
144     {
145         BMCWEB_LOG_DEBUG("heartbeat timer async_wait canceled");
146         return;
147     }
148     if (ec)
149     {
150         BMCWEB_LOG_CRITICAL("heartbeat timer async_wait failed: {}", ec);
151         return;
152     }
153 
154     std::shared_ptr<Subscription> self = weakSelf.lock();
155     if (!self)
156     {
157         BMCWEB_LOG_CRITICAL("onHbTimeout failed on Subscription");
158         return;
159     }
160 
161     // Timer expired.
162     sendHeartbeatEvent();
163 
164     // reschedule heartbeat timer
165     scheduleNextHeartbeatEvent();
166 }
167 
sendEventToSubscriber(uint64_t eventId,std::string && msg)168 bool Subscription::sendEventToSubscriber(uint64_t eventId, std::string&& msg)
169 {
170     persistent_data::EventServiceConfig eventServiceConfig =
171         persistent_data::EventServiceStore::getInstance()
172             .getEventServiceConfig();
173     if (!eventServiceConfig.enabled)
174     {
175         return false;
176     }
177 
178     if (client)
179     {
180         boost::beast::http::fields httpHeadersCopy(userSub->httpHeaders);
181         httpHeadersCopy.set(boost::beast::http::field::content_type,
182                             "application/json");
183         client->sendDataWithCallback(
184             std::move(msg), userSub->destinationUrl,
185             static_cast<ensuressl::VerifyCertificate>(
186                 userSub->verifyCertificate),
187             httpHeadersCopy, boost::beast::http::verb::post,
188             std::bind_front(&Subscription::resHandler, this,
189                             shared_from_this()));
190         return true;
191     }
192 
193     if (sseConn != nullptr)
194     {
195         sseConn->sendSseEvent(std::to_string(eventId), msg);
196     }
197     return true;
198 }
199 
filterAndSendEventLogs(uint64_t eventId,const std::vector<EventLogObjectsType> & eventRecords)200 void Subscription::filterAndSendEventLogs(
201     uint64_t eventId, const std::vector<EventLogObjectsType>& eventRecords)
202 {
203     nlohmann::json::array_t logEntryArray;
204     for (const EventLogObjectsType& logEntry : eventRecords)
205     {
206         BMCWEB_LOG_DEBUG("Processing logEntry: {}, {} '{}'", logEntry.id,
207                          logEntry.timestamp, logEntry.messageId);
208         std::vector<std::string_view> messageArgsView(
209             logEntry.messageArgs.begin(), logEntry.messageArgs.end());
210 
211         nlohmann::json::object_t bmcLogEntry;
212         if (event_log::formatEventLogEntry(
213                 eventId, logEntry.id, logEntry.messageId, messageArgsView,
214                 logEntry.timestamp, userSub->customText, bmcLogEntry) != 0)
215         {
216             BMCWEB_LOG_WARNING("Read eventLog entry failed");
217             continue;
218         }
219 
220         if (!eventMatchesFilter(*userSub, bmcLogEntry, ""))
221         {
222             BMCWEB_LOG_DEBUG("Event {} did not match the filter",
223                              nlohmann::json(bmcLogEntry).dump());
224             continue;
225         }
226 
227         if (filter)
228         {
229             if (!memberMatches(bmcLogEntry, *filter))
230             {
231                 BMCWEB_LOG_DEBUG("Filter didn't match");
232                 continue;
233             }
234         }
235 
236         logEntryArray.emplace_back(std::move(bmcLogEntry));
237         eventId++;
238     }
239 
240     if (logEntryArray.empty())
241     {
242         BMCWEB_LOG_DEBUG("No log entries available to be transferred.");
243         return;
244     }
245 
246     nlohmann::json msg;
247     msg["@odata.type"] = "#Event.v1_4_0.Event";
248     msg["Id"] = std::to_string(eventId);
249     msg["Name"] = "Event Log";
250     msg["Events"] = std::move(logEntryArray);
251     std::string strMsg =
252         msg.dump(2, ' ', true, nlohmann::json::error_handler_t::replace);
253     sendEventToSubscriber(eventId, std::move(strMsg));
254 }
255 
filterAndSendReports(uint64_t eventId,const std::string & reportId,const telemetry::TimestampReadings & var)256 void Subscription::filterAndSendReports(uint64_t eventId,
257                                         const std::string& reportId,
258                                         const telemetry::TimestampReadings& var)
259 {
260     boost::urls::url mrdUri = boost::urls::format(
261         "/redfish/v1/TelemetryService/MetricReportDefinitions/{}", reportId);
262 
263     // Empty list means no filter. Send everything.
264     if (!userSub->metricReportDefinitions.empty())
265     {
266         if (std::ranges::find(userSub->metricReportDefinitions,
267                               mrdUri.buffer()) ==
268             userSub->metricReportDefinitions.end())
269         {
270             return;
271         }
272     }
273 
274     nlohmann::json msg;
275     if (!telemetry::fillReport(msg, reportId, var))
276     {
277         BMCWEB_LOG_ERROR("Failed to fill the MetricReport for DBus "
278                          "Report with id {}",
279                          reportId);
280         return;
281     }
282 
283     // Context is set by user during Event subscription and it must be
284     // set for MetricReport response.
285     if (!userSub->customText.empty())
286     {
287         msg["Context"] = userSub->customText;
288     }
289 
290     std::string strMsg =
291         msg.dump(2, ' ', true, nlohmann::json::error_handler_t::replace);
292     sendEventToSubscriber(eventId, std::move(strMsg));
293 }
294 
updateRetryConfig(uint32_t retryAttempts,uint32_t retryTimeoutInterval)295 void Subscription::updateRetryConfig(uint32_t retryAttempts,
296                                      uint32_t retryTimeoutInterval)
297 {
298     if (policy == nullptr)
299     {
300         BMCWEB_LOG_DEBUG("Retry policy was nullptr, ignoring set");
301         return;
302     }
303     policy->maxRetryAttempts = retryAttempts;
304     policy->retryIntervalSecs = std::chrono::seconds(retryTimeoutInterval);
305 }
306 
matchSseId(const crow::sse_socket::Connection & thisConn)307 bool Subscription::matchSseId(const crow::sse_socket::Connection& thisConn)
308 {
309     return &thisConn == sseConn;
310 }
311 
312 // Check used to indicate what response codes are valid as part of our retry
313 // policy.  2XX is considered acceptable
retryRespHandler(unsigned int respCode)314 boost::system::error_code Subscription::retryRespHandler(unsigned int respCode)
315 {
316     BMCWEB_LOG_DEBUG("Checking response code validity for SubscriptionEvent");
317     if ((respCode < 200) || (respCode >= 300))
318     {
319         return boost::system::errc::make_error_code(
320             boost::system::errc::result_out_of_range);
321     }
322 
323     // Return 0 if the response code is valid
324     return boost::system::errc::make_error_code(boost::system::errc::success);
325 }
326 
327 } // namespace redfish
328