xref: /openbmc/bmcweb/http/server_sent_event.hpp (revision b2896149)
1 #pragma once
2 #include "http_body.hpp"
3 #include "http_request.hpp"
4 #include "http_response.hpp"
5 
6 #include <boost/asio/buffer.hpp>
7 #include <boost/asio/steady_timer.hpp>
8 #include <boost/beast/core/multi_buffer.hpp>
9 #include <boost/beast/websocket.hpp>
10 
11 #include <array>
12 #include <cstddef>
13 #include <functional>
14 #include <optional>
15 
16 namespace crow
17 {
18 
19 namespace sse_socket
20 {
21 struct Connection : std::enable_shared_from_this<Connection>
22 {
23   public:
24     Connection() = default;
25 
26     Connection(const Connection&) = delete;
27     Connection(Connection&&) = delete;
28     Connection& operator=(const Connection&) = delete;
29     Connection& operator=(const Connection&&) = delete;
30     virtual ~Connection() = default;
31 
32     virtual boost::asio::io_context& getIoContext() = 0;
33     virtual void close(std::string_view msg = "quit") = 0;
34     virtual void sendEvent(std::string_view id, std::string_view msg) = 0;
35 };
36 
37 template <typename Adaptor>
38 class ConnectionImpl : public Connection
39 {
40   public:
41     ConnectionImpl(boost::asio::io_context& ioIn, Adaptor&& adaptorIn,
42                    std::function<void(Connection&)> openHandlerIn,
43                    std::function<void(Connection&)> closeHandlerIn) :
44         adaptor(std::move(adaptorIn)),
45         ioc(ioIn), timer(ioc), openHandler(std::move(openHandlerIn)),
46         closeHandler(std::move(closeHandlerIn))
47 
48     {
49         BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
50     }
51 
52     ConnectionImpl(const ConnectionImpl&) = delete;
53     ConnectionImpl(const ConnectionImpl&&) = delete;
54     ConnectionImpl& operator=(const ConnectionImpl&) = delete;
55     ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
56 
57     ~ConnectionImpl() override
58     {
59         BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
60     }
61 
62     boost::asio::io_context& getIoContext() override
63     {
64         return static_cast<boost::asio::io_context&>(
65             adaptor.get_executor().context());
66     }
67 
68     void start()
69     {
70         if (!openHandler)
71         {
72             BMCWEB_LOG_CRITICAL("No open handler???");
73             return;
74         }
75         openHandler(*this);
76         sendSSEHeader();
77     }
78 
79     void close(const std::string_view msg) override
80     {
81         BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg);
82         // send notification to handler for cleanup
83         if (closeHandler)
84         {
85             closeHandler(*this);
86         }
87         BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
88         boost::beast::get_lowest_layer(adaptor).close();
89     }
90 
91     void sendSSEHeader()
92     {
93         BMCWEB_LOG_DEBUG("Starting SSE connection");
94 
95         res.set(boost::beast::http::field::content_type, "text/event-stream");
96         boost::beast::http::response_serializer<BodyType>& serial =
97             serializer.emplace(res);
98 
99         boost::beast::http::async_write_header(
100             adaptor, serial,
101             std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
102                             shared_from_this()));
103     }
104 
105     void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
106                                const boost::system::error_code& ec,
107                                size_t /*bytesSent*/)
108     {
109         serializer.reset();
110         if (ec)
111         {
112             BMCWEB_LOG_ERROR("Error sending header{}", ec);
113             close("async_write_header failed");
114             return;
115         }
116         BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
117 
118         // SSE stream header sent, So let us setup monitor.
119         // Any read data on this stream will be error in case of SSE.
120         adaptor.async_read_some(boost::asio::buffer(buffer),
121                                 std::bind_front(&ConnectionImpl::afterReadError,
122                                                 this, shared_from_this()));
123     }
124 
125     void afterReadError(const std::shared_ptr<Connection>& /*self*/,
126                         const boost::system::error_code& ec, size_t bytesRead)
127     {
128         BMCWEB_LOG_DEBUG("Read {}", bytesRead);
129         if (ec == boost::asio::error::operation_aborted)
130         {
131             return;
132         }
133         if (ec)
134         {
135             BMCWEB_LOG_ERROR("Read error: {}", ec);
136         }
137 
138         close("Close SSE connection");
139     }
140 
141     void doWrite()
142     {
143         if (doingWrite)
144         {
145             return;
146         }
147         if (inputBuffer.size() == 0)
148         {
149             BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
150             return;
151         }
152         startTimeout();
153         doingWrite = true;
154 
155         adaptor.async_write_some(
156             inputBuffer.data(),
157             std::bind_front(&ConnectionImpl::doWriteCallback, this,
158                             shared_from_this()));
159     }
160 
161     void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
162                          const boost::beast::error_code& ec,
163                          size_t bytesTransferred)
164     {
165         timer.cancel();
166         doingWrite = false;
167         inputBuffer.consume(bytesTransferred);
168 
169         if (ec == boost::asio::error::eof)
170         {
171             BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
172             close("SSE stream closed");
173             return;
174         }
175 
176         if (ec)
177         {
178             BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
179             close("async_write_some failed");
180             return;
181         }
182         BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
183                          bytesTransferred);
184 
185         doWrite();
186     }
187 
188     void sendEvent(std::string_view id, std::string_view msg) override
189     {
190         if (msg.empty())
191         {
192             BMCWEB_LOG_DEBUG("Empty data, bailing out.");
193             return;
194         }
195 
196         dataFormat(id, msg);
197 
198         doWrite();
199     }
200 
201     void dataFormat(std::string_view id, std::string_view msg)
202     {
203         constexpr size_t bufferLimit = 10485760U; // 10MB
204         if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit)
205         {
206             BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client");
207             close("Buffer overflow");
208             return;
209         }
210         std::string rawData;
211         if (!id.empty())
212         {
213             rawData += "id: ";
214             rawData.append(id);
215             rawData += "\n";
216         }
217 
218         rawData += "data: ";
219         for (char character : msg)
220         {
221             rawData += character;
222             if (character == '\n')
223             {
224                 rawData += "data: ";
225             }
226         }
227         rawData += "\n\n";
228 
229         boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()),
230                                  boost::asio::buffer(rawData));
231         inputBuffer.commit(rawData.size());
232     }
233 
234     void startTimeout()
235     {
236         std::weak_ptr<Connection> weakSelf = weak_from_this();
237         timer.expires_after(std::chrono::seconds(30));
238         timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
239                                          this, weak_from_this()));
240     }
241 
242     void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
243                            const boost::system::error_code& ec)
244     {
245         std::shared_ptr<Connection> self = weakSelf.lock();
246         if (!self)
247         {
248             BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
249                                 logPtr(self.get()));
250             return;
251         }
252 
253         if (ec == boost::asio::error::operation_aborted)
254         {
255             BMCWEB_LOG_DEBUG("Timer operation aborted");
256             // Canceled wait means the path succeeded.
257             return;
258         }
259         if (ec)
260         {
261             BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
262         }
263 
264         BMCWEB_LOG_WARNING("{} Connection timed out, closing",
265                            logPtr(self.get()));
266 
267         self->close("closing connection");
268     }
269 
270   private:
271     std::array<char, 1> buffer{};
272     boost::beast::multi_buffer inputBuffer;
273 
274     Adaptor adaptor;
275 
276     using BodyType = bmcweb::HttpBody;
277     boost::beast::http::response<BodyType> res;
278     std::optional<boost::beast::http::response_serializer<BodyType>> serializer;
279     boost::asio::io_context& ioc;
280     boost::asio::steady_timer timer;
281     bool doingWrite = false;
282 
283     std::function<void(Connection&)> openHandler;
284     std::function<void(Connection&)> closeHandler;
285 };
286 } // namespace sse_socket
287 } // namespace crow
288