xref: /openbmc/bmcweb/http/server_sent_event.hpp (revision 8db83747)
1 #pragma once
2 #include "http_body.hpp"
3 #include "http_request.hpp"
4 #include "http_response.hpp"
5 
6 #include <boost/asio/buffer.hpp>
7 #include <boost/asio/steady_timer.hpp>
8 #include <boost/beast/core/multi_buffer.hpp>
9 #include <boost/beast/websocket.hpp>
10 
11 #include <array>
12 #include <cstddef>
13 #include <functional>
14 #include <optional>
15 
16 namespace crow
17 {
18 
19 namespace sse_socket
20 {
21 struct Connection : public std::enable_shared_from_this<Connection>
22 {
23   public:
24     Connection() = default;
25 
26     Connection(const Connection&) = delete;
27     Connection(Connection&&) = delete;
28     Connection& operator=(const Connection&) = delete;
29     Connection& operator=(const Connection&&) = delete;
30     virtual ~Connection() = default;
31 
32     virtual boost::asio::io_context& getIoContext() = 0;
33     virtual void close(std::string_view msg = "quit") = 0;
34     virtual void sendEvent(std::string_view id, std::string_view msg) = 0;
35 };
36 
37 template <typename Adaptor>
38 class ConnectionImpl : public Connection
39 {
40   public:
41     ConnectionImpl(Adaptor&& adaptorIn,
42                    std::function<void(Connection&)> openHandlerIn,
43                    std::function<void(Connection&)> closeHandlerIn) :
44         adaptor(std::move(adaptorIn)),
45         timer(static_cast<boost::asio::io_context&>(
46             adaptor.get_executor().context())),
47         openHandler(std::move(openHandlerIn)),
48         closeHandler(std::move(closeHandlerIn))
49 
50     {
51         BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
52     }
53 
54     ConnectionImpl(const ConnectionImpl&) = delete;
55     ConnectionImpl(const ConnectionImpl&&) = delete;
56     ConnectionImpl& operator=(const ConnectionImpl&) = delete;
57     ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
58 
59     ~ConnectionImpl() override
60     {
61         BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
62     }
63 
64     boost::asio::io_context& getIoContext() override
65     {
66         return static_cast<boost::asio::io_context&>(
67             adaptor.get_executor().context());
68     }
69 
70     void start()
71     {
72         if (!openHandler)
73         {
74             BMCWEB_LOG_CRITICAL("No open handler???");
75             return;
76         }
77         openHandler(*this);
78         sendSSEHeader();
79     }
80 
81     void close(const std::string_view msg) override
82     {
83         BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg);
84         // send notification to handler for cleanup
85         if (closeHandler)
86         {
87             closeHandler(*this);
88         }
89         BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
90         boost::beast::get_lowest_layer(adaptor).close();
91     }
92 
93     void sendSSEHeader()
94     {
95         BMCWEB_LOG_DEBUG("Starting SSE connection");
96 
97         res.set(boost::beast::http::field::content_type, "text/event-stream");
98         boost::beast::http::response_serializer<BodyType>& serial =
99             serializer.emplace(res);
100 
101         boost::beast::http::async_write_header(
102             adaptor, serial,
103             std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
104                             shared_from_this()));
105     }
106 
107     void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
108                                const boost::system::error_code& ec,
109                                size_t /*bytesSent*/)
110     {
111         serializer.reset();
112         if (ec)
113         {
114             BMCWEB_LOG_ERROR("Error sending header{}", ec);
115             close("async_write_header failed");
116             return;
117         }
118         BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
119 
120         // SSE stream header sent, So let us setup monitor.
121         // Any read data on this stream will be error in case of SSE.
122         adaptor.async_read_some(boost::asio::buffer(buffer),
123                                 std::bind_front(&ConnectionImpl::afterReadError,
124                                                 this, shared_from_this()));
125     }
126 
127     void afterReadError(const std::shared_ptr<Connection>& /*self*/,
128                         const boost::system::error_code& ec, size_t bytesRead)
129     {
130         BMCWEB_LOG_DEBUG("Read {}", bytesRead);
131         if (ec == boost::asio::error::operation_aborted)
132         {
133             return;
134         }
135         if (ec)
136         {
137             BMCWEB_LOG_ERROR("Read error: {}", ec);
138         }
139 
140         close("Close SSE connection");
141     }
142 
143     void doWrite()
144     {
145         if (doingWrite)
146         {
147             return;
148         }
149         if (inputBuffer.size() == 0)
150         {
151             BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
152             return;
153         }
154         startTimeout();
155         doingWrite = true;
156 
157         adaptor.async_write_some(
158             inputBuffer.data(),
159             std::bind_front(&ConnectionImpl::doWriteCallback, this,
160                             shared_from_this()));
161     }
162 
163     void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
164                          const boost::beast::error_code& ec,
165                          size_t bytesTransferred)
166     {
167         timer.cancel();
168         doingWrite = false;
169         inputBuffer.consume(bytesTransferred);
170 
171         if (ec == boost::asio::error::eof)
172         {
173             BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
174             close("SSE stream closed");
175             return;
176         }
177 
178         if (ec)
179         {
180             BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
181             close("async_write_some failed");
182             return;
183         }
184         BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
185                          bytesTransferred);
186 
187         doWrite();
188     }
189 
190     void sendEvent(std::string_view id, std::string_view msg) override
191     {
192         if (msg.empty())
193         {
194             BMCWEB_LOG_DEBUG("Empty data, bailing out.");
195             return;
196         }
197 
198         dataFormat(id, msg);
199 
200         doWrite();
201     }
202 
203     void dataFormat(std::string_view id, std::string_view msg)
204     {
205         constexpr size_t bufferLimit = 10485760U; // 10MB
206         if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit)
207         {
208             BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client");
209             close("Buffer overflow");
210             return;
211         }
212         std::string rawData;
213         if (!id.empty())
214         {
215             rawData += "id: ";
216             rawData.append(id);
217             rawData += "\n";
218         }
219 
220         rawData += "data: ";
221         for (char character : msg)
222         {
223             rawData += character;
224             if (character == '\n')
225             {
226                 rawData += "data: ";
227             }
228         }
229         rawData += "\n\n";
230 
231         size_t copied = boost::asio::buffer_copy(
232             inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData));
233         inputBuffer.commit(copied);
234     }
235 
236     void startTimeout()
237     {
238         std::weak_ptr<Connection> weakSelf = weak_from_this();
239         timer.expires_after(std::chrono::seconds(30));
240         timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
241                                          this, weak_from_this()));
242     }
243 
244     void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
245                            const boost::system::error_code& ec)
246     {
247         std::shared_ptr<Connection> self = weakSelf.lock();
248         if (!self)
249         {
250             BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
251                                 logPtr(self.get()));
252             return;
253         }
254 
255         if (ec == boost::asio::error::operation_aborted)
256         {
257             BMCWEB_LOG_DEBUG("Timer operation aborted");
258             // Canceled wait means the path succeeded.
259             return;
260         }
261         if (ec)
262         {
263             BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
264         }
265 
266         BMCWEB_LOG_WARNING("{} Connection timed out, closing",
267                            logPtr(self.get()));
268 
269         self->close("closing connection");
270     }
271 
272   private:
273     std::array<char, 1> buffer{};
274     boost::beast::multi_buffer inputBuffer;
275 
276     Adaptor adaptor;
277 
278     using BodyType = bmcweb::HttpBody;
279     boost::beast::http::response<BodyType> res;
280     std::optional<boost::beast::http::response_serializer<BodyType>> serializer;
281     boost::asio::steady_timer timer;
282     bool doingWrite = false;
283 
284     std::function<void(Connection&)> openHandler;
285     std::function<void(Connection&)> closeHandler;
286 };
287 } // namespace sse_socket
288 } // namespace crow
289