xref: /openbmc/bmcweb/redfish-core/src/subscription.cpp (revision 32ea63d365aeef63a53e4e89d4db56c70cea940e)
1 // SPDX-License-Identifier: Apache-2.0
2 // SPDX-FileCopyrightText: Copyright OpenBMC Authors
3 // SPDX-FileCopyrightText: Copyright 2020 Intel Corporation
4 #include "subscription.hpp"
5 
6 #include "dbus_singleton.hpp"
7 #include "event_log.hpp"
8 #include "event_logs_object_type.hpp"
9 #include "event_matches_filter.hpp"
10 #include "event_service_store.hpp"
11 #include "filter_expr_executor.hpp"
12 #include "heartbeat_messages.hpp"
13 #include "http_client.hpp"
14 #include "http_response.hpp"
15 #include "logging.hpp"
16 #include "server_sent_event.hpp"
17 #include "ssl_key_handler.hpp"
18 #include "telemetry_readings.hpp"
19 #include "utils/time_utils.hpp"
20 
21 #include <boost/asio/error.hpp>
22 #include <boost/asio/io_context.hpp>
23 #include <boost/asio/steady_timer.hpp>
24 #include <boost/beast/http/field.hpp>
25 #include <boost/beast/http/verb.hpp>
26 #include <boost/system/errc.hpp>
27 #include <boost/url/format.hpp>
28 #include <boost/url/url_view_base.hpp>
29 #include <nlohmann/json.hpp>
30 
31 #include <algorithm>
32 #include <chrono>
33 #include <cstdint>
34 #include <functional>
35 #include <memory>
36 #include <span>
37 #include <string>
38 #include <string_view>
39 #include <utility>
40 #include <vector>
41 
42 namespace redfish
43 {
44 
Subscription(std::shared_ptr<persistent_data::UserSubscription> userSubIn,const boost::urls::url_view_base & url,boost::asio::io_context & ioc)45 Subscription::Subscription(
46     std::shared_ptr<persistent_data::UserSubscription> userSubIn,
47     const boost::urls::url_view_base& url, boost::asio::io_context& ioc) :
48     userSub{std::move(userSubIn)},
49     policy(std::make_shared<crow::ConnectionPolicy>()), hbTimer(ioc)
50 {
51     userSub->destinationUrl = url;
52     client.emplace(ioc, policy);
53     // Subscription constructor
54     policy->invalidResp = retryRespHandler;
55 }
56 
Subscription(crow::sse_socket::Connection & connIn)57 Subscription::Subscription(crow::sse_socket::Connection& connIn) :
58     userSub{std::make_shared<persistent_data::UserSubscription>()},
59     sseConn(&connIn), hbTimer(crow::connections::systemBus->get_io_context())
60 {}
61 
62 // callback for subscription sendData
resHandler(const std::shared_ptr<Subscription> &,const crow::Response & res)63 void Subscription::resHandler(const std::shared_ptr<Subscription>& /*self*/,
64                               const crow::Response& res)
65 {
66     BMCWEB_LOG_DEBUG("Response handled with return code: {}", res.resultInt());
67 
68     if (!client)
69     {
70         BMCWEB_LOG_ERROR(
71             "Http client wasn't filled but http client callback was called.");
72         return;
73     }
74 
75     if (userSub->retryPolicy != "TerminateAfterRetries")
76     {
77         return;
78     }
79     if (client->isTerminated())
80     {
81         hbTimer.cancel();
82         if (deleter)
83         {
84             BMCWEB_LOG_INFO("Subscription {} is deleted after MaxRetryAttempts",
85                             userSub->id);
86             deleter();
87         }
88     }
89 }
90 
sendHeartbeatEvent()91 void Subscription::sendHeartbeatEvent()
92 {
93     // send the heartbeat message
94     nlohmann::json eventMessage = messages::redfishServiceFunctional();
95     eventMessage["EventTimestamp"] = time_utils::getDateTimeOffsetNow().first;
96     eventMessage["OriginOfCondition"] = boost::urls::format(
97         "/redfish/v1/EventService/Subscriptions/{}", userSub->id);
98     eventMessage["MemberId"] = "0";
99 
100     nlohmann::json::array_t eventRecord;
101     eventRecord.emplace_back(std::move(eventMessage));
102 
103     nlohmann::json msgJson;
104     msgJson["@odata.type"] = "#Event.v1_4_0.Event";
105     msgJson["Name"] = "Heartbeat";
106     msgJson["Events"] = std::move(eventRecord);
107 
108     std::string strMsg =
109         msgJson.dump(2, ' ', true, nlohmann::json::error_handler_t::replace);
110 
111     // Note, eventId here is always zero, because this is a a per subscription
112     // event and doesn't have an "ID"
113     uint64_t eventId = 0;
114     sendEventToSubscriber(eventId, std::move(strMsg));
115 }
116 
scheduleNextHeartbeatEvent()117 void Subscription::scheduleNextHeartbeatEvent()
118 {
119     hbTimer.expires_after(std::chrono::minutes(userSub->hbIntervalMinutes));
120     hbTimer.async_wait(
121         std::bind_front(&Subscription::onHbTimeout, this, weak_from_this()));
122 }
123 
heartbeatParametersChanged()124 void Subscription::heartbeatParametersChanged()
125 {
126     hbTimer.cancel();
127 
128     if (userSub->sendHeartbeat)
129     {
130         scheduleNextHeartbeatEvent();
131     }
132 }
133 
onHbTimeout(const std::weak_ptr<Subscription> & weakSelf,const boost::system::error_code & ec)134 void Subscription::onHbTimeout(const std::weak_ptr<Subscription>& weakSelf,
135                                const boost::system::error_code& ec)
136 {
137     if (ec == boost::asio::error::operation_aborted)
138     {
139         BMCWEB_LOG_DEBUG("heartbeat timer async_wait is aborted");
140         return;
141     }
142     if (ec == boost::system::errc::operation_canceled)
143     {
144         BMCWEB_LOG_DEBUG("heartbeat timer async_wait canceled");
145         return;
146     }
147     if (ec)
148     {
149         BMCWEB_LOG_CRITICAL("heartbeat timer async_wait failed: {}", ec);
150         return;
151     }
152 
153     std::shared_ptr<Subscription> self = weakSelf.lock();
154     if (!self)
155     {
156         BMCWEB_LOG_CRITICAL("onHbTimeout failed on Subscription");
157         return;
158     }
159 
160     // Timer expired.
161     sendHeartbeatEvent();
162 
163     // reschedule heartbeat timer
164     scheduleNextHeartbeatEvent();
165 }
166 
sendEventToSubscriber(uint64_t eventId,std::string && msg)167 bool Subscription::sendEventToSubscriber(uint64_t eventId, std::string&& msg)
168 {
169     persistent_data::EventServiceConfig eventServiceConfig =
170         persistent_data::EventServiceStore::getInstance()
171             .getEventServiceConfig();
172     if (!eventServiceConfig.enabled)
173     {
174         return false;
175     }
176 
177     if (client)
178     {
179         boost::beast::http::fields httpHeadersCopy(userSub->httpHeaders);
180         httpHeadersCopy.set(boost::beast::http::field::content_type,
181                             "application/json");
182         client->sendDataWithCallback(
183             std::move(msg), userSub->destinationUrl,
184             static_cast<ensuressl::VerifyCertificate>(
185                 userSub->verifyCertificate),
186             httpHeadersCopy, boost::beast::http::verb::post,
187             std::bind_front(&Subscription::resHandler, this,
188                             shared_from_this()));
189         return true;
190     }
191 
192     if (sseConn != nullptr)
193     {
194         sseConn->sendSseEvent(std::to_string(eventId), msg);
195     }
196     return true;
197 }
198 
filterAndSendEventLogs(uint64_t eventId,const std::vector<EventLogObjectsType> & eventRecords)199 void Subscription::filterAndSendEventLogs(
200     uint64_t eventId, const std::vector<EventLogObjectsType>& eventRecords)
201 {
202     nlohmann::json::array_t logEntryArray;
203     for (const EventLogObjectsType& logEntry : eventRecords)
204     {
205         BMCWEB_LOG_DEBUG("Processing logEntry: {}, {} '{}'", logEntry.id,
206                          logEntry.timestamp, logEntry.messageId);
207         std::vector<std::string_view> messageArgsView(
208             logEntry.messageArgs.begin(), logEntry.messageArgs.end());
209 
210         nlohmann::json::object_t bmcLogEntry;
211         if (event_log::formatEventLogEntry(
212                 eventId, logEntry.id, logEntry.messageId, messageArgsView,
213                 logEntry.timestamp, userSub->customText, bmcLogEntry) != 0)
214         {
215             BMCWEB_LOG_WARNING("Read eventLog entry failed");
216             continue;
217         }
218 
219         if (!eventMatchesFilter(*userSub, bmcLogEntry, ""))
220         {
221             BMCWEB_LOG_DEBUG("Event {} did not match the filter",
222                              nlohmann::json(bmcLogEntry).dump());
223             continue;
224         }
225 
226         if (filter)
227         {
228             if (!memberMatches(bmcLogEntry, *filter))
229             {
230                 BMCWEB_LOG_DEBUG("Filter didn't match");
231                 continue;
232             }
233         }
234 
235         logEntryArray.emplace_back(std::move(bmcLogEntry));
236         eventId++;
237     }
238 
239     if (logEntryArray.empty())
240     {
241         BMCWEB_LOG_DEBUG("No log entries available to be transferred.");
242         return;
243     }
244 
245     nlohmann::json msg;
246     msg["@odata.type"] = "#Event.v1_4_0.Event";
247     msg["Id"] = std::to_string(eventId);
248     msg["Name"] = "Event Log";
249     msg["Events"] = std::move(logEntryArray);
250     std::string strMsg =
251         msg.dump(2, ' ', true, nlohmann::json::error_handler_t::replace);
252     sendEventToSubscriber(eventId, std::move(strMsg));
253 }
254 
filterAndSendReports(uint64_t eventId,const std::string & reportId,const telemetry::TimestampReadings & var)255 void Subscription::filterAndSendReports(uint64_t eventId,
256                                         const std::string& reportId,
257                                         const telemetry::TimestampReadings& var)
258 {
259     boost::urls::url mrdUri = boost::urls::format(
260         "/redfish/v1/TelemetryService/MetricReportDefinitions/{}", reportId);
261 
262     // Empty list means no filter. Send everything.
263     if (!userSub->metricReportDefinitions.empty())
264     {
265         if (std::ranges::find(userSub->metricReportDefinitions,
266                               mrdUri.buffer()) ==
267             userSub->metricReportDefinitions.end())
268         {
269             return;
270         }
271     }
272 
273     nlohmann::json msg;
274     if (!telemetry::fillReport(msg, reportId, var))
275     {
276         BMCWEB_LOG_ERROR("Failed to fill the MetricReport for DBus "
277                          "Report with id {}",
278                          reportId);
279         return;
280     }
281 
282     // Context is set by user during Event subscription and it must be
283     // set for MetricReport response.
284     if (!userSub->customText.empty())
285     {
286         msg["Context"] = userSub->customText;
287     }
288 
289     std::string strMsg =
290         msg.dump(2, ' ', true, nlohmann::json::error_handler_t::replace);
291     sendEventToSubscriber(eventId, std::move(strMsg));
292 }
293 
updateRetryConfig(uint32_t retryAttempts,uint32_t retryTimeoutInterval)294 void Subscription::updateRetryConfig(uint32_t retryAttempts,
295                                      uint32_t retryTimeoutInterval)
296 {
297     if (policy == nullptr)
298     {
299         BMCWEB_LOG_DEBUG("Retry policy was nullptr, ignoring set");
300         return;
301     }
302     policy->maxRetryAttempts = retryAttempts;
303     policy->retryIntervalSecs = std::chrono::seconds(retryTimeoutInterval);
304 }
305 
matchSseId(const crow::sse_socket::Connection & thisConn)306 bool Subscription::matchSseId(const crow::sse_socket::Connection& thisConn)
307 {
308     return &thisConn == sseConn;
309 }
310 
311 // Check used to indicate what response codes are valid as part of our retry
312 // policy.  2XX is considered acceptable
retryRespHandler(unsigned int respCode)313 boost::system::error_code Subscription::retryRespHandler(unsigned int respCode)
314 {
315     BMCWEB_LOG_DEBUG("Checking response code validity for SubscriptionEvent");
316     if ((respCode < 200) || (respCode >= 300))
317     {
318         return boost::system::errc::make_error_code(
319             boost::system::errc::result_out_of_range);
320     }
321 
322     // Return 0 if the response code is valid
323     return boost::system::errc::make_error_code(boost::system::errc::success);
324 }
325 
326 } // namespace redfish
327