xref: /openbmc/bmcweb/redfish-core/src/subscription.cpp (revision 99ff0ddcc083c149d3a64190e6c1b0caab1d77f2)
1 // SPDX-License-Identifier: Apache-2.0
2 // SPDX-FileCopyrightText: Copyright OpenBMC Authors
3 // SPDX-FileCopyrightText: Copyright 2020 Intel Corporation
4 #include "subscription.hpp"
5 
6 #include "dbus_singleton.hpp"
7 #include "event_log.hpp"
8 #include "event_logs_object_type.hpp"
9 #include "event_matches_filter.hpp"
10 #include "event_service_store.hpp"
11 #include "filter_expr_executor.hpp"
12 #include "heartbeat_messages.hpp"
13 #include "http_client.hpp"
14 #include "http_response.hpp"
15 #include "logging.hpp"
16 #include "metric_report.hpp"
17 #include "server_sent_event.hpp"
18 #include "ssl_key_handler.hpp"
19 #include "utils/time_utils.hpp"
20 
21 #include <boost/asio/error.hpp>
22 #include <boost/asio/io_context.hpp>
23 #include <boost/asio/steady_timer.hpp>
24 #include <boost/beast/http/field.hpp>
25 #include <boost/beast/http/fields.hpp>
26 #include <boost/beast/http/verb.hpp>
27 #include <boost/system/errc.hpp>
28 #include <boost/url/format.hpp>
29 #include <boost/url/url_view_base.hpp>
30 #include <nlohmann/json.hpp>
31 
32 #include <algorithm>
33 #include <chrono>
34 #include <cstdint>
35 #include <cstdlib>
36 #include <ctime>
37 #include <format>
38 #include <functional>
39 #include <memory>
40 #include <span>
41 #include <string>
42 #include <string_view>
43 #include <utility>
44 #include <vector>
45 
46 namespace redfish
47 {
48 
Subscription(std::shared_ptr<persistent_data::UserSubscription> userSubIn,const boost::urls::url_view_base & url,boost::asio::io_context & ioc)49 Subscription::Subscription(
50     std::shared_ptr<persistent_data::UserSubscription> userSubIn,
51     const boost::urls::url_view_base& url, boost::asio::io_context& ioc) :
52     userSub{std::move(userSubIn)},
53     policy(std::make_shared<crow::ConnectionPolicy>()), hbTimer(ioc)
54 {
55     userSub->destinationUrl = url;
56     client.emplace(ioc, policy);
57     // Subscription constructor
58     policy->invalidResp = retryRespHandler;
59 }
60 
Subscription(crow::sse_socket::Connection & connIn)61 Subscription::Subscription(crow::sse_socket::Connection& connIn) :
62     userSub{std::make_shared<persistent_data::UserSubscription>()},
63     sseConn(&connIn), hbTimer(crow::connections::systemBus->get_io_context())
64 {}
65 
66 // callback for subscription sendData
resHandler(const std::shared_ptr<Subscription> &,const crow::Response & res)67 void Subscription::resHandler(const std::shared_ptr<Subscription>& /*self*/,
68                               const crow::Response& res)
69 {
70     BMCWEB_LOG_DEBUG("Response handled with return code: {}", res.resultInt());
71 
72     if (!client)
73     {
74         BMCWEB_LOG_ERROR(
75             "Http client wasn't filled but http client callback was called.");
76         return;
77     }
78 
79     if (userSub->retryPolicy != "TerminateAfterRetries")
80     {
81         return;
82     }
83     if (client->isTerminated())
84     {
85         hbTimer.cancel();
86         if (deleter)
87         {
88             BMCWEB_LOG_INFO("Subscription {} is deleted after MaxRetryAttempts",
89                             userSub->id);
90             deleter();
91         }
92     }
93 }
94 
sendHeartbeatEvent()95 void Subscription::sendHeartbeatEvent()
96 {
97     // send the heartbeat message
98     nlohmann::json eventMessage = messages::redfishServiceFunctional();
99     eventMessage["EventTimestamp"] = time_utils::getDateTimeOffsetNow().first;
100     eventMessage["OriginOfCondition"] =
101         std::format("/redfish/v1/EventService/Subscriptions/{}", userSub->id);
102     eventMessage["MemberId"] = "0";
103 
104     nlohmann::json::array_t eventRecord;
105     eventRecord.emplace_back(std::move(eventMessage));
106 
107     nlohmann::json msgJson;
108     msgJson["@odata.type"] = "#Event.v1_4_0.Event";
109     msgJson["Name"] = "Heartbeat";
110     msgJson["Events"] = std::move(eventRecord);
111 
112     std::string strMsg =
113         msgJson.dump(2, ' ', true, nlohmann::json::error_handler_t::replace);
114 
115     // Note, eventId here is always zero, because this is a a per subscription
116     // event and doesn't have an "ID"
117     uint64_t eventId = 0;
118     sendEventToSubscriber(eventId, std::move(strMsg));
119 }
120 
scheduleNextHeartbeatEvent()121 void Subscription::scheduleNextHeartbeatEvent()
122 {
123     hbTimer.expires_after(std::chrono::minutes(userSub->hbIntervalMinutes));
124     hbTimer.async_wait(
125         std::bind_front(&Subscription::onHbTimeout, this, weak_from_this()));
126 }
127 
heartbeatParametersChanged()128 void Subscription::heartbeatParametersChanged()
129 {
130     hbTimer.cancel();
131 
132     if (userSub->sendHeartbeat)
133     {
134         scheduleNextHeartbeatEvent();
135     }
136 }
137 
onHbTimeout(const std::weak_ptr<Subscription> & weakSelf,const boost::system::error_code & ec)138 void Subscription::onHbTimeout(const std::weak_ptr<Subscription>& weakSelf,
139                                const boost::system::error_code& ec)
140 {
141     if (ec == boost::asio::error::operation_aborted)
142     {
143         BMCWEB_LOG_DEBUG("heartbeat timer async_wait is aborted");
144         return;
145     }
146     if (ec == boost::system::errc::operation_canceled)
147     {
148         BMCWEB_LOG_DEBUG("heartbeat timer async_wait canceled");
149         return;
150     }
151     if (ec)
152     {
153         BMCWEB_LOG_CRITICAL("heartbeat timer async_wait failed: {}", ec);
154         return;
155     }
156 
157     std::shared_ptr<Subscription> self = weakSelf.lock();
158     if (!self)
159     {
160         BMCWEB_LOG_CRITICAL("onHbTimeout failed on Subscription");
161         return;
162     }
163 
164     // Timer expired.
165     sendHeartbeatEvent();
166 
167     // reschedule heartbeat timer
168     scheduleNextHeartbeatEvent();
169 }
170 
sendEventToSubscriber(uint64_t eventId,std::string && msg)171 bool Subscription::sendEventToSubscriber(uint64_t eventId, std::string&& msg)
172 {
173     persistent_data::EventServiceConfig eventServiceConfig =
174         persistent_data::EventServiceStore::getInstance()
175             .getEventServiceConfig();
176     if (!eventServiceConfig.enabled)
177     {
178         return false;
179     }
180 
181     if (client)
182     {
183         boost::beast::http::fields httpHeadersCopy(userSub->httpHeaders);
184         httpHeadersCopy.set(boost::beast::http::field::content_type,
185                             "application/json");
186         client->sendDataWithCallback(
187             std::move(msg), userSub->destinationUrl,
188             static_cast<ensuressl::VerifyCertificate>(
189                 userSub->verifyCertificate),
190             httpHeadersCopy, boost::beast::http::verb::post,
191             std::bind_front(&Subscription::resHandler, this,
192                             shared_from_this()));
193         return true;
194     }
195 
196     if (sseConn != nullptr)
197     {
198         sseConn->sendSseEvent(std::to_string(eventId), msg);
199     }
200     return true;
201 }
202 
filterAndSendEventLogs(uint64_t eventId,const std::vector<EventLogObjectsType> & eventRecords)203 void Subscription::filterAndSendEventLogs(
204     uint64_t eventId, const std::vector<EventLogObjectsType>& eventRecords)
205 {
206     nlohmann::json::array_t logEntryArray;
207     for (const EventLogObjectsType& logEntry : eventRecords)
208     {
209         BMCWEB_LOG_DEBUG("Processing logEntry: {}, {} '{}'", logEntry.id,
210                          logEntry.timestamp, logEntry.messageId);
211         std::vector<std::string_view> messageArgsView(
212             logEntry.messageArgs.begin(), logEntry.messageArgs.end());
213 
214         nlohmann::json::object_t bmcLogEntry;
215         if (event_log::formatEventLogEntry(
216                 eventId, logEntry.id, logEntry.messageId, messageArgsView,
217                 logEntry.timestamp, userSub->customText, bmcLogEntry) != 0)
218         {
219             BMCWEB_LOG_WARNING("Read eventLog entry failed");
220             continue;
221         }
222 
223         if (!eventMatchesFilter(*userSub, bmcLogEntry, ""))
224         {
225             BMCWEB_LOG_DEBUG("Event {} did not match the filter",
226                              nlohmann::json(bmcLogEntry).dump());
227             continue;
228         }
229 
230         if (filter)
231         {
232             if (!memberMatches(bmcLogEntry, *filter))
233             {
234                 BMCWEB_LOG_DEBUG("Filter didn't match");
235                 continue;
236             }
237         }
238 
239         logEntryArray.emplace_back(std::move(bmcLogEntry));
240         eventId++;
241     }
242 
243     if (logEntryArray.empty())
244     {
245         BMCWEB_LOG_DEBUG("No log entries available to be transferred.");
246         return;
247     }
248 
249     nlohmann::json msg;
250     msg["@odata.type"] = "#Event.v1_4_0.Event";
251     msg["Id"] = std::to_string(eventId);
252     msg["Name"] = "Event Log";
253     msg["Events"] = std::move(logEntryArray);
254     std::string strMsg =
255         msg.dump(2, ' ', true, nlohmann::json::error_handler_t::replace);
256     sendEventToSubscriber(eventId, std::move(strMsg));
257 }
258 
filterAndSendReports(uint64_t eventId,const std::string & reportId,const telemetry::TimestampReadings & var)259 void Subscription::filterAndSendReports(uint64_t eventId,
260                                         const std::string& reportId,
261                                         const telemetry::TimestampReadings& var)
262 {
263     boost::urls::url mrdUri = boost::urls::format(
264         "/redfish/v1/TelemetryService/MetricReportDefinitions/{}", reportId);
265 
266     // Empty list means no filter. Send everything.
267     if (!userSub->metricReportDefinitions.empty())
268     {
269         if (std::ranges::find(userSub->metricReportDefinitions,
270                               mrdUri.buffer()) ==
271             userSub->metricReportDefinitions.end())
272         {
273             return;
274         }
275     }
276 
277     nlohmann::json msg;
278     if (!telemetry::fillReport(msg, reportId, var))
279     {
280         BMCWEB_LOG_ERROR("Failed to fill the MetricReport for DBus "
281                          "Report with id {}",
282                          reportId);
283         return;
284     }
285 
286     // Context is set by user during Event subscription and it must be
287     // set for MetricReport response.
288     if (!userSub->customText.empty())
289     {
290         msg["Context"] = userSub->customText;
291     }
292 
293     std::string strMsg =
294         msg.dump(2, ' ', true, nlohmann::json::error_handler_t::replace);
295     sendEventToSubscriber(eventId, std::move(strMsg));
296 }
297 
updateRetryConfig(uint32_t retryAttempts,uint32_t retryTimeoutInterval)298 void Subscription::updateRetryConfig(uint32_t retryAttempts,
299                                      uint32_t retryTimeoutInterval)
300 {
301     if (policy == nullptr)
302     {
303         BMCWEB_LOG_DEBUG("Retry policy was nullptr, ignoring set");
304         return;
305     }
306     policy->maxRetryAttempts = retryAttempts;
307     policy->retryIntervalSecs = std::chrono::seconds(retryTimeoutInterval);
308 }
309 
matchSseId(const crow::sse_socket::Connection & thisConn)310 bool Subscription::matchSseId(const crow::sse_socket::Connection& thisConn)
311 {
312     return &thisConn == sseConn;
313 }
314 
315 // Check used to indicate what response codes are valid as part of our retry
316 // policy.  2XX is considered acceptable
retryRespHandler(unsigned int respCode)317 boost::system::error_code Subscription::retryRespHandler(unsigned int respCode)
318 {
319     BMCWEB_LOG_DEBUG("Checking response code validity for SubscriptionEvent");
320     if ((respCode < 200) || (respCode >= 300))
321     {
322         return boost::system::errc::make_error_code(
323             boost::system::errc::result_out_of_range);
324     }
325 
326     // Return 0 if the response code is valid
327     return boost::system::errc::make_error_code(boost::system::errc::success);
328 }
329 
330 } // namespace redfish
331