xref: /openbmc/bmcweb/http/server_sent_event.hpp (revision 40e9b92ec19acffb46f83a6e55b18974da5d708e)
1 // SPDX-License-Identifier: Apache-2.0
2 // SPDX-FileCopyrightText: Copyright OpenBMC Authors
3 #pragma once
4 #include "boost_formatters.hpp"
5 #include "http_body.hpp"
6 #include "http_request.hpp"
7 #include "http_response.hpp"
8 
9 #include <boost/asio/buffer.hpp>
10 #include <boost/asio/steady_timer.hpp>
11 #include <boost/beast/core/multi_buffer.hpp>
12 #include <boost/beast/websocket.hpp>
13 
14 #include <array>
15 #include <cstddef>
16 #include <functional>
17 #include <optional>
18 
19 namespace crow
20 {
21 
22 namespace sse_socket
23 {
24 struct Connection : public std::enable_shared_from_this<Connection>
25 {
26   public:
27     Connection() = default;
28 
29     Connection(const Connection&) = delete;
30     Connection(Connection&&) = delete;
31     Connection& operator=(const Connection&) = delete;
32     Connection& operator=(const Connection&&) = delete;
33     virtual ~Connection() = default;
34 
35     virtual boost::asio::io_context& getIoContext() = 0;
36     virtual void close(std::string_view msg = "quit") = 0;
37     virtual void sendSseEvent(std::string_view id, std::string_view msg) = 0;
38 };
39 
40 template <typename Adaptor>
41 class ConnectionImpl : public Connection
42 {
43   public:
ConnectionImpl(Adaptor && adaptorIn,std::function<void (Connection &,const Request &)> openHandlerIn,std::function<void (Connection &)> closeHandlerIn)44     ConnectionImpl(
45         Adaptor&& adaptorIn,
46         std::function<void(Connection&, const Request&)> openHandlerIn,
47         std::function<void(Connection&)> closeHandlerIn) :
48         adaptor(std::move(adaptorIn)),
49         timer(static_cast<boost::asio::io_context&>(
50             adaptor.get_executor().context())),
51         openHandler(std::move(openHandlerIn)),
52         closeHandler(std::move(closeHandlerIn))
53 
54     {
55         BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
56     }
57 
58     ConnectionImpl(const ConnectionImpl&) = delete;
59     ConnectionImpl(const ConnectionImpl&&) = delete;
60     ConnectionImpl& operator=(const ConnectionImpl&) = delete;
61     ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
62 
~ConnectionImpl()63     ~ConnectionImpl() override
64     {
65         BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
66     }
67 
getIoContext()68     boost::asio::io_context& getIoContext() override
69     {
70         return static_cast<boost::asio::io_context&>(
71             adaptor.get_executor().context());
72     }
73 
start(const Request & req)74     void start(const Request& req)
75     {
76         BMCWEB_LOG_DEBUG("Starting SSE connection");
77 
78         res.set(boost::beast::http::field::content_type, "text/event-stream");
79         boost::beast::http::response_serializer<BodyType>& serial =
80             serializer.emplace(res);
81 
82         boost::beast::http::async_write_header(
83             adaptor, serial,
84             std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
85                             shared_from_this(), req));
86     }
87 
close(const std::string_view msg)88     void close(const std::string_view msg) override
89     {
90         BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg);
91         // send notification to handler for cleanup
92         if (closeHandler)
93         {
94             closeHandler(*this);
95         }
96         BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
97         boost::beast::get_lowest_layer(adaptor).close();
98     }
99 
sendSSEHeaderCallback(const std::shared_ptr<Connection> &,const Request & req,const boost::system::error_code & ec,size_t)100     void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
101                                const Request& req,
102                                const boost::system::error_code& ec,
103                                size_t /*bytesSent*/)
104     {
105         serializer.reset();
106         if (ec)
107         {
108             BMCWEB_LOG_ERROR("Error sending header{}", ec);
109             close("async_write_header failed");
110             return;
111         }
112         BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
113         if (!openHandler)
114         {
115             BMCWEB_LOG_CRITICAL("No open handler???");
116             return;
117         }
118         openHandler(*this, req);
119 
120         // SSE stream header sent, So let us setup monitor.
121         // Any read data on this stream will be error in case of SSE.
122         adaptor.async_read_some(boost::asio::buffer(buffer),
123                                 std::bind_front(&ConnectionImpl::afterReadError,
124                                                 this, shared_from_this()));
125     }
126 
afterReadError(const std::shared_ptr<Connection> &,const boost::system::error_code & ec,size_t bytesRead)127     void afterReadError(const std::shared_ptr<Connection>& /*self*/,
128                         const boost::system::error_code& ec, size_t bytesRead)
129     {
130         BMCWEB_LOG_DEBUG("Read {}", bytesRead);
131         if (ec == boost::asio::error::operation_aborted)
132         {
133             return;
134         }
135         if (ec)
136         {
137             BMCWEB_LOG_ERROR("Read error: {}", ec);
138         }
139 
140         close("Close SSE connection");
141     }
142 
doWrite()143     void doWrite()
144     {
145         if (doingWrite)
146         {
147             return;
148         }
149         if (inputBuffer.size() == 0)
150         {
151             BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
152             return;
153         }
154         startTimeout();
155         doingWrite = true;
156 
157         adaptor.async_write_some(
158             inputBuffer.data(),
159             std::bind_front(&ConnectionImpl::doWriteCallback, this,
160                             shared_from_this()));
161     }
162 
doWriteCallback(const std::shared_ptr<Connection> &,const boost::beast::error_code & ec,size_t bytesTransferred)163     void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
164                          const boost::beast::error_code& ec,
165                          size_t bytesTransferred)
166     {
167         timer.cancel();
168         doingWrite = false;
169         inputBuffer.consume(bytesTransferred);
170 
171         if (ec == boost::asio::error::eof)
172         {
173             BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
174             close("SSE stream closed");
175             return;
176         }
177 
178         if (ec)
179         {
180             BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
181             close("async_write_some failed");
182             return;
183         }
184         BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
185                          bytesTransferred);
186 
187         doWrite();
188     }
189 
sendSseEvent(std::string_view id,std::string_view msg)190     void sendSseEvent(std::string_view id, std::string_view msg) override
191     {
192         if (msg.empty())
193         {
194             BMCWEB_LOG_DEBUG("Empty data, bailing out.");
195             return;
196         }
197 
198         dataFormat(id, msg);
199 
200         doWrite();
201     }
202 
dataFormat(std::string_view id,std::string_view msg)203     void dataFormat(std::string_view id, std::string_view msg)
204     {
205         constexpr size_t bufferLimit = 10485760U; // 10MB
206         if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit)
207         {
208             BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client");
209             close("Buffer overflow");
210             return;
211         }
212         std::string rawData;
213         if (!id.empty())
214         {
215             rawData += "id: ";
216             rawData.append(id);
217             rawData += "\n";
218         }
219 
220         rawData += "data: ";
221         for (char character : msg)
222         {
223             rawData += character;
224             if (character == '\n')
225             {
226                 rawData += "data: ";
227             }
228         }
229         rawData += "\n\n";
230 
231         size_t copied = boost::asio::buffer_copy(
232             inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData));
233         inputBuffer.commit(copied);
234     }
235 
startTimeout()236     void startTimeout()
237     {
238         std::weak_ptr<Connection> weakSelf = weak_from_this();
239         timer.expires_after(std::chrono::seconds(30));
240         timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
241                                          this, weak_from_this()));
242     }
243 
onTimeoutCallback(const std::weak_ptr<Connection> & weakSelf,const boost::system::error_code & ec)244     void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
245                            const boost::system::error_code& ec)
246     {
247         std::shared_ptr<Connection> self = weakSelf.lock();
248         if (!self)
249         {
250             BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
251                                 logPtr(self.get()));
252             return;
253         }
254 
255         if (ec == boost::asio::error::operation_aborted)
256         {
257             BMCWEB_LOG_DEBUG("Timer operation aborted");
258             // Canceled wait means the path succeeded.
259             return;
260         }
261         if (ec)
262         {
263             BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
264         }
265 
266         BMCWEB_LOG_WARNING("{} Connection timed out, closing",
267                            logPtr(self.get()));
268 
269         self->close("closing connection");
270     }
271 
272   private:
273     std::array<char, 1> buffer{};
274     boost::beast::multi_buffer inputBuffer;
275 
276     Adaptor adaptor;
277 
278     using BodyType = bmcweb::HttpBody;
279     boost::beast::http::response<BodyType> res;
280     std::optional<boost::beast::http::response_serializer<BodyType>> serializer;
281     boost::asio::steady_timer timer;
282     bool doingWrite = false;
283 
284     std::function<void(Connection&, const Request&)> openHandler;
285     std::function<void(Connection&)> closeHandler;
286 };
287 } // namespace sse_socket
288 } // namespace crow
289