xref: /openbmc/bmcweb/http/server_sent_event.hpp (revision 56431b29998d58c43b101b5f55401e505c85be5e)
1 #pragma once
2 #include "boost_formatters.hpp"
3 #include "http_body.hpp"
4 #include "http_request.hpp"
5 #include "http_response.hpp"
6 
7 #include <boost/asio/buffer.hpp>
8 #include <boost/asio/steady_timer.hpp>
9 #include <boost/beast/core/multi_buffer.hpp>
10 #include <boost/beast/websocket.hpp>
11 
12 #include <array>
13 #include <cstddef>
14 #include <functional>
15 #include <optional>
16 
17 namespace crow
18 {
19 
20 namespace sse_socket
21 {
22 struct Connection : public std::enable_shared_from_this<Connection>
23 {
24   public:
25     Connection() = default;
26 
27     Connection(const Connection&) = delete;
28     Connection(Connection&&) = delete;
29     Connection& operator=(const Connection&) = delete;
30     Connection& operator=(const Connection&&) = delete;
31     virtual ~Connection() = default;
32 
33     virtual boost::asio::io_context& getIoContext() = 0;
34     virtual void close(std::string_view msg = "quit") = 0;
35     virtual void sendSseEvent(std::string_view id, std::string_view msg) = 0;
36 };
37 
38 template <typename Adaptor>
39 class ConnectionImpl : public Connection
40 {
41   public:
42     ConnectionImpl(
43         Adaptor&& adaptorIn,
44         std::function<void(Connection&, const Request&)> openHandlerIn,
45         std::function<void(Connection&)> closeHandlerIn) :
46         adaptor(std::move(adaptorIn)),
47         timer(static_cast<boost::asio::io_context&>(
48             adaptor.get_executor().context())),
49         openHandler(std::move(openHandlerIn)),
50         closeHandler(std::move(closeHandlerIn))
51 
52     {
53         BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
54     }
55 
56     ConnectionImpl(const ConnectionImpl&) = delete;
57     ConnectionImpl(const ConnectionImpl&&) = delete;
58     ConnectionImpl& operator=(const ConnectionImpl&) = delete;
59     ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
60 
61     ~ConnectionImpl() override
62     {
63         BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
64     }
65 
66     boost::asio::io_context& getIoContext() override
67     {
68         return static_cast<boost::asio::io_context&>(
69             adaptor.get_executor().context());
70     }
71 
72     void start(const Request& req)
73     {
74         BMCWEB_LOG_DEBUG("Starting SSE connection");
75 
76         res.set(boost::beast::http::field::content_type, "text/event-stream");
77         boost::beast::http::response_serializer<BodyType>& serial =
78             serializer.emplace(res);
79 
80         boost::beast::http::async_write_header(
81             adaptor, serial,
82             std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
83                             shared_from_this(), req));
84     }
85 
86     void close(const std::string_view msg) override
87     {
88         BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg);
89         // send notification to handler for cleanup
90         if (closeHandler)
91         {
92             closeHandler(*this);
93         }
94         BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
95         boost::beast::get_lowest_layer(adaptor).close();
96     }
97 
98     void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
99                                const Request& req,
100                                const boost::system::error_code& ec,
101                                size_t /*bytesSent*/)
102     {
103         serializer.reset();
104         if (ec)
105         {
106             BMCWEB_LOG_ERROR("Error sending header{}", ec);
107             close("async_write_header failed");
108             return;
109         }
110         BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
111         if (!openHandler)
112         {
113             BMCWEB_LOG_CRITICAL("No open handler???");
114             return;
115         }
116         openHandler(*this, req);
117 
118         // SSE stream header sent, So let us setup monitor.
119         // Any read data on this stream will be error in case of SSE.
120         adaptor.async_read_some(boost::asio::buffer(buffer),
121                                 std::bind_front(&ConnectionImpl::afterReadError,
122                                                 this, shared_from_this()));
123     }
124 
125     void afterReadError(const std::shared_ptr<Connection>& /*self*/,
126                         const boost::system::error_code& ec, size_t bytesRead)
127     {
128         BMCWEB_LOG_DEBUG("Read {}", bytesRead);
129         if (ec == boost::asio::error::operation_aborted)
130         {
131             return;
132         }
133         if (ec)
134         {
135             BMCWEB_LOG_ERROR("Read error: {}", ec);
136         }
137 
138         close("Close SSE connection");
139     }
140 
141     void doWrite()
142     {
143         if (doingWrite)
144         {
145             return;
146         }
147         if (inputBuffer.size() == 0)
148         {
149             BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
150             return;
151         }
152         startTimeout();
153         doingWrite = true;
154 
155         adaptor.async_write_some(
156             inputBuffer.data(),
157             std::bind_front(&ConnectionImpl::doWriteCallback, this,
158                             shared_from_this()));
159     }
160 
161     void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
162                          const boost::beast::error_code& ec,
163                          size_t bytesTransferred)
164     {
165         timer.cancel();
166         doingWrite = false;
167         inputBuffer.consume(bytesTransferred);
168 
169         if (ec == boost::asio::error::eof)
170         {
171             BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
172             close("SSE stream closed");
173             return;
174         }
175 
176         if (ec)
177         {
178             BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
179             close("async_write_some failed");
180             return;
181         }
182         BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
183                          bytesTransferred);
184 
185         doWrite();
186     }
187 
188     void sendSseEvent(std::string_view id, std::string_view msg) override
189     {
190         if (msg.empty())
191         {
192             BMCWEB_LOG_DEBUG("Empty data, bailing out.");
193             return;
194         }
195 
196         dataFormat(id, msg);
197 
198         doWrite();
199     }
200 
201     void dataFormat(std::string_view id, std::string_view msg)
202     {
203         constexpr size_t bufferLimit = 10485760U; // 10MB
204         if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit)
205         {
206             BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client");
207             close("Buffer overflow");
208             return;
209         }
210         std::string rawData;
211         if (!id.empty())
212         {
213             rawData += "id: ";
214             rawData.append(id);
215             rawData += "\n";
216         }
217 
218         rawData += "data: ";
219         for (char character : msg)
220         {
221             rawData += character;
222             if (character == '\n')
223             {
224                 rawData += "data: ";
225             }
226         }
227         rawData += "\n\n";
228 
229         size_t copied = boost::asio::buffer_copy(
230             inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData));
231         inputBuffer.commit(copied);
232     }
233 
234     void startTimeout()
235     {
236         std::weak_ptr<Connection> weakSelf = weak_from_this();
237         timer.expires_after(std::chrono::seconds(30));
238         timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
239                                          this, weak_from_this()));
240     }
241 
242     void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
243                            const boost::system::error_code& ec)
244     {
245         std::shared_ptr<Connection> self = weakSelf.lock();
246         if (!self)
247         {
248             BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
249                                 logPtr(self.get()));
250             return;
251         }
252 
253         if (ec == boost::asio::error::operation_aborted)
254         {
255             BMCWEB_LOG_DEBUG("Timer operation aborted");
256             // Canceled wait means the path succeeded.
257             return;
258         }
259         if (ec)
260         {
261             BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
262         }
263 
264         BMCWEB_LOG_WARNING("{} Connection timed out, closing",
265                            logPtr(self.get()));
266 
267         self->close("closing connection");
268     }
269 
270   private:
271     std::array<char, 1> buffer{};
272     boost::beast::multi_buffer inputBuffer;
273 
274     Adaptor adaptor;
275 
276     using BodyType = bmcweb::HttpBody;
277     boost::beast::http::response<BodyType> res;
278     std::optional<boost::beast::http::response_serializer<BodyType>> serializer;
279     boost::asio::steady_timer timer;
280     bool doingWrite = false;
281 
282     std::function<void(Connection&, const Request&)> openHandler;
283     std::function<void(Connection&)> closeHandler;
284 };
285 } // namespace sse_socket
286 } // namespace crow
287