xref: /openbmc/bmcweb/http/server_sent_event.hpp (revision 2ae81db9)
1 #pragma once
2 #include "dbus_singleton.hpp"
3 #include "http_request.hpp"
4 #include "http_response.hpp"
5 
6 #include <boost/asio/buffer.hpp>
7 #include <boost/asio/steady_timer.hpp>
8 #include <boost/beast/core/multi_buffer.hpp>
9 #include <boost/beast/http/buffer_body.hpp>
10 #include <boost/beast/websocket.hpp>
11 
12 #include <array>
13 #include <functional>
14 
15 #ifdef BMCWEB_ENABLE_SSL
16 #include <boost/beast/websocket/ssl.hpp>
17 #endif
18 
19 namespace crow
20 {
21 
22 namespace sse_socket
23 {
24 struct Connection : std::enable_shared_from_this<Connection>
25 {
26   public:
27     Connection() = default;
28 
29     Connection(const Connection&) = delete;
30     Connection(Connection&&) = delete;
31     Connection& operator=(const Connection&) = delete;
32     Connection& operator=(const Connection&&) = delete;
33     virtual ~Connection() = default;
34 
35     virtual boost::asio::io_context& getIoContext() = 0;
36     virtual void close(std::string_view msg = "quit") = 0;
37     virtual void sendEvent(std::string_view id, std::string_view msg) = 0;
38 };
39 
40 template <typename Adaptor>
41 class ConnectionImpl : public Connection
42 {
43   public:
44     ConnectionImpl(Adaptor&& adaptorIn,
45                    std::function<void(Connection&)> openHandlerIn,
46                    std::function<void(Connection&)> closeHandlerIn) :
47         adaptor(std::move(adaptorIn)),
48         timer(ioc), openHandler(std::move(openHandlerIn)),
49         closeHandler(std::move(closeHandlerIn))
50     {
51         BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
52     }
53 
54     ConnectionImpl(const ConnectionImpl&) = delete;
55     ConnectionImpl(const ConnectionImpl&&) = delete;
56     ConnectionImpl& operator=(const ConnectionImpl&) = delete;
57     ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
58 
59     ~ConnectionImpl() override
60     {
61         BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
62     }
63 
64     boost::asio::io_context& getIoContext() override
65     {
66         return static_cast<boost::asio::io_context&>(
67             adaptor.get_executor().context());
68     }
69 
70     void start()
71     {
72         if (!openHandler)
73         {
74             BMCWEB_LOG_CRITICAL("No open handler???");
75             return;
76         }
77         openHandler(*this);
78     }
79 
80     void close(const std::string_view msg) override
81     {
82         // send notification to handler for cleanup
83         if (closeHandler)
84         {
85             closeHandler(*this);
86         }
87         BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
88         boost::beast::get_lowest_layer(adaptor).close();
89     }
90 
91     void sendSSEHeader()
92     {
93         BMCWEB_LOG_DEBUG("Starting SSE connection");
94         using BodyType = boost::beast::http::buffer_body;
95         boost::beast::http::response<BodyType> res(
96             boost::beast::http::status::ok, 11, BodyType{});
97         res.set(boost::beast::http::field::content_type, "text/event-stream");
98         res.body().more = true;
99         boost::beast::http::response_serializer<BodyType>& serial =
100             serializer.emplace(std::move(res));
101 
102         boost::beast::http::async_write_header(
103             adaptor, serial,
104             std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
105                             shared_from_this()));
106     }
107 
108     void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
109                                const boost::system::error_code& ec,
110                                size_t /*bytesSent*/)
111     {
112         serializer.reset();
113         if (ec)
114         {
115             BMCWEB_LOG_ERROR("Error sending header{}", ec);
116             close("async_write_header failed");
117             return;
118         }
119         BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
120 
121         serializer.reset();
122 
123         // SSE stream header sent, So let us setup monitor.
124         // Any read data on this stream will be error in case of SSE.
125 
126         adaptor.async_wait(boost::asio::ip::tcp::socket::wait_error,
127                            std::bind_front(&ConnectionImpl::afterReadError,
128                                            this, shared_from_this()));
129     }
130 
131     void afterReadError(const std::shared_ptr<Connection>& /*self*/,
132                         const boost::system::error_code& ec)
133     {
134         if (ec == boost::asio::error::operation_aborted)
135         {
136             return;
137         }
138         if (ec)
139         {
140             BMCWEB_LOG_ERROR("Read error: {}", ec);
141         }
142 
143         close("Close SSE connection");
144     }
145 
146     void doWrite()
147     {
148         if (doingWrite)
149         {
150             return;
151         }
152         if (inputBuffer.size() == 0)
153         {
154             BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
155             return;
156         }
157         startTimeout();
158         doingWrite = true;
159 
160         adaptor.async_write_some(
161             inputBuffer.data(),
162             std::bind_front(&ConnectionImpl::doWriteCallback, this,
163                             weak_from_this()));
164     }
165 
166     void doWriteCallback(const std::weak_ptr<Connection>& weak,
167                          const boost::beast::error_code& ec,
168                          size_t bytesTransferred)
169     {
170         auto self = weak.lock();
171         if (self == nullptr)
172         {
173             return;
174         }
175         timer.cancel();
176         doingWrite = false;
177         inputBuffer.consume(bytesTransferred);
178 
179         if (ec == boost::asio::error::eof)
180         {
181             BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
182             close("SSE stream closed");
183             return;
184         }
185 
186         if (ec)
187         {
188             BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
189             close("async_write_some failed");
190             return;
191         }
192         BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
193                          bytesTransferred);
194 
195         doWrite();
196     }
197 
198     void sendEvent(std::string_view id, std::string_view msg) override
199     {
200         if (msg.empty())
201         {
202             BMCWEB_LOG_DEBUG("Empty data, bailing out.");
203             return;
204         }
205 
206         dataFormat(id, msg);
207 
208         doWrite();
209     }
210 
211     void dataFormat(std::string_view id, std::string_view msg)
212     {
213         std::string rawData;
214         if (!id.empty())
215         {
216             rawData += "id: ";
217             rawData.append(id);
218             rawData += "\n";
219         }
220 
221         rawData += "data: ";
222         for (char character : msg)
223         {
224             rawData += character;
225             if (character == '\n')
226             {
227                 rawData += "data: ";
228             }
229         }
230         rawData += "\n\n";
231 
232         boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()),
233                                  boost::asio::buffer(rawData));
234         inputBuffer.commit(rawData.size());
235     }
236 
237     void startTimeout()
238     {
239         std::weak_ptr<Connection> weakSelf = weak_from_this();
240         timer.expires_after(std::chrono::seconds(30));
241         timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
242                                          this, weak_from_this()));
243     }
244 
245     void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
246                            const boost::system::error_code& ec)
247     {
248         std::shared_ptr<Connection> self = weakSelf.lock();
249         if (!self)
250         {
251             BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
252                                 logPtr(self.get()));
253             return;
254         }
255 
256         if (ec == boost::asio::error::operation_aborted)
257         {
258             BMCWEB_LOG_DEBUG("operation aborted");
259             // Canceled wait means the path succeeded.
260             return;
261         }
262         if (ec)
263         {
264             BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
265         }
266 
267         BMCWEB_LOG_WARNING("{}Connection timed out, closing",
268                            logPtr(self.get()));
269 
270         self->close("closing connection");
271     }
272 
273   private:
274     Adaptor adaptor;
275 
276     boost::beast::multi_buffer inputBuffer;
277 
278     std::optional<boost::beast::http::response_serializer<
279         boost::beast::http::buffer_body>>
280         serializer;
281     boost::asio::io_context& ioc =
282         crow::connections::systemBus->get_io_context();
283     boost::asio::steady_timer timer;
284     bool doingWrite = false;
285 
286     std::function<void(Connection&)> openHandler;
287     std::function<void(Connection&)> closeHandler;
288 };
289 } // namespace sse_socket
290 } // namespace crow
291