xref: /openbmc/bmcweb/http/server_sent_event.hpp (revision 8f79c5b6)
1 #pragma once
2 #include "http_request.hpp"
3 #include "http_response.hpp"
4 
5 #include <boost/asio/buffer.hpp>
6 #include <boost/asio/steady_timer.hpp>
7 #include <boost/beast/core/multi_buffer.hpp>
8 #include <boost/beast/websocket.hpp>
9 
10 #include <array>
11 #include <cstddef>
12 #include <functional>
13 #include <optional>
14 
15 namespace crow
16 {
17 
18 namespace sse_socket
19 {
20 struct Connection : std::enable_shared_from_this<Connection>
21 {
22   public:
23     Connection() = default;
24 
25     Connection(const Connection&) = delete;
26     Connection(Connection&&) = delete;
27     Connection& operator=(const Connection&) = delete;
28     Connection& operator=(const Connection&&) = delete;
29     virtual ~Connection() = default;
30 
31     virtual boost::asio::io_context& getIoContext() = 0;
32     virtual void close(std::string_view msg = "quit") = 0;
33     virtual void sendEvent(std::string_view id, std::string_view msg) = 0;
34 };
35 
36 template <typename Adaptor>
37 class ConnectionImpl : public Connection
38 {
39   public:
40     ConnectionImpl(boost::asio::io_context& ioIn, Adaptor&& adaptorIn,
41                    std::function<void(Connection&)> openHandlerIn,
42                    std::function<void(Connection&)> closeHandlerIn) :
43         adaptor(std::move(adaptorIn)),
44         ioc(ioIn), timer(ioc), openHandler(std::move(openHandlerIn)),
45         closeHandler(std::move(closeHandlerIn))
46 
47     {
48         BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
49     }
50 
51     ConnectionImpl(const ConnectionImpl&) = delete;
52     ConnectionImpl(const ConnectionImpl&&) = delete;
53     ConnectionImpl& operator=(const ConnectionImpl&) = delete;
54     ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
55 
56     ~ConnectionImpl() override
57     {
58         BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
59     }
60 
61     boost::asio::io_context& getIoContext() override
62     {
63         return static_cast<boost::asio::io_context&>(
64             adaptor.get_executor().context());
65     }
66 
67     void start()
68     {
69         if (!openHandler)
70         {
71             BMCWEB_LOG_CRITICAL("No open handler???");
72             return;
73         }
74         openHandler(*this);
75         sendSSEHeader();
76     }
77 
78     void close(const std::string_view msg) override
79     {
80         BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg);
81         // send notification to handler for cleanup
82         if (closeHandler)
83         {
84             closeHandler(*this);
85         }
86         BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
87         boost::beast::get_lowest_layer(adaptor).close();
88     }
89 
90     void sendSSEHeader()
91     {
92         BMCWEB_LOG_DEBUG("Starting SSE connection");
93 
94         res.set(boost::beast::http::field::content_type, "text/event-stream");
95         boost::beast::http::response_serializer<BodyType>& serial =
96             serializer.emplace(res);
97 
98         boost::beast::http::async_write_header(
99             adaptor, serial,
100             std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
101                             shared_from_this()));
102     }
103 
104     void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
105                                const boost::system::error_code& ec,
106                                size_t /*bytesSent*/)
107     {
108         serializer.reset();
109         if (ec)
110         {
111             BMCWEB_LOG_ERROR("Error sending header{}", ec);
112             close("async_write_header failed");
113             return;
114         }
115         BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
116 
117         // SSE stream header sent, So let us setup monitor.
118         // Any read data on this stream will be error in case of SSE.
119         adaptor.async_read_some(boost::asio::buffer(buffer),
120                                 std::bind_front(&ConnectionImpl::afterReadError,
121                                                 this, shared_from_this()));
122     }
123 
124     void afterReadError(const std::shared_ptr<Connection>& /*self*/,
125                         const boost::system::error_code& ec, size_t bytesRead)
126     {
127         BMCWEB_LOG_DEBUG("Read {}", bytesRead);
128         if (ec == boost::asio::error::operation_aborted)
129         {
130             return;
131         }
132         if (ec)
133         {
134             BMCWEB_LOG_ERROR("Read error: {}", ec);
135         }
136 
137         close("Close SSE connection");
138     }
139 
140     void doWrite()
141     {
142         if (doingWrite)
143         {
144             return;
145         }
146         if (inputBuffer.size() == 0)
147         {
148             BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
149             return;
150         }
151         startTimeout();
152         doingWrite = true;
153 
154         adaptor.async_write_some(
155             inputBuffer.data(),
156             std::bind_front(&ConnectionImpl::doWriteCallback, this,
157                             shared_from_this()));
158     }
159 
160     void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
161                          const boost::beast::error_code& ec,
162                          size_t bytesTransferred)
163     {
164         timer.cancel();
165         doingWrite = false;
166         inputBuffer.consume(bytesTransferred);
167 
168         if (ec == boost::asio::error::eof)
169         {
170             BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
171             close("SSE stream closed");
172             return;
173         }
174 
175         if (ec)
176         {
177             BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
178             close("async_write_some failed");
179             return;
180         }
181         BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
182                          bytesTransferred);
183 
184         doWrite();
185     }
186 
187     void sendEvent(std::string_view id, std::string_view msg) override
188     {
189         if (msg.empty())
190         {
191             BMCWEB_LOG_DEBUG("Empty data, bailing out.");
192             return;
193         }
194 
195         dataFormat(id, msg);
196 
197         doWrite();
198     }
199 
200     void dataFormat(std::string_view id, std::string_view msg)
201     {
202         constexpr size_t bufferLimit = 10485760U; // 10MB
203         if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit)
204         {
205             BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client");
206             close("Buffer overflow");
207             return;
208         }
209         std::string rawData;
210         if (!id.empty())
211         {
212             rawData += "id: ";
213             rawData.append(id);
214             rawData += "\n";
215         }
216 
217         rawData += "data: ";
218         for (char character : msg)
219         {
220             rawData += character;
221             if (character == '\n')
222             {
223                 rawData += "data: ";
224             }
225         }
226         rawData += "\n\n";
227 
228         boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()),
229                                  boost::asio::buffer(rawData));
230         inputBuffer.commit(rawData.size());
231     }
232 
233     void startTimeout()
234     {
235         std::weak_ptr<Connection> weakSelf = weak_from_this();
236         timer.expires_after(std::chrono::seconds(30));
237         timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
238                                          this, weak_from_this()));
239     }
240 
241     void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
242                            const boost::system::error_code& ec)
243     {
244         std::shared_ptr<Connection> self = weakSelf.lock();
245         if (!self)
246         {
247             BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
248                                 logPtr(self.get()));
249             return;
250         }
251 
252         if (ec == boost::asio::error::operation_aborted)
253         {
254             BMCWEB_LOG_DEBUG("Timer operation aborted");
255             // Canceled wait means the path succeeded.
256             return;
257         }
258         if (ec)
259         {
260             BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
261         }
262 
263         BMCWEB_LOG_WARNING("{} Connection timed out, closing",
264                            logPtr(self.get()));
265 
266         self->close("closing connection");
267     }
268 
269   private:
270     std::array<char, 1> buffer{};
271     boost::beast::multi_buffer inputBuffer;
272 
273     Adaptor adaptor;
274 
275     using BodyType = bmcweb::FileBody;
276     boost::beast::http::response<BodyType> res;
277     std::optional<boost::beast::http::response_serializer<BodyType>> serializer;
278     boost::asio::io_context& ioc;
279     boost::asio::steady_timer timer;
280     bool doingWrite = false;
281 
282     std::function<void(Connection&)> openHandler;
283     std::function<void(Connection&)> closeHandler;
284 };
285 } // namespace sse_socket
286 } // namespace crow
287