xref: /openbmc/bmcweb/http/server_sent_event.hpp (revision 1b8b02a4)
1 #pragma once
2 #include "dbus_singleton.hpp"
3 #include "http_request.hpp"
4 #include "http_response.hpp"
5 
6 #include <boost/algorithm/string/predicate.hpp>
7 #include <boost/asio/buffer.hpp>
8 #include <boost/asio/steady_timer.hpp>
9 #include <boost/beast/core/multi_buffer.hpp>
10 #include <boost/beast/http/buffer_body.hpp>
11 #include <boost/beast/websocket.hpp>
12 
13 #include <array>
14 #include <functional>
15 
16 #ifdef BMCWEB_ENABLE_SSL
17 #include <boost/beast/websocket/ssl.hpp>
18 #endif
19 
20 namespace crow
21 {
22 
23 namespace sse_socket
24 {
25 struct Connection : std::enable_shared_from_this<Connection>
26 {
27   public:
28     Connection() = default;
29 
30     Connection(const Connection&) = delete;
31     Connection(Connection&&) = delete;
32     Connection& operator=(const Connection&) = delete;
33     Connection& operator=(const Connection&&) = delete;
34     virtual ~Connection() = default;
35 
36     virtual boost::asio::io_context& getIoContext() = 0;
37     virtual void close(std::string_view msg = "quit") = 0;
38     virtual void sendEvent(std::string_view id, std::string_view msg) = 0;
39 };
40 
41 template <typename Adaptor>
42 class ConnectionImpl : public Connection
43 {
44   public:
45     ConnectionImpl(Adaptor&& adaptorIn,
46                    std::function<void(Connection&)> openHandlerIn,
47                    std::function<void(Connection&)> closeHandlerIn) :
48         adaptor(std::move(adaptorIn)),
49         timer(ioc), openHandler(std::move(openHandlerIn)),
50         closeHandler(std::move(closeHandlerIn))
51     {
52         BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
53     }
54 
55     ConnectionImpl(const ConnectionImpl&) = delete;
56     ConnectionImpl(const ConnectionImpl&&) = delete;
57     ConnectionImpl& operator=(const ConnectionImpl&) = delete;
58     ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
59 
60     ~ConnectionImpl() override
61     {
62         BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
63     }
64 
65     boost::asio::io_context& getIoContext() override
66     {
67         return static_cast<boost::asio::io_context&>(
68             adaptor.get_executor().context());
69     }
70 
71     void start()
72     {
73         if (!openHandler)
74         {
75             BMCWEB_LOG_CRITICAL("No open handler???");
76             return;
77         }
78         openHandler(*this);
79     }
80 
81     void close(const std::string_view msg) override
82     {
83         // send notification to handler for cleanup
84         if (closeHandler)
85         {
86             closeHandler(*this);
87         }
88         BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
89         boost::beast::get_lowest_layer(adaptor).close();
90     }
91 
92     void sendSSEHeader()
93     {
94         BMCWEB_LOG_DEBUG("Starting SSE connection");
95         using BodyType = boost::beast::http::buffer_body;
96         boost::beast::http::response<BodyType> res(
97             boost::beast::http::status::ok, 11, BodyType{});
98         res.set(boost::beast::http::field::content_type, "text/event-stream");
99         res.body().more = true;
100         boost::beast::http::response_serializer<BodyType>& ser =
101             serializer.emplace(std::move(res));
102 
103         boost::beast::http::async_write_header(
104             adaptor, ser,
105             std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
106                             shared_from_this()));
107     }
108 
109     void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
110                                const boost::system::error_code& ec,
111                                size_t /*bytesSent*/)
112     {
113         serializer.reset();
114         if (ec)
115         {
116             BMCWEB_LOG_ERROR("Error sending header{}", ec);
117             close("async_write_header failed");
118             return;
119         }
120         BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
121 
122         serializer.reset();
123 
124         // SSE stream header sent, So let us setup monitor.
125         // Any read data on this stream will be error in case of SSE.
126 
127         adaptor.async_wait(boost::asio::ip::tcp::socket::wait_error,
128                            std::bind_front(&ConnectionImpl::afterReadError,
129                                            this, shared_from_this()));
130     }
131 
132     void afterReadError(const std::shared_ptr<Connection>& /*self*/,
133                         const boost::system::error_code& ec)
134     {
135         if (ec == boost::asio::error::operation_aborted)
136         {
137             return;
138         }
139         if (ec)
140         {
141             BMCWEB_LOG_ERROR("Read error: {}", ec);
142         }
143 
144         close("Close SSE connection");
145     }
146 
147     void doWrite()
148     {
149         if (doingWrite)
150         {
151             return;
152         }
153         if (inputBuffer.size() == 0)
154         {
155             BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
156             return;
157         }
158         startTimeout();
159         doingWrite = true;
160 
161         adaptor.async_write_some(
162             inputBuffer.data(),
163             std::bind_front(&ConnectionImpl::doWriteCallback, this,
164                             weak_from_this()));
165     }
166 
167     void doWriteCallback(const std::weak_ptr<Connection>& weak,
168                          const boost::beast::error_code& ec,
169                          size_t bytesTransferred)
170     {
171         auto self = weak.lock();
172         if (self == nullptr)
173         {
174             return;
175         }
176         timer.cancel();
177         doingWrite = false;
178         inputBuffer.consume(bytesTransferred);
179 
180         if (ec == boost::asio::error::eof)
181         {
182             BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
183             close("SSE stream closed");
184             return;
185         }
186 
187         if (ec)
188         {
189             BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
190             close("async_write_some failed");
191             return;
192         }
193         BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
194                          bytesTransferred);
195 
196         doWrite();
197     }
198 
199     void sendEvent(std::string_view id, std::string_view msg) override
200     {
201         if (msg.empty())
202         {
203             BMCWEB_LOG_DEBUG("Empty data, bailing out.");
204             return;
205         }
206 
207         dataFormat(id, msg);
208 
209         doWrite();
210     }
211 
212     void dataFormat(std::string_view id, std::string_view msg)
213     {
214         std::string rawData;
215         if (!id.empty())
216         {
217             rawData += "id: ";
218             rawData.append(id);
219             rawData += "\n";
220         }
221 
222         rawData += "data: ";
223         for (char character : msg)
224         {
225             rawData += character;
226             if (character == '\n')
227             {
228                 rawData += "data: ";
229             }
230         }
231         rawData += "\n\n";
232 
233         boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()),
234                                  boost::asio::buffer(rawData));
235         inputBuffer.commit(rawData.size());
236     }
237 
238     void startTimeout()
239     {
240         std::weak_ptr<Connection> weakSelf = weak_from_this();
241         timer.expires_after(std::chrono::seconds(30));
242         timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
243                                          this, weak_from_this()));
244     }
245 
246     void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
247                            const boost::system::error_code& ec)
248     {
249         std::shared_ptr<Connection> self = weakSelf.lock();
250         if (!self)
251         {
252             BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
253                                 logPtr(self.get()));
254             return;
255         }
256 
257         if (ec == boost::asio::error::operation_aborted)
258         {
259             BMCWEB_LOG_DEBUG("operation aborted");
260             // Canceled wait means the path succeeeded.
261             return;
262         }
263         if (ec)
264         {
265             BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
266         }
267 
268         BMCWEB_LOG_WARNING("{}Connection timed out, closing",
269                            logPtr(self.get()));
270 
271         self->close("closing connection");
272     }
273 
274   private:
275     Adaptor adaptor;
276 
277     boost::beast::multi_buffer inputBuffer;
278 
279     std::optional<boost::beast::http::response_serializer<
280         boost::beast::http::buffer_body>>
281         serializer;
282     boost::asio::io_context& ioc =
283         crow::connections::systemBus->get_io_context();
284     boost::asio::steady_timer timer;
285     bool doingWrite = false;
286 
287     std::function<void(Connection&)> openHandler;
288     std::function<void(Connection&)> closeHandler;
289 };
290 } // namespace sse_socket
291 } // namespace crow
292