xref: /openbmc/bmcweb/http/server_sent_event_impl.hpp (revision e60300aee76b7875f5fc407acede2f05ddbdd9bc)
1 // SPDX-License-Identifier: Apache-2.0
2 // SPDX-FileCopyrightText: Copyright OpenBMC Authors
3 #pragma once
4 #include "boost_formatters.hpp"
5 #include "http_body.hpp"
6 #include "http_request.hpp"
7 #include "io_context_singleton.hpp"
8 #include "logging.hpp"
9 #include "server_sent_event.hpp"
10 
11 #include <boost/asio/buffer.hpp>
12 #include <boost/asio/error.hpp>
13 #include <boost/asio/steady_timer.hpp>
14 #include <boost/beast/core/error.hpp>
15 #include <boost/beast/core/multi_buffer.hpp>
16 #include <boost/beast/http/field.hpp>
17 #include <boost/beast/http/serializer.hpp>
18 #include <boost/beast/http/write.hpp>
19 
20 #include <array>
21 #include <chrono>
22 #include <cstddef>
23 #include <functional>
24 #include <memory>
25 #include <optional>
26 #include <string>
27 #include <string_view>
28 #include <utility>
29 
30 namespace crow
31 {
32 
33 namespace sse_socket
34 {
35 
36 template <typename Adaptor>
37 class ConnectionImpl : public Connection
38 {
39   public:
ConnectionImpl(Adaptor && adaptorIn,std::function<void (Connection &,const Request &)> openHandlerIn,std::function<void (Connection &)> closeHandlerIn)40     ConnectionImpl(
41         Adaptor&& adaptorIn,
42         std::function<void(Connection&, const Request&)> openHandlerIn,
43         std::function<void(Connection&)> closeHandlerIn) :
44         adaptor(std::move(adaptorIn)), timer(getIoContext()),
45         openHandler(std::move(openHandlerIn)),
46         closeHandler(std::move(closeHandlerIn))
47 
48     {
49         BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
50     }
51 
52     ConnectionImpl(const ConnectionImpl&) = delete;
53     ConnectionImpl(const ConnectionImpl&&) = delete;
54     ConnectionImpl& operator=(const ConnectionImpl&) = delete;
55     ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
56 
~ConnectionImpl()57     ~ConnectionImpl() override
58     {
59         BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
60     }
61 
start(const Request & req)62     void start(const Request& req)
63     {
64         BMCWEB_LOG_DEBUG("Starting SSE connection");
65 
66         res.set(boost::beast::http::field::content_type, "text/event-stream");
67         boost::beast::http::response_serializer<BodyType>& serial =
68             serializer.emplace(res);
69 
70         boost::beast::http::async_write_header(
71             adaptor, serial,
72             std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
73                             shared_from_this(), req));
74     }
75 
close(const std::string_view msg)76     void close(const std::string_view msg) override
77     {
78         BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg);
79         // send notification to handler for cleanup
80         if (closeHandler)
81         {
82             closeHandler(*this);
83         }
84         BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
85         boost::beast::get_lowest_layer(adaptor).close();
86     }
87 
sendSSEHeaderCallback(const std::shared_ptr<Connection> &,const Request & req,const boost::system::error_code & ec,size_t)88     void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
89                                const Request& req,
90                                const boost::system::error_code& ec,
91                                size_t /*bytesSent*/)
92     {
93         serializer.reset();
94         if (ec)
95         {
96             BMCWEB_LOG_ERROR("Error sending header{}", ec);
97             close("async_write_header failed");
98             return;
99         }
100         BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
101         if (!openHandler)
102         {
103             BMCWEB_LOG_CRITICAL("No open handler???");
104             return;
105         }
106         openHandler(*this, req);
107 
108         // SSE stream header sent, So let us setup monitor.
109         // Any read data on this stream will be error in case of SSE.
110         adaptor.async_read_some(boost::asio::buffer(buffer),
111                                 std::bind_front(&ConnectionImpl::afterReadError,
112                                                 this, shared_from_this()));
113     }
114 
afterReadError(const std::shared_ptr<Connection> &,const boost::system::error_code & ec,size_t bytesRead)115     void afterReadError(const std::shared_ptr<Connection>& /*self*/,
116                         const boost::system::error_code& ec, size_t bytesRead)
117     {
118         BMCWEB_LOG_DEBUG("Read {}", bytesRead);
119         if (ec == boost::asio::error::operation_aborted)
120         {
121             return;
122         }
123         if (ec)
124         {
125             BMCWEB_LOG_ERROR("Read error: {}", ec);
126         }
127 
128         close("Close SSE connection");
129     }
130 
doWrite()131     void doWrite()
132     {
133         if (doingWrite)
134         {
135             return;
136         }
137         if (inputBuffer.size() == 0)
138         {
139             BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
140             return;
141         }
142         startTimeout();
143         doingWrite = true;
144 
145         adaptor.async_write_some(
146             inputBuffer.data(),
147             std::bind_front(&ConnectionImpl::doWriteCallback, this,
148                             shared_from_this()));
149     }
150 
doWriteCallback(const std::shared_ptr<Connection> &,const boost::beast::error_code & ec,size_t bytesTransferred)151     void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
152                          const boost::beast::error_code& ec,
153                          size_t bytesTransferred)
154     {
155         timer.cancel();
156         doingWrite = false;
157         inputBuffer.consume(bytesTransferred);
158 
159         if (ec == boost::asio::error::eof)
160         {
161             BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
162             close("SSE stream closed");
163             return;
164         }
165 
166         if (ec)
167         {
168             BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
169             close("async_write_some failed");
170             return;
171         }
172         BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
173                          bytesTransferred);
174 
175         doWrite();
176     }
177 
sendSseEvent(std::string_view id,std::string_view msg)178     void sendSseEvent(std::string_view id, std::string_view msg) override
179     {
180         if (msg.empty())
181         {
182             BMCWEB_LOG_DEBUG("Empty data, bailing out.");
183             return;
184         }
185 
186         dataFormat(id, msg);
187 
188         doWrite();
189     }
190 
dataFormat(std::string_view id,std::string_view msg)191     void dataFormat(std::string_view id, std::string_view msg)
192     {
193         constexpr size_t bufferLimit = 10485760U; // 10MB
194         if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit)
195         {
196             BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client");
197             close("Buffer overflow");
198             return;
199         }
200         std::string rawData;
201         if (!id.empty())
202         {
203             rawData += "id: ";
204             rawData.append(id);
205             rawData += "\n";
206         }
207 
208         rawData += "data: ";
209         for (char character : msg)
210         {
211             rawData += character;
212             if (character == '\n')
213             {
214                 rawData += "data: ";
215             }
216         }
217         rawData += "\n\n";
218 
219         size_t copied = boost::asio::buffer_copy(
220             inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData));
221         inputBuffer.commit(copied);
222     }
223 
startTimeout()224     void startTimeout()
225     {
226         std::weak_ptr<Connection> weakSelf = weak_from_this();
227         timer.expires_after(std::chrono::seconds(30));
228         timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
229                                          this, weak_from_this()));
230     }
231 
onTimeoutCallback(const std::weak_ptr<Connection> & weakSelf,const boost::system::error_code & ec)232     void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
233                            const boost::system::error_code& ec)
234     {
235         std::shared_ptr<Connection> self = weakSelf.lock();
236         if (!self)
237         {
238             BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
239                                 logPtr(self.get()));
240             return;
241         }
242 
243         if (ec == boost::asio::error::operation_aborted)
244         {
245             BMCWEB_LOG_DEBUG("Timer operation aborted");
246             // Canceled wait means the path succeeded.
247             return;
248         }
249         if (ec)
250         {
251             BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
252         }
253 
254         BMCWEB_LOG_WARNING("{} Connection timed out, closing",
255                            logPtr(self.get()));
256 
257         self->close("closing connection");
258     }
259 
260   private:
261     std::array<char, 1> buffer{};
262     boost::beast::multi_buffer inputBuffer;
263 
264     Adaptor adaptor;
265 
266     using BodyType = bmcweb::HttpBody;
267     boost::beast::http::response<BodyType> res;
268     std::optional<boost::beast::http::response_serializer<BodyType>> serializer;
269     boost::asio::steady_timer timer;
270     bool doingWrite = false;
271 
272     std::function<void(Connection&, const Request&)> openHandler;
273     std::function<void(Connection&)> closeHandler;
274 };
275 } // namespace sse_socket
276 } // namespace crow
277