xref: /openbmc/bmcweb/http/server_sent_event_impl.hpp (revision e60300aee76b7875f5fc407acede2f05ddbdd9bc)
1*e60300aeSEd Tanous // SPDX-License-Identifier: Apache-2.0
2*e60300aeSEd Tanous // SPDX-FileCopyrightText: Copyright OpenBMC Authors
3*e60300aeSEd Tanous #pragma once
4*e60300aeSEd Tanous #include "boost_formatters.hpp"
5*e60300aeSEd Tanous #include "http_body.hpp"
6*e60300aeSEd Tanous #include "http_request.hpp"
7*e60300aeSEd Tanous #include "io_context_singleton.hpp"
8*e60300aeSEd Tanous #include "logging.hpp"
9*e60300aeSEd Tanous #include "server_sent_event.hpp"
10*e60300aeSEd Tanous 
11*e60300aeSEd Tanous #include <boost/asio/buffer.hpp>
12*e60300aeSEd Tanous #include <boost/asio/error.hpp>
13*e60300aeSEd Tanous #include <boost/asio/steady_timer.hpp>
14*e60300aeSEd Tanous #include <boost/beast/core/error.hpp>
15*e60300aeSEd Tanous #include <boost/beast/core/multi_buffer.hpp>
16*e60300aeSEd Tanous #include <boost/beast/http/field.hpp>
17*e60300aeSEd Tanous #include <boost/beast/http/serializer.hpp>
18*e60300aeSEd Tanous #include <boost/beast/http/write.hpp>
19*e60300aeSEd Tanous 
20*e60300aeSEd Tanous #include <array>
21*e60300aeSEd Tanous #include <chrono>
22*e60300aeSEd Tanous #include <cstddef>
23*e60300aeSEd Tanous #include <functional>
24*e60300aeSEd Tanous #include <memory>
25*e60300aeSEd Tanous #include <optional>
26*e60300aeSEd Tanous #include <string>
27*e60300aeSEd Tanous #include <string_view>
28*e60300aeSEd Tanous #include <utility>
29*e60300aeSEd Tanous 
30*e60300aeSEd Tanous namespace crow
31*e60300aeSEd Tanous {
32*e60300aeSEd Tanous 
33*e60300aeSEd Tanous namespace sse_socket
34*e60300aeSEd Tanous {
35*e60300aeSEd Tanous 
36*e60300aeSEd Tanous template <typename Adaptor>
37*e60300aeSEd Tanous class ConnectionImpl : public Connection
38*e60300aeSEd Tanous {
39*e60300aeSEd Tanous   public:
ConnectionImpl(Adaptor && adaptorIn,std::function<void (Connection &,const Request &)> openHandlerIn,std::function<void (Connection &)> closeHandlerIn)40*e60300aeSEd Tanous     ConnectionImpl(
41*e60300aeSEd Tanous         Adaptor&& adaptorIn,
42*e60300aeSEd Tanous         std::function<void(Connection&, const Request&)> openHandlerIn,
43*e60300aeSEd Tanous         std::function<void(Connection&)> closeHandlerIn) :
44*e60300aeSEd Tanous         adaptor(std::move(adaptorIn)), timer(getIoContext()),
45*e60300aeSEd Tanous         openHandler(std::move(openHandlerIn)),
46*e60300aeSEd Tanous         closeHandler(std::move(closeHandlerIn))
47*e60300aeSEd Tanous 
48*e60300aeSEd Tanous     {
49*e60300aeSEd Tanous         BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
50*e60300aeSEd Tanous     }
51*e60300aeSEd Tanous 
52*e60300aeSEd Tanous     ConnectionImpl(const ConnectionImpl&) = delete;
53*e60300aeSEd Tanous     ConnectionImpl(const ConnectionImpl&&) = delete;
54*e60300aeSEd Tanous     ConnectionImpl& operator=(const ConnectionImpl&) = delete;
55*e60300aeSEd Tanous     ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
56*e60300aeSEd Tanous 
~ConnectionImpl()57*e60300aeSEd Tanous     ~ConnectionImpl() override
58*e60300aeSEd Tanous     {
59*e60300aeSEd Tanous         BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
60*e60300aeSEd Tanous     }
61*e60300aeSEd Tanous 
start(const Request & req)62*e60300aeSEd Tanous     void start(const Request& req)
63*e60300aeSEd Tanous     {
64*e60300aeSEd Tanous         BMCWEB_LOG_DEBUG("Starting SSE connection");
65*e60300aeSEd Tanous 
66*e60300aeSEd Tanous         res.set(boost::beast::http::field::content_type, "text/event-stream");
67*e60300aeSEd Tanous         boost::beast::http::response_serializer<BodyType>& serial =
68*e60300aeSEd Tanous             serializer.emplace(res);
69*e60300aeSEd Tanous 
70*e60300aeSEd Tanous         boost::beast::http::async_write_header(
71*e60300aeSEd Tanous             adaptor, serial,
72*e60300aeSEd Tanous             std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
73*e60300aeSEd Tanous                             shared_from_this(), req));
74*e60300aeSEd Tanous     }
75*e60300aeSEd Tanous 
close(const std::string_view msg)76*e60300aeSEd Tanous     void close(const std::string_view msg) override
77*e60300aeSEd Tanous     {
78*e60300aeSEd Tanous         BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg);
79*e60300aeSEd Tanous         // send notification to handler for cleanup
80*e60300aeSEd Tanous         if (closeHandler)
81*e60300aeSEd Tanous         {
82*e60300aeSEd Tanous             closeHandler(*this);
83*e60300aeSEd Tanous         }
84*e60300aeSEd Tanous         BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
85*e60300aeSEd Tanous         boost::beast::get_lowest_layer(adaptor).close();
86*e60300aeSEd Tanous     }
87*e60300aeSEd Tanous 
sendSSEHeaderCallback(const std::shared_ptr<Connection> &,const Request & req,const boost::system::error_code & ec,size_t)88*e60300aeSEd Tanous     void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
89*e60300aeSEd Tanous                                const Request& req,
90*e60300aeSEd Tanous                                const boost::system::error_code& ec,
91*e60300aeSEd Tanous                                size_t /*bytesSent*/)
92*e60300aeSEd Tanous     {
93*e60300aeSEd Tanous         serializer.reset();
94*e60300aeSEd Tanous         if (ec)
95*e60300aeSEd Tanous         {
96*e60300aeSEd Tanous             BMCWEB_LOG_ERROR("Error sending header{}", ec);
97*e60300aeSEd Tanous             close("async_write_header failed");
98*e60300aeSEd Tanous             return;
99*e60300aeSEd Tanous         }
100*e60300aeSEd Tanous         BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
101*e60300aeSEd Tanous         if (!openHandler)
102*e60300aeSEd Tanous         {
103*e60300aeSEd Tanous             BMCWEB_LOG_CRITICAL("No open handler???");
104*e60300aeSEd Tanous             return;
105*e60300aeSEd Tanous         }
106*e60300aeSEd Tanous         openHandler(*this, req);
107*e60300aeSEd Tanous 
108*e60300aeSEd Tanous         // SSE stream header sent, So let us setup monitor.
109*e60300aeSEd Tanous         // Any read data on this stream will be error in case of SSE.
110*e60300aeSEd Tanous         adaptor.async_read_some(boost::asio::buffer(buffer),
111*e60300aeSEd Tanous                                 std::bind_front(&ConnectionImpl::afterReadError,
112*e60300aeSEd Tanous                                                 this, shared_from_this()));
113*e60300aeSEd Tanous     }
114*e60300aeSEd Tanous 
afterReadError(const std::shared_ptr<Connection> &,const boost::system::error_code & ec,size_t bytesRead)115*e60300aeSEd Tanous     void afterReadError(const std::shared_ptr<Connection>& /*self*/,
116*e60300aeSEd Tanous                         const boost::system::error_code& ec, size_t bytesRead)
117*e60300aeSEd Tanous     {
118*e60300aeSEd Tanous         BMCWEB_LOG_DEBUG("Read {}", bytesRead);
119*e60300aeSEd Tanous         if (ec == boost::asio::error::operation_aborted)
120*e60300aeSEd Tanous         {
121*e60300aeSEd Tanous             return;
122*e60300aeSEd Tanous         }
123*e60300aeSEd Tanous         if (ec)
124*e60300aeSEd Tanous         {
125*e60300aeSEd Tanous             BMCWEB_LOG_ERROR("Read error: {}", ec);
126*e60300aeSEd Tanous         }
127*e60300aeSEd Tanous 
128*e60300aeSEd Tanous         close("Close SSE connection");
129*e60300aeSEd Tanous     }
130*e60300aeSEd Tanous 
doWrite()131*e60300aeSEd Tanous     void doWrite()
132*e60300aeSEd Tanous     {
133*e60300aeSEd Tanous         if (doingWrite)
134*e60300aeSEd Tanous         {
135*e60300aeSEd Tanous             return;
136*e60300aeSEd Tanous         }
137*e60300aeSEd Tanous         if (inputBuffer.size() == 0)
138*e60300aeSEd Tanous         {
139*e60300aeSEd Tanous             BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
140*e60300aeSEd Tanous             return;
141*e60300aeSEd Tanous         }
142*e60300aeSEd Tanous         startTimeout();
143*e60300aeSEd Tanous         doingWrite = true;
144*e60300aeSEd Tanous 
145*e60300aeSEd Tanous         adaptor.async_write_some(
146*e60300aeSEd Tanous             inputBuffer.data(),
147*e60300aeSEd Tanous             std::bind_front(&ConnectionImpl::doWriteCallback, this,
148*e60300aeSEd Tanous                             shared_from_this()));
149*e60300aeSEd Tanous     }
150*e60300aeSEd Tanous 
doWriteCallback(const std::shared_ptr<Connection> &,const boost::beast::error_code & ec,size_t bytesTransferred)151*e60300aeSEd Tanous     void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
152*e60300aeSEd Tanous                          const boost::beast::error_code& ec,
153*e60300aeSEd Tanous                          size_t bytesTransferred)
154*e60300aeSEd Tanous     {
155*e60300aeSEd Tanous         timer.cancel();
156*e60300aeSEd Tanous         doingWrite = false;
157*e60300aeSEd Tanous         inputBuffer.consume(bytesTransferred);
158*e60300aeSEd Tanous 
159*e60300aeSEd Tanous         if (ec == boost::asio::error::eof)
160*e60300aeSEd Tanous         {
161*e60300aeSEd Tanous             BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
162*e60300aeSEd Tanous             close("SSE stream closed");
163*e60300aeSEd Tanous             return;
164*e60300aeSEd Tanous         }
165*e60300aeSEd Tanous 
166*e60300aeSEd Tanous         if (ec)
167*e60300aeSEd Tanous         {
168*e60300aeSEd Tanous             BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
169*e60300aeSEd Tanous             close("async_write_some failed");
170*e60300aeSEd Tanous             return;
171*e60300aeSEd Tanous         }
172*e60300aeSEd Tanous         BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
173*e60300aeSEd Tanous                          bytesTransferred);
174*e60300aeSEd Tanous 
175*e60300aeSEd Tanous         doWrite();
176*e60300aeSEd Tanous     }
177*e60300aeSEd Tanous 
sendSseEvent(std::string_view id,std::string_view msg)178*e60300aeSEd Tanous     void sendSseEvent(std::string_view id, std::string_view msg) override
179*e60300aeSEd Tanous     {
180*e60300aeSEd Tanous         if (msg.empty())
181*e60300aeSEd Tanous         {
182*e60300aeSEd Tanous             BMCWEB_LOG_DEBUG("Empty data, bailing out.");
183*e60300aeSEd Tanous             return;
184*e60300aeSEd Tanous         }
185*e60300aeSEd Tanous 
186*e60300aeSEd Tanous         dataFormat(id, msg);
187*e60300aeSEd Tanous 
188*e60300aeSEd Tanous         doWrite();
189*e60300aeSEd Tanous     }
190*e60300aeSEd Tanous 
dataFormat(std::string_view id,std::string_view msg)191*e60300aeSEd Tanous     void dataFormat(std::string_view id, std::string_view msg)
192*e60300aeSEd Tanous     {
193*e60300aeSEd Tanous         constexpr size_t bufferLimit = 10485760U; // 10MB
194*e60300aeSEd Tanous         if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit)
195*e60300aeSEd Tanous         {
196*e60300aeSEd Tanous             BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client");
197*e60300aeSEd Tanous             close("Buffer overflow");
198*e60300aeSEd Tanous             return;
199*e60300aeSEd Tanous         }
200*e60300aeSEd Tanous         std::string rawData;
201*e60300aeSEd Tanous         if (!id.empty())
202*e60300aeSEd Tanous         {
203*e60300aeSEd Tanous             rawData += "id: ";
204*e60300aeSEd Tanous             rawData.append(id);
205*e60300aeSEd Tanous             rawData += "\n";
206*e60300aeSEd Tanous         }
207*e60300aeSEd Tanous 
208*e60300aeSEd Tanous         rawData += "data: ";
209*e60300aeSEd Tanous         for (char character : msg)
210*e60300aeSEd Tanous         {
211*e60300aeSEd Tanous             rawData += character;
212*e60300aeSEd Tanous             if (character == '\n')
213*e60300aeSEd Tanous             {
214*e60300aeSEd Tanous                 rawData += "data: ";
215*e60300aeSEd Tanous             }
216*e60300aeSEd Tanous         }
217*e60300aeSEd Tanous         rawData += "\n\n";
218*e60300aeSEd Tanous 
219*e60300aeSEd Tanous         size_t copied = boost::asio::buffer_copy(
220*e60300aeSEd Tanous             inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData));
221*e60300aeSEd Tanous         inputBuffer.commit(copied);
222*e60300aeSEd Tanous     }
223*e60300aeSEd Tanous 
startTimeout()224*e60300aeSEd Tanous     void startTimeout()
225*e60300aeSEd Tanous     {
226*e60300aeSEd Tanous         std::weak_ptr<Connection> weakSelf = weak_from_this();
227*e60300aeSEd Tanous         timer.expires_after(std::chrono::seconds(30));
228*e60300aeSEd Tanous         timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
229*e60300aeSEd Tanous                                          this, weak_from_this()));
230*e60300aeSEd Tanous     }
231*e60300aeSEd Tanous 
onTimeoutCallback(const std::weak_ptr<Connection> & weakSelf,const boost::system::error_code & ec)232*e60300aeSEd Tanous     void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
233*e60300aeSEd Tanous                            const boost::system::error_code& ec)
234*e60300aeSEd Tanous     {
235*e60300aeSEd Tanous         std::shared_ptr<Connection> self = weakSelf.lock();
236*e60300aeSEd Tanous         if (!self)
237*e60300aeSEd Tanous         {
238*e60300aeSEd Tanous             BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
239*e60300aeSEd Tanous                                 logPtr(self.get()));
240*e60300aeSEd Tanous             return;
241*e60300aeSEd Tanous         }
242*e60300aeSEd Tanous 
243*e60300aeSEd Tanous         if (ec == boost::asio::error::operation_aborted)
244*e60300aeSEd Tanous         {
245*e60300aeSEd Tanous             BMCWEB_LOG_DEBUG("Timer operation aborted");
246*e60300aeSEd Tanous             // Canceled wait means the path succeeded.
247*e60300aeSEd Tanous             return;
248*e60300aeSEd Tanous         }
249*e60300aeSEd Tanous         if (ec)
250*e60300aeSEd Tanous         {
251*e60300aeSEd Tanous             BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
252*e60300aeSEd Tanous         }
253*e60300aeSEd Tanous 
254*e60300aeSEd Tanous         BMCWEB_LOG_WARNING("{} Connection timed out, closing",
255*e60300aeSEd Tanous                            logPtr(self.get()));
256*e60300aeSEd Tanous 
257*e60300aeSEd Tanous         self->close("closing connection");
258*e60300aeSEd Tanous     }
259*e60300aeSEd Tanous 
260*e60300aeSEd Tanous   private:
261*e60300aeSEd Tanous     std::array<char, 1> buffer{};
262*e60300aeSEd Tanous     boost::beast::multi_buffer inputBuffer;
263*e60300aeSEd Tanous 
264*e60300aeSEd Tanous     Adaptor adaptor;
265*e60300aeSEd Tanous 
266*e60300aeSEd Tanous     using BodyType = bmcweb::HttpBody;
267*e60300aeSEd Tanous     boost::beast::http::response<BodyType> res;
268*e60300aeSEd Tanous     std::optional<boost::beast::http::response_serializer<BodyType>> serializer;
269*e60300aeSEd Tanous     boost::asio::steady_timer timer;
270*e60300aeSEd Tanous     bool doingWrite = false;
271*e60300aeSEd Tanous 
272*e60300aeSEd Tanous     std::function<void(Connection&, const Request&)> openHandler;
273*e60300aeSEd Tanous     std::function<void(Connection&)> closeHandler;
274*e60300aeSEd Tanous };
275*e60300aeSEd Tanous } // namespace sse_socket
276*e60300aeSEd Tanous } // namespace crow
277