xref: /openbmc/bmcweb/http/server_sent_event.hpp (revision 11e8f60df2fbe1ceafdf886132bd93d112a726bf)
1  #pragma once
2  #include "dbus_singleton.hpp"
3  #include "http_request.hpp"
4  #include "http_response.hpp"
5  
6  #include <boost/algorithm/string/predicate.hpp>
7  #include <boost/asio/buffer.hpp>
8  #include <boost/asio/steady_timer.hpp>
9  #include <boost/beast/core/multi_buffer.hpp>
10  #include <boost/beast/http/buffer_body.hpp>
11  #include <boost/beast/websocket.hpp>
12  
13  #include <array>
14  #include <functional>
15  
16  #ifdef BMCWEB_ENABLE_SSL
17  #include <boost/beast/websocket/ssl.hpp>
18  #endif
19  
20  namespace crow
21  {
22  
23  namespace sse_socket
24  {
25  struct Connection : std::enable_shared_from_this<Connection>
26  {
27    public:
28      Connection() = default;
29  
30      Connection(const Connection&) = delete;
31      Connection(Connection&&) = delete;
32      Connection& operator=(const Connection&) = delete;
33      Connection& operator=(const Connection&&) = delete;
34      virtual ~Connection() = default;
35  
36      virtual boost::asio::io_context& getIoContext() = 0;
37      virtual void close(std::string_view msg = "quit") = 0;
38      virtual void sendEvent(std::string_view id, std::string_view msg) = 0;
39  };
40  
41  template <typename Adaptor>
42  class ConnectionImpl : public Connection
43  {
44    public:
45      ConnectionImpl(Adaptor&& adaptorIn,
46                     std::function<void(Connection&)> openHandlerIn,
47                     std::function<void(Connection&)> closeHandlerIn) :
48          adaptor(std::move(adaptorIn)),
49          timer(ioc), openHandler(std::move(openHandlerIn)),
50          closeHandler(std::move(closeHandlerIn))
51      {
52          BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
53      }
54  
55      ConnectionImpl(const ConnectionImpl&) = delete;
56      ConnectionImpl(const ConnectionImpl&&) = delete;
57      ConnectionImpl& operator=(const ConnectionImpl&) = delete;
58      ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
59  
60      ~ConnectionImpl() override
61      {
62          BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
63      }
64  
65      boost::asio::io_context& getIoContext() override
66      {
67          return static_cast<boost::asio::io_context&>(
68              adaptor.get_executor().context());
69      }
70  
71      void start()
72      {
73          if (!openHandler)
74          {
75              BMCWEB_LOG_CRITICAL("No open handler???");
76              return;
77          }
78          openHandler(*this);
79      }
80  
81      void close(const std::string_view msg) override
82      {
83          // send notification to handler for cleanup
84          if (closeHandler)
85          {
86              closeHandler(*this);
87          }
88          BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
89          boost::beast::get_lowest_layer(adaptor).close();
90      }
91  
92      void sendSSEHeader()
93      {
94          BMCWEB_LOG_DEBUG("Starting SSE connection");
95          using BodyType = boost::beast::http::buffer_body;
96          boost::beast::http::response<BodyType> res(
97              boost::beast::http::status::ok, 11, BodyType{});
98          res.set(boost::beast::http::field::content_type, "text/event-stream");
99          res.body().more = true;
100          boost::beast::http::response_serializer<BodyType>& ser =
101              serializer.emplace(std::move(res));
102  
103          boost::beast::http::async_write_header(
104              adaptor, ser,
105              std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
106                              shared_from_this()));
107      }
108  
109      void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
110                                 const boost::system::error_code& ec,
111                                 size_t /*bytesSent*/)
112      {
113          serializer.reset();
114          if (ec)
115          {
116              BMCWEB_LOG_ERROR("Error sending header{}", ec);
117              close("async_write_header failed");
118              return;
119          }
120          BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
121  
122          serializer.reset();
123  
124          // SSE stream header sent, So let us setup monitor.
125          // Any read data on this stream will be error in case of SSE.
126  
127          adaptor.async_wait(boost::asio::ip::tcp::socket::wait_error,
128                             std::bind_front(&ConnectionImpl::afterReadError,
129                                             this, shared_from_this()));
130      }
131  
132      void afterReadError(const std::shared_ptr<Connection>& /*self*/,
133                          const boost::system::error_code& ec)
134      {
135          if (ec == boost::asio::error::operation_aborted)
136          {
137              return;
138          }
139          if (ec)
140          {
141              BMCWEB_LOG_ERROR("Read error: {}", ec);
142          }
143  
144          close("Close SSE connection");
145      }
146  
147      void doWrite()
148      {
149          if (doingWrite)
150          {
151              return;
152          }
153          if (inputBuffer.size() == 0)
154          {
155              BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
156              return;
157          }
158          startTimeout();
159          doingWrite = true;
160  
161          adaptor.async_write_some(
162              inputBuffer.data(),
163              std::bind_front(&ConnectionImpl::doWriteCallback, this,
164                              weak_from_this()));
165      }
166  
167      void doWriteCallback(const std::weak_ptr<Connection>& weak,
168                           const boost::beast::error_code& ec,
169                           size_t bytesTransferred)
170      {
171          auto self = weak.lock();
172          if (self == nullptr)
173          {
174              return;
175          }
176          timer.cancel();
177          doingWrite = false;
178          inputBuffer.consume(bytesTransferred);
179  
180          if (ec == boost::asio::error::eof)
181          {
182              BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
183              close("SSE stream closed");
184              return;
185          }
186  
187          if (ec)
188          {
189              BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
190              close("async_write_some failed");
191              return;
192          }
193          BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
194                           bytesTransferred);
195  
196          doWrite();
197      }
198  
199      void sendEvent(std::string_view id, std::string_view msg) override
200      {
201          if (msg.empty())
202          {
203              BMCWEB_LOG_DEBUG("Empty data, bailing out.");
204              return;
205          }
206  
207          dataFormat(id, msg);
208  
209          doWrite();
210      }
211  
212      void dataFormat(std::string_view id, std::string_view msg)
213      {
214          std::string rawData;
215          if (!id.empty())
216          {
217              rawData += "id: ";
218              rawData.append(id);
219              rawData += "\n";
220          }
221  
222          rawData += "data: ";
223          for (char character : msg)
224          {
225              rawData += character;
226              if (character == '\n')
227              {
228                  rawData += "data: ";
229              }
230          }
231          rawData += "\n\n";
232  
233          boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()),
234                                   boost::asio::buffer(rawData));
235          inputBuffer.commit(rawData.size());
236      }
237  
238      void startTimeout()
239      {
240          std::weak_ptr<Connection> weakSelf = weak_from_this();
241          timer.expires_after(std::chrono::seconds(30));
242          timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
243                                           this, weak_from_this()));
244      }
245  
246      void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
247                             const boost::system::error_code& ec)
248      {
249          std::shared_ptr<Connection> self = weakSelf.lock();
250          if (!self)
251          {
252              BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
253                                  logPtr(self.get()));
254              return;
255          }
256  
257          if (ec == boost::asio::error::operation_aborted)
258          {
259              BMCWEB_LOG_DEBUG("operation aborted");
260              // Canceled wait means the path succeeeded.
261              return;
262          }
263          if (ec)
264          {
265              BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
266          }
267  
268          BMCWEB_LOG_WARNING("{}Connection timed out, closing",
269                             logPtr(self.get()));
270  
271          self->close("closing connection");
272      }
273  
274    private:
275      Adaptor adaptor;
276  
277      boost::beast::multi_buffer inputBuffer;
278  
279      std::optional<boost::beast::http::response_serializer<
280          boost::beast::http::buffer_body>>
281          serializer;
282      boost::asio::io_context& ioc =
283          crow::connections::systemBus->get_io_context();
284      boost::asio::steady_timer timer;
285      bool doingWrite = false;
286  
287      std::function<void(Connection&)> openHandler;
288      std::function<void(Connection&)> closeHandler;
289  };
290  } // namespace sse_socket
291  } // namespace crow
292