xref: /openbmc/bmcweb/http/server_sent_event.hpp (revision c6178aba)
1 #pragma once
2 #include "http_body.hpp"
3 #include "http_request.hpp"
4 #include "http_response.hpp"
5 
6 #include <boost/asio/buffer.hpp>
7 #include <boost/asio/steady_timer.hpp>
8 #include <boost/beast/core/multi_buffer.hpp>
9 #include <boost/beast/websocket.hpp>
10 
11 #include <array>
12 #include <cstddef>
13 #include <functional>
14 #include <optional>
15 
16 namespace crow
17 {
18 
19 namespace sse_socket
20 {
21 struct Connection : public std::enable_shared_from_this<Connection>
22 {
23   public:
24     Connection() = default;
25 
26     Connection(const Connection&) = delete;
27     Connection(Connection&&) = delete;
28     Connection& operator=(const Connection&) = delete;
29     Connection& operator=(const Connection&&) = delete;
30     virtual ~Connection() = default;
31 
32     virtual boost::asio::io_context& getIoContext() = 0;
33     virtual void close(std::string_view msg = "quit") = 0;
34     virtual void sendEvent(std::string_view id, std::string_view msg) = 0;
35 };
36 
37 template <typename Adaptor>
38 class ConnectionImpl : public Connection
39 {
40   public:
41     ConnectionImpl(
42         Adaptor&& adaptorIn,
43         std::function<void(Connection&, const Request&)> openHandlerIn,
44         std::function<void(Connection&)> closeHandlerIn) :
45         adaptor(std::move(adaptorIn)),
46         timer(static_cast<boost::asio::io_context&>(
47             adaptor.get_executor().context())),
48         openHandler(std::move(openHandlerIn)),
49         closeHandler(std::move(closeHandlerIn))
50 
51     {
52         BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
53     }
54 
55     ConnectionImpl(const ConnectionImpl&) = delete;
56     ConnectionImpl(const ConnectionImpl&&) = delete;
57     ConnectionImpl& operator=(const ConnectionImpl&) = delete;
58     ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
59 
60     ~ConnectionImpl() override
61     {
62         BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
63     }
64 
65     boost::asio::io_context& getIoContext() override
66     {
67         return static_cast<boost::asio::io_context&>(
68             adaptor.get_executor().context());
69     }
70 
71     void start(const Request& req)
72     {
73         if (!openHandler)
74         {
75             BMCWEB_LOG_CRITICAL("No open handler???");
76             return;
77         }
78         openHandler(*this, req);
79         sendSSEHeader();
80     }
81 
82     void close(const std::string_view msg) override
83     {
84         BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg);
85         // send notification to handler for cleanup
86         if (closeHandler)
87         {
88             closeHandler(*this);
89         }
90         BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
91         boost::beast::get_lowest_layer(adaptor).close();
92     }
93 
94     void sendSSEHeader()
95     {
96         BMCWEB_LOG_DEBUG("Starting SSE connection");
97 
98         res.set(boost::beast::http::field::content_type, "text/event-stream");
99         boost::beast::http::response_serializer<BodyType>& serial =
100             serializer.emplace(res);
101 
102         boost::beast::http::async_write_header(
103             adaptor, serial,
104             std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
105                             shared_from_this()));
106     }
107 
108     void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
109                                const boost::system::error_code& ec,
110                                size_t /*bytesSent*/)
111     {
112         serializer.reset();
113         if (ec)
114         {
115             BMCWEB_LOG_ERROR("Error sending header{}", ec);
116             close("async_write_header failed");
117             return;
118         }
119         BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
120 
121         // SSE stream header sent, So let us setup monitor.
122         // Any read data on this stream will be error in case of SSE.
123         adaptor.async_read_some(boost::asio::buffer(buffer),
124                                 std::bind_front(&ConnectionImpl::afterReadError,
125                                                 this, shared_from_this()));
126     }
127 
128     void afterReadError(const std::shared_ptr<Connection>& /*self*/,
129                         const boost::system::error_code& ec, size_t bytesRead)
130     {
131         BMCWEB_LOG_DEBUG("Read {}", bytesRead);
132         if (ec == boost::asio::error::operation_aborted)
133         {
134             return;
135         }
136         if (ec)
137         {
138             BMCWEB_LOG_ERROR("Read error: {}", ec);
139         }
140 
141         close("Close SSE connection");
142     }
143 
144     void doWrite()
145     {
146         if (doingWrite)
147         {
148             return;
149         }
150         if (inputBuffer.size() == 0)
151         {
152             BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
153             return;
154         }
155         startTimeout();
156         doingWrite = true;
157 
158         adaptor.async_write_some(
159             inputBuffer.data(),
160             std::bind_front(&ConnectionImpl::doWriteCallback, this,
161                             shared_from_this()));
162     }
163 
164     void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
165                          const boost::beast::error_code& ec,
166                          size_t bytesTransferred)
167     {
168         timer.cancel();
169         doingWrite = false;
170         inputBuffer.consume(bytesTransferred);
171 
172         if (ec == boost::asio::error::eof)
173         {
174             BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
175             close("SSE stream closed");
176             return;
177         }
178 
179         if (ec)
180         {
181             BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
182             close("async_write_some failed");
183             return;
184         }
185         BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
186                          bytesTransferred);
187 
188         doWrite();
189     }
190 
191     void sendEvent(std::string_view id, std::string_view msg) override
192     {
193         if (msg.empty())
194         {
195             BMCWEB_LOG_DEBUG("Empty data, bailing out.");
196             return;
197         }
198 
199         dataFormat(id, msg);
200 
201         doWrite();
202     }
203 
204     void dataFormat(std::string_view id, std::string_view msg)
205     {
206         constexpr size_t bufferLimit = 10485760U; // 10MB
207         if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit)
208         {
209             BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client");
210             close("Buffer overflow");
211             return;
212         }
213         std::string rawData;
214         if (!id.empty())
215         {
216             rawData += "id: ";
217             rawData.append(id);
218             rawData += "\n";
219         }
220 
221         rawData += "data: ";
222         for (char character : msg)
223         {
224             rawData += character;
225             if (character == '\n')
226             {
227                 rawData += "data: ";
228             }
229         }
230         rawData += "\n\n";
231 
232         size_t copied = boost::asio::buffer_copy(
233             inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData));
234         inputBuffer.commit(copied);
235     }
236 
237     void startTimeout()
238     {
239         std::weak_ptr<Connection> weakSelf = weak_from_this();
240         timer.expires_after(std::chrono::seconds(30));
241         timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
242                                          this, weak_from_this()));
243     }
244 
245     void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
246                            const boost::system::error_code& ec)
247     {
248         std::shared_ptr<Connection> self = weakSelf.lock();
249         if (!self)
250         {
251             BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
252                                 logPtr(self.get()));
253             return;
254         }
255 
256         if (ec == boost::asio::error::operation_aborted)
257         {
258             BMCWEB_LOG_DEBUG("Timer operation aborted");
259             // Canceled wait means the path succeeded.
260             return;
261         }
262         if (ec)
263         {
264             BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
265         }
266 
267         BMCWEB_LOG_WARNING("{} Connection timed out, closing",
268                            logPtr(self.get()));
269 
270         self->close("closing connection");
271     }
272 
273   private:
274     std::array<char, 1> buffer{};
275     boost::beast::multi_buffer inputBuffer;
276 
277     Adaptor adaptor;
278 
279     using BodyType = bmcweb::HttpBody;
280     boost::beast::http::response<BodyType> res;
281     std::optional<boost::beast::http::response_serializer<BodyType>> serializer;
282     boost::asio::steady_timer timer;
283     bool doingWrite = false;
284 
285     std::function<void(Connection&, const Request&)> openHandler;
286     std::function<void(Connection&)> closeHandler;
287 };
288 } // namespace sse_socket
289 } // namespace crow
290