xref: /openbmc/bmcweb/http/server_sent_event.hpp (revision a14c9113)
1 #pragma once
2 #include "boost_formatters.hpp"
3 #include "http_body.hpp"
4 #include "http_request.hpp"
5 #include "http_response.hpp"
6 
7 #include <boost/asio/buffer.hpp>
8 #include <boost/asio/steady_timer.hpp>
9 #include <boost/beast/core/multi_buffer.hpp>
10 #include <boost/beast/websocket.hpp>
11 
12 #include <array>
13 #include <cstddef>
14 #include <functional>
15 #include <optional>
16 
17 namespace crow
18 {
19 
20 namespace sse_socket
21 {
22 struct Connection : public std::enable_shared_from_this<Connection>
23 {
24   public:
25     Connection() = default;
26 
27     Connection(const Connection&) = delete;
28     Connection(Connection&&) = delete;
29     Connection& operator=(const Connection&) = delete;
30     Connection& operator=(const Connection&&) = delete;
31     virtual ~Connection() = default;
32 
33     virtual boost::asio::io_context& getIoContext() = 0;
34     virtual void close(std::string_view msg = "quit") = 0;
35     virtual void sendSseEvent(std::string_view id, std::string_view msg) = 0;
36 };
37 
38 template <typename Adaptor>
39 class ConnectionImpl : public Connection
40 {
41   public:
42     ConnectionImpl(
43         Adaptor&& adaptorIn,
44         std::function<void(Connection&, const Request&)> openHandlerIn,
45         std::function<void(Connection&)> closeHandlerIn) :
46         adaptor(std::move(adaptorIn)),
47         timer(static_cast<boost::asio::io_context&>(
48             adaptor.get_executor().context())),
49         openHandler(std::move(openHandlerIn)),
50         closeHandler(std::move(closeHandlerIn))
51 
52     {
53         BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
54     }
55 
56     ConnectionImpl(const ConnectionImpl&) = delete;
57     ConnectionImpl(const ConnectionImpl&&) = delete;
58     ConnectionImpl& operator=(const ConnectionImpl&) = delete;
59     ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
60 
61     ~ConnectionImpl() override
62     {
63         BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
64     }
65 
66     boost::asio::io_context& getIoContext() override
67     {
68         return static_cast<boost::asio::io_context&>(
69             adaptor.get_executor().context());
70     }
71 
72     void start(const Request& req)
73     {
74         if (!openHandler)
75         {
76             BMCWEB_LOG_CRITICAL("No open handler???");
77             return;
78         }
79         openHandler(*this, req);
80         sendSSEHeader();
81     }
82 
83     void close(const std::string_view msg) override
84     {
85         BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg);
86         // send notification to handler for cleanup
87         if (closeHandler)
88         {
89             closeHandler(*this);
90         }
91         BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
92         boost::beast::get_lowest_layer(adaptor).close();
93     }
94 
95     void sendSSEHeader()
96     {
97         BMCWEB_LOG_DEBUG("Starting SSE connection");
98 
99         res.set(boost::beast::http::field::content_type, "text/event-stream");
100         boost::beast::http::response_serializer<BodyType>& serial =
101             serializer.emplace(res);
102 
103         boost::beast::http::async_write_header(
104             adaptor, serial,
105             std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
106                             shared_from_this()));
107     }
108 
109     void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
110                                const boost::system::error_code& ec,
111                                size_t /*bytesSent*/)
112     {
113         serializer.reset();
114         if (ec)
115         {
116             BMCWEB_LOG_ERROR("Error sending header{}", ec);
117             close("async_write_header failed");
118             return;
119         }
120         BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
121 
122         // SSE stream header sent, So let us setup monitor.
123         // Any read data on this stream will be error in case of SSE.
124         adaptor.async_read_some(boost::asio::buffer(buffer),
125                                 std::bind_front(&ConnectionImpl::afterReadError,
126                                                 this, shared_from_this()));
127     }
128 
129     void afterReadError(const std::shared_ptr<Connection>& /*self*/,
130                         const boost::system::error_code& ec, size_t bytesRead)
131     {
132         BMCWEB_LOG_DEBUG("Read {}", bytesRead);
133         if (ec == boost::asio::error::operation_aborted)
134         {
135             return;
136         }
137         if (ec)
138         {
139             BMCWEB_LOG_ERROR("Read error: {}", ec);
140         }
141 
142         close("Close SSE connection");
143     }
144 
145     void doWrite()
146     {
147         if (doingWrite)
148         {
149             return;
150         }
151         if (inputBuffer.size() == 0)
152         {
153             BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
154             return;
155         }
156         startTimeout();
157         doingWrite = true;
158 
159         adaptor.async_write_some(
160             inputBuffer.data(),
161             std::bind_front(&ConnectionImpl::doWriteCallback, this,
162                             shared_from_this()));
163     }
164 
165     void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
166                          const boost::beast::error_code& ec,
167                          size_t bytesTransferred)
168     {
169         timer.cancel();
170         doingWrite = false;
171         inputBuffer.consume(bytesTransferred);
172 
173         if (ec == boost::asio::error::eof)
174         {
175             BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
176             close("SSE stream closed");
177             return;
178         }
179 
180         if (ec)
181         {
182             BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
183             close("async_write_some failed");
184             return;
185         }
186         BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
187                          bytesTransferred);
188 
189         doWrite();
190     }
191 
192     void sendSseEvent(std::string_view id, std::string_view msg) override
193     {
194         if (msg.empty())
195         {
196             BMCWEB_LOG_DEBUG("Empty data, bailing out.");
197             return;
198         }
199 
200         dataFormat(id, msg);
201 
202         doWrite();
203     }
204 
205     void dataFormat(std::string_view id, std::string_view msg)
206     {
207         constexpr size_t bufferLimit = 10485760U; // 10MB
208         if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit)
209         {
210             BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client");
211             close("Buffer overflow");
212             return;
213         }
214         std::string rawData;
215         if (!id.empty())
216         {
217             rawData += "id: ";
218             rawData.append(id);
219             rawData += "\n";
220         }
221 
222         rawData += "data: ";
223         for (char character : msg)
224         {
225             rawData += character;
226             if (character == '\n')
227             {
228                 rawData += "data: ";
229             }
230         }
231         rawData += "\n\n";
232 
233         size_t copied = boost::asio::buffer_copy(
234             inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData));
235         inputBuffer.commit(copied);
236     }
237 
238     void startTimeout()
239     {
240         std::weak_ptr<Connection> weakSelf = weak_from_this();
241         timer.expires_after(std::chrono::seconds(30));
242         timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
243                                          this, weak_from_this()));
244     }
245 
246     void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
247                            const boost::system::error_code& ec)
248     {
249         std::shared_ptr<Connection> self = weakSelf.lock();
250         if (!self)
251         {
252             BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
253                                 logPtr(self.get()));
254             return;
255         }
256 
257         if (ec == boost::asio::error::operation_aborted)
258         {
259             BMCWEB_LOG_DEBUG("Timer operation aborted");
260             // Canceled wait means the path succeeded.
261             return;
262         }
263         if (ec)
264         {
265             BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
266         }
267 
268         BMCWEB_LOG_WARNING("{} Connection timed out, closing",
269                            logPtr(self.get()));
270 
271         self->close("closing connection");
272     }
273 
274   private:
275     std::array<char, 1> buffer{};
276     boost::beast::multi_buffer inputBuffer;
277 
278     Adaptor adaptor;
279 
280     using BodyType = bmcweb::HttpBody;
281     boost::beast::http::response<BodyType> res;
282     std::optional<boost::beast::http::response_serializer<BodyType>> serializer;
283     boost::asio::steady_timer timer;
284     bool doingWrite = false;
285 
286     std::function<void(Connection&, const Request&)> openHandler;
287     std::function<void(Connection&)> closeHandler;
288 };
289 } // namespace sse_socket
290 } // namespace crow
291