xref: /openbmc/bmcweb/http/server_sent_event.hpp (revision 88ada3bc)
1 #pragma once
2 #include "async_resolve.hpp"
3 #include "async_resp.hpp"
4 #include "http_request.hpp"
5 #include "http_response.hpp"
6 
7 #include <boost/algorithm/string/predicate.hpp>
8 #include <boost/asio/buffer.hpp>
9 #include <boost/asio/steady_timer.hpp>
10 #include <boost/beast/core/multi_buffer.hpp>
11 #include <boost/beast/http/buffer_body.hpp>
12 #include <boost/beast/websocket.hpp>
13 
14 #include <array>
15 #include <functional>
16 
17 #ifdef BMCWEB_ENABLE_SSL
18 #include <boost/beast/websocket/ssl.hpp>
19 #endif
20 
21 namespace crow
22 {
23 
24 namespace sse_socket
25 {
// Request paths that are served as Server-Sent-Events streams.
// `inline constexpr` gives the header a single shared definition instead of
// one internal-linkage copy per translation unit.
inline constexpr std::array<const char*, 1> sseRoutes{
    "/redfish/v1/EventService/SSE"};
28 
29 struct Connection : std::enable_shared_from_this<Connection>
30 {
31   public:
32     explicit Connection(const crow::Request& reqIn) : req(reqIn) {}
33 
34     Connection(const Connection&) = delete;
35     Connection(Connection&&) = delete;
36     Connection& operator=(const Connection&) = delete;
37     Connection& operator=(const Connection&&) = delete;
38     virtual ~Connection() = default;
39 
40     virtual boost::asio::io_context& getIoContext() = 0;
41     virtual void sendSSEHeader() = 0;
42     virtual void completeRequest(crow::Response& thisRes) = 0;
43     virtual void close(std::string_view msg = "quit") = 0;
44     virtual void sendEvent(std::string_view id, std::string_view msg) = 0;
45 
46     crow::Request req;
47 };
48 
49 template <typename Adaptor>
50 class ConnectionImpl : public Connection
51 {
52   public:
53     ConnectionImpl(
54         const crow::Request& reqIn, Adaptor adaptorIn,
55         std::function<void(std::shared_ptr<Connection>&, const crow::Request&,
56                            const std::shared_ptr<bmcweb::AsyncResp>&)>
57             openHandlerIn,
58         std::function<void(std::shared_ptr<Connection>&)> closeHandlerIn) :
59         Connection(reqIn),
60         adaptor(std::move(adaptorIn)), openHandler(std::move(openHandlerIn)),
61         closeHandler(std::move(closeHandlerIn))
62     {
63         BMCWEB_LOG_DEBUG << "SseConnectionImpl: SSE constructor " << this;
64     }
65 
66     ConnectionImpl(const ConnectionImpl&) = delete;
67     ConnectionImpl(const ConnectionImpl&&) = delete;
68     ConnectionImpl& operator=(const ConnectionImpl&) = delete;
69     ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
70 
71     ~ConnectionImpl() override
72     {
73         BMCWEB_LOG_DEBUG << "SSE ConnectionImpl: SSE destructor " << this;
74     }
75 
76     boost::asio::io_context& getIoContext() override
77     {
78         return static_cast<boost::asio::io_context&>(
79             adaptor.get_executor().context());
80     }
81 
82     void start()
83     {
84         if (openHandler)
85         {
86             auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
87             std::shared_ptr<Connection> self = this->shared_from_this();
88 
89             asyncResp->res.setCompleteRequestHandler(
90                 [self(shared_from_this())](crow::Response& thisRes) {
91                 if (thisRes.resultInt() != 200)
92                 {
93                     self->completeRequest(thisRes);
94                 }
95             });
96 
97             openHandler(self, req, asyncResp);
98         }
99     }
100 
101     void close(const std::string_view msg) override
102     {
103         BMCWEB_LOG_DEBUG << "Closing SSE connection " << this << " - " << msg;
104         boost::beast::get_lowest_layer(adaptor).close();
105 
106         // send notification to handler for cleanup
107         if (closeHandler)
108         {
109             std::shared_ptr<Connection> self = shared_from_this();
110             closeHandler(self);
111         }
112     }
113 
114     void sendSSEHeader() override
115     {
116         BMCWEB_LOG_DEBUG << "Starting SSE connection";
117         auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
118         using BodyType = boost::beast::http::buffer_body;
119         auto response =
120             std::make_shared<boost::beast::http::response<BodyType>>(
121                 boost::beast::http::status::ok, 11);
122 
123         serializer.emplace(*asyncResp->res.stringResponse);
124 
125         response->set(boost::beast::http::field::content_type,
126                       "text/event-stream");
127         response->body().more = true;
128 
129         boost::beast::http::async_write_header(
130             adaptor, *serializer,
131             std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
132                             shared_from_this()));
133     }
134 
135     void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
136                                const boost::beast::error_code& ec,
137                                const std::size_t& /*unused*/)
138     {
139         if (ec)
140         {
141             BMCWEB_LOG_ERROR << "Error sending header" << ec;
142             close("async_write_header failed");
143             return;
144         }
145         BMCWEB_LOG_DEBUG << "SSE header sent - Connection established";
146 
147         serializer.reset();
148 
149         // SSE stream header sent, So let us setup monitor.
150         // Any read data on this stream will be error in case of SSE.
151         setupRead();
152     }
153 
154     void setupRead()
155     {
156         std::weak_ptr<Connection> weakSelf = weak_from_this();
157 
158         boost::beast::http::async_read_some(
159             adaptor, outputBuffer, *parser,
160             std::bind_front(&ConnectionImpl::setupReadCallback, this,
161                             weak_from_this()));
162     }
163 
164     void setupReadCallback(const std::weak_ptr<Connection>& weakSelf,
165                            const boost::system::error_code& ec,
166                            size_t bytesRead)
167     {
168         std::shared_ptr<Connection> self = weakSelf.lock();
169         BMCWEB_LOG_DEBUG << "async_read_some: Read " << bytesRead << " bytes";
170         if (ec)
171         {
172             BMCWEB_LOG_ERROR << "Read error: " << ec;
173         }
174 
175         // After establishing SSE stream, Reading data on this
176         // stream means client is disobeys the SSE protocol.
177         // Read the data to avoid buffer attacks and close connection.
178 
179         self->close("Close SSE connection");
180     }
181 
182     void doWrite()
183     {
184         onTimeout();
185 
186         if (doingWrite)
187         {
188             return;
189         }
190         if (inputBuffer.size() == 0)
191         {
192             BMCWEB_LOG_DEBUG << "inputBuffer is empty... Bailing out";
193             return;
194         }
195         doingWrite = true;
196 
197         adaptor.async_write_some(
198             inputBuffer.data(),
199             std::bind_front(&ConnectionImpl::doWriteCallback, this,
200                             shared_from_this()));
201     }
202 
203     void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
204                          const boost::beast::error_code& ec,
205                          const size_t bytesTransferred)
206     {
207         doingWrite = false;
208         inputBuffer.consume(bytesTransferred);
209 
210         if (ec == boost::asio::error::eof)
211         {
212             BMCWEB_LOG_ERROR << "async_write_some() SSE stream closed";
213             close("SSE stream closed");
214             return;
215         }
216 
217         if (ec)
218         {
219             BMCWEB_LOG_ERROR << "async_write_some() failed: " << ec.message();
220             close("async_write_some failed");
221             return;
222         }
223         BMCWEB_LOG_DEBUG << "async_write_some() bytes transferred: "
224                          << bytesTransferred;
225 
226         doWrite();
227     }
228 
229     void completeRequest(crow::Response& thisRes) override
230     {
231         auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
232         asyncResp->res = std::move(thisRes);
233 
234         if (asyncResp->res.body().empty() && !asyncResp->res.jsonValue.empty())
235         {
236             asyncResp->res.addHeader(boost::beast::http::field::content_type,
237                                      "application/json");
238             asyncResp->res.body() = asyncResp->res.jsonValue.dump(
239                 2, ' ', true, nlohmann::json::error_handler_t::replace);
240         }
241 
242         asyncResp->res.preparePayload();
243 
244         serializer.emplace(*asyncResp->res.stringResponse);
245 
246         boost::beast::http::async_write_some(
247             adaptor, *serializer,
248             std::bind_front(&ConnectionImpl::completeRequestCallback, this,
249                             shared_from_this()));
250     }
251 
252     void completeRequestCallback(const std::shared_ptr<Connection>& /*self*/,
253                                  const boost::system::error_code& ec,
254                                  std::size_t bytesTransferred)
255     {
256         auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
257         BMCWEB_LOG_DEBUG << this << " async_write " << bytesTransferred
258                          << " bytes";
259         if (ec)
260         {
261             BMCWEB_LOG_DEBUG << this << " from async_write failed";
262             return;
263         }
264 
265         BMCWEB_LOG_DEBUG << this << " Closing SSE connection - Request invalid";
266         serializer.reset();
267         close("Request invalid");
268         asyncResp->res.releaseCompleteRequestHandler();
269     }
270 
271     void sendEvent(std::string_view id, std::string_view msg) override
272     {
273         if (msg.empty())
274         {
275             BMCWEB_LOG_DEBUG << "Empty data, bailing out.";
276             return;
277         }
278 
279         dataFormat(id);
280 
281         doWrite();
282     }
283 
284     void dataFormat(std::string_view id)
285     {
286         std::string_view msg;
287         std::string rawData;
288         if (!id.empty())
289         {
290             rawData += "id: ";
291             rawData.append(id.begin(), id.end());
292             rawData += "\n";
293         }
294 
295         rawData += "data: ";
296         for (char character : msg)
297         {
298             rawData += character;
299             if (character == '\n')
300             {
301                 rawData += "data: ";
302             }
303         }
304         rawData += "\n\n";
305 
306         boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()),
307                                  boost::asio::buffer(rawData));
308         inputBuffer.commit(rawData.size());
309     }
310 
311     void onTimeout()
312     {
313         boost::asio::steady_timer timer(ioc);
314         std::weak_ptr<Connection> weakSelf = weak_from_this();
315         timer.expires_after(std::chrono::seconds(30));
316         timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
317                                          this, weak_from_this()));
318     }
319 
320     void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
321                            const boost::system::error_code ec)
322     {
323         std::shared_ptr<Connection> self = weakSelf.lock();
324         if (!self)
325         {
326             BMCWEB_LOG_CRITICAL << self << " Failed to capture connection";
327             return;
328         }
329 
330         if (ec == boost::asio::error::operation_aborted)
331         {
332             BMCWEB_LOG_DEBUG << "operation aborted";
333             // Canceled wait means the path succeeeded.
334             return;
335         }
336         if (ec)
337         {
338             BMCWEB_LOG_CRITICAL << self << " timer failed " << ec;
339         }
340 
341         BMCWEB_LOG_WARNING << self << "Connection timed out, closing";
342 
343         self->close("closing connection");
344     }
345 
346   private:
347     Adaptor adaptor;
348 
349     boost::beast::multi_buffer outputBuffer;
350     boost::beast::multi_buffer inputBuffer;
351 
352     std::optional<boost::beast::http::response_serializer<
353         boost::beast::http::string_body>>
354         serializer;
355     boost::asio::io_context& ioc =
356         crow::connections::systemBus->get_io_context();
357     bool doingWrite = false;
358     std::optional<
359         boost::beast::http::request_parser<boost::beast::http::string_body>>
360         parser;
361 
362     std::function<void(std::shared_ptr<Connection>&, const crow::Request&,
363                        const std::shared_ptr<bmcweb::AsyncResp>&)>
364         openHandler;
365     std::function<void(std::shared_ptr<Connection>&)> closeHandler;
366 };
367 } // namespace sse_socket
368 } // namespace crow
369