xref: /openbmc/bmcweb/http/server_sent_event.hpp (revision 44106f34)
188ada3bcSV-Sanjana #pragma once
2b2896149SEd Tanous #include "http_body.hpp"
388ada3bcSV-Sanjana #include "http_request.hpp"
488ada3bcSV-Sanjana #include "http_response.hpp"
588ada3bcSV-Sanjana 
688ada3bcSV-Sanjana #include <boost/asio/buffer.hpp>
788ada3bcSV-Sanjana #include <boost/asio/steady_timer.hpp>
888ada3bcSV-Sanjana #include <boost/beast/core/multi_buffer.hpp>
988ada3bcSV-Sanjana #include <boost/beast/websocket.hpp>
1088ada3bcSV-Sanjana 
1188ada3bcSV-Sanjana #include <array>
128f79c5b6SEd Tanous #include <cstddef>
1388ada3bcSV-Sanjana #include <functional>
148f79c5b6SEd Tanous #include <optional>
1588ada3bcSV-Sanjana 
1688ada3bcSV-Sanjana namespace crow
1788ada3bcSV-Sanjana {
1888ada3bcSV-Sanjana 
1988ada3bcSV-Sanjana namespace sse_socket
2088ada3bcSV-Sanjana {
2193cf0ac2SEd Tanous struct Connection : public std::enable_shared_from_this<Connection>
2288ada3bcSV-Sanjana {
2388ada3bcSV-Sanjana   public:
246fde95faSEd Tanous     Connection() = default;
2588ada3bcSV-Sanjana 
2688ada3bcSV-Sanjana     Connection(const Connection&) = delete;
2788ada3bcSV-Sanjana     Connection(Connection&&) = delete;
2888ada3bcSV-Sanjana     Connection& operator=(const Connection&) = delete;
2988ada3bcSV-Sanjana     Connection& operator=(const Connection&&) = delete;
3088ada3bcSV-Sanjana     virtual ~Connection() = default;
3188ada3bcSV-Sanjana 
3288ada3bcSV-Sanjana     virtual boost::asio::io_context& getIoContext() = 0;
3388ada3bcSV-Sanjana     virtual void close(std::string_view msg = "quit") = 0;
3488ada3bcSV-Sanjana     virtual void sendEvent(std::string_view id, std::string_view msg) = 0;
3588ada3bcSV-Sanjana };
3688ada3bcSV-Sanjana 
3788ada3bcSV-Sanjana template <typename Adaptor>
3888ada3bcSV-Sanjana class ConnectionImpl : public Connection
3988ada3bcSV-Sanjana {
4088ada3bcSV-Sanjana   public:
ConnectionImpl(Adaptor && adaptorIn,std::function<void (Connection &)> openHandlerIn,std::function<void (Connection &)> closeHandlerIn)4193cf0ac2SEd Tanous     ConnectionImpl(Adaptor&& adaptorIn,
426fde95faSEd Tanous                    std::function<void(Connection&)> openHandlerIn,
436fde95faSEd Tanous                    std::function<void(Connection&)> closeHandlerIn) :
446fde95faSEd Tanous         adaptor(std::move(adaptorIn)),
4593cf0ac2SEd Tanous         timer(static_cast<boost::asio::io_context&>(
4693cf0ac2SEd Tanous             adaptor.get_executor().context())),
4793cf0ac2SEd Tanous         openHandler(std::move(openHandlerIn)),
4888ada3bcSV-Sanjana         closeHandler(std::move(closeHandlerIn))
498f79c5b6SEd Tanous 
5088ada3bcSV-Sanjana     {
5162598e31SEd Tanous         BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
5288ada3bcSV-Sanjana     }
5388ada3bcSV-Sanjana 
5488ada3bcSV-Sanjana     ConnectionImpl(const ConnectionImpl&) = delete;
5588ada3bcSV-Sanjana     ConnectionImpl(const ConnectionImpl&&) = delete;
5688ada3bcSV-Sanjana     ConnectionImpl& operator=(const ConnectionImpl&) = delete;
5788ada3bcSV-Sanjana     ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
5888ada3bcSV-Sanjana 
~ConnectionImpl()5988ada3bcSV-Sanjana     ~ConnectionImpl() override
6088ada3bcSV-Sanjana     {
6162598e31SEd Tanous         BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
6288ada3bcSV-Sanjana     }
6388ada3bcSV-Sanjana 
getIoContext()6488ada3bcSV-Sanjana     boost::asio::io_context& getIoContext() override
6588ada3bcSV-Sanjana     {
6688ada3bcSV-Sanjana         return static_cast<boost::asio::io_context&>(
6788ada3bcSV-Sanjana             adaptor.get_executor().context());
6888ada3bcSV-Sanjana     }
6988ada3bcSV-Sanjana 
start()7088ada3bcSV-Sanjana     void start()
7188ada3bcSV-Sanjana     {
726fde95faSEd Tanous         if (!openHandler)
7388ada3bcSV-Sanjana         {
7462598e31SEd Tanous             BMCWEB_LOG_CRITICAL("No open handler???");
756fde95faSEd Tanous             return;
7688ada3bcSV-Sanjana         }
776fde95faSEd Tanous         openHandler(*this);
788f79c5b6SEd Tanous         sendSSEHeader();
7988ada3bcSV-Sanjana     }
8088ada3bcSV-Sanjana 
close(const std::string_view msg)8188ada3bcSV-Sanjana     void close(const std::string_view msg) override
8288ada3bcSV-Sanjana     {
838f79c5b6SEd Tanous         BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg);
8488ada3bcSV-Sanjana         // send notification to handler for cleanup
8588ada3bcSV-Sanjana         if (closeHandler)
8688ada3bcSV-Sanjana         {
876fde95faSEd Tanous             closeHandler(*this);
8888ada3bcSV-Sanjana         }
8962598e31SEd Tanous         BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
906fde95faSEd Tanous         boost::beast::get_lowest_layer(adaptor).close();
9188ada3bcSV-Sanjana     }
9288ada3bcSV-Sanjana 
sendSSEHeader()936fde95faSEd Tanous     void sendSSEHeader()
9488ada3bcSV-Sanjana     {
9562598e31SEd Tanous         BMCWEB_LOG_DEBUG("Starting SSE connection");
968f79c5b6SEd Tanous 
976fde95faSEd Tanous         res.set(boost::beast::http::field::content_type, "text/event-stream");
988ece0e45SEd Tanous         boost::beast::http::response_serializer<BodyType>& serial =
998f79c5b6SEd Tanous             serializer.emplace(res);
10088ada3bcSV-Sanjana 
10188ada3bcSV-Sanjana         boost::beast::http::async_write_header(
1028ece0e45SEd Tanous             adaptor, serial,
10388ada3bcSV-Sanjana             std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
10488ada3bcSV-Sanjana                             shared_from_this()));
10588ada3bcSV-Sanjana     }
10688ada3bcSV-Sanjana 
sendSSEHeaderCallback(const std::shared_ptr<Connection> &,const boost::system::error_code & ec,size_t)10788ada3bcSV-Sanjana     void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
1086fde95faSEd Tanous                                const boost::system::error_code& ec,
1096fde95faSEd Tanous                                size_t /*bytesSent*/)
11088ada3bcSV-Sanjana     {
1116fde95faSEd Tanous         serializer.reset();
11288ada3bcSV-Sanjana         if (ec)
11388ada3bcSV-Sanjana         {
11462598e31SEd Tanous             BMCWEB_LOG_ERROR("Error sending header{}", ec);
11588ada3bcSV-Sanjana             close("async_write_header failed");
11688ada3bcSV-Sanjana             return;
11788ada3bcSV-Sanjana         }
11862598e31SEd Tanous         BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
11988ada3bcSV-Sanjana 
12088ada3bcSV-Sanjana         // SSE stream header sent, So let us setup monitor.
12188ada3bcSV-Sanjana         // Any read data on this stream will be error in case of SSE.
1228f79c5b6SEd Tanous         adaptor.async_read_some(boost::asio::buffer(buffer),
1236fde95faSEd Tanous                                 std::bind_front(&ConnectionImpl::afterReadError,
1246fde95faSEd Tanous                                                 this, shared_from_this()));
12588ada3bcSV-Sanjana     }
12688ada3bcSV-Sanjana 
afterReadError(const std::shared_ptr<Connection> &,const boost::system::error_code & ec,size_t bytesRead)1276fde95faSEd Tanous     void afterReadError(const std::shared_ptr<Connection>& /*self*/,
1288f79c5b6SEd Tanous                         const boost::system::error_code& ec, size_t bytesRead)
12988ada3bcSV-Sanjana     {
1308f79c5b6SEd Tanous         BMCWEB_LOG_DEBUG("Read {}", bytesRead);
1316fde95faSEd Tanous         if (ec == boost::asio::error::operation_aborted)
1326fde95faSEd Tanous         {
1336fde95faSEd Tanous             return;
13488ada3bcSV-Sanjana         }
13588ada3bcSV-Sanjana         if (ec)
13688ada3bcSV-Sanjana         {
13762598e31SEd Tanous             BMCWEB_LOG_ERROR("Read error: {}", ec);
13888ada3bcSV-Sanjana         }
13988ada3bcSV-Sanjana 
1406fde95faSEd Tanous         close("Close SSE connection");
14188ada3bcSV-Sanjana     }
14288ada3bcSV-Sanjana 
doWrite()14388ada3bcSV-Sanjana     void doWrite()
14488ada3bcSV-Sanjana     {
14588ada3bcSV-Sanjana         if (doingWrite)
14688ada3bcSV-Sanjana         {
14788ada3bcSV-Sanjana             return;
14888ada3bcSV-Sanjana         }
14988ada3bcSV-Sanjana         if (inputBuffer.size() == 0)
15088ada3bcSV-Sanjana         {
15162598e31SEd Tanous             BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
15288ada3bcSV-Sanjana             return;
15388ada3bcSV-Sanjana         }
1546fde95faSEd Tanous         startTimeout();
15588ada3bcSV-Sanjana         doingWrite = true;
15688ada3bcSV-Sanjana 
15788ada3bcSV-Sanjana         adaptor.async_write_some(
15888ada3bcSV-Sanjana             inputBuffer.data(),
15988ada3bcSV-Sanjana             std::bind_front(&ConnectionImpl::doWriteCallback, this,
1608f79c5b6SEd Tanous                             shared_from_this()));
16188ada3bcSV-Sanjana     }
16288ada3bcSV-Sanjana 
doWriteCallback(const std::shared_ptr<Connection> &,const boost::beast::error_code & ec,size_t bytesTransferred)1638f79c5b6SEd Tanous     void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
16488ada3bcSV-Sanjana                          const boost::beast::error_code& ec,
1656fde95faSEd Tanous                          size_t bytesTransferred)
16688ada3bcSV-Sanjana     {
1676fde95faSEd Tanous         timer.cancel();
16888ada3bcSV-Sanjana         doingWrite = false;
16988ada3bcSV-Sanjana         inputBuffer.consume(bytesTransferred);
17088ada3bcSV-Sanjana 
17188ada3bcSV-Sanjana         if (ec == boost::asio::error::eof)
17288ada3bcSV-Sanjana         {
17362598e31SEd Tanous             BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
17488ada3bcSV-Sanjana             close("SSE stream closed");
17588ada3bcSV-Sanjana             return;
17688ada3bcSV-Sanjana         }
17788ada3bcSV-Sanjana 
17888ada3bcSV-Sanjana         if (ec)
17988ada3bcSV-Sanjana         {
18062598e31SEd Tanous             BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
18188ada3bcSV-Sanjana             close("async_write_some failed");
18288ada3bcSV-Sanjana             return;
18388ada3bcSV-Sanjana         }
18462598e31SEd Tanous         BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
18562598e31SEd Tanous                          bytesTransferred);
18688ada3bcSV-Sanjana 
18788ada3bcSV-Sanjana         doWrite();
18888ada3bcSV-Sanjana     }
18988ada3bcSV-Sanjana 
sendEvent(std::string_view id,std::string_view msg)19088ada3bcSV-Sanjana     void sendEvent(std::string_view id, std::string_view msg) override
19188ada3bcSV-Sanjana     {
19288ada3bcSV-Sanjana         if (msg.empty())
19388ada3bcSV-Sanjana         {
19462598e31SEd Tanous             BMCWEB_LOG_DEBUG("Empty data, bailing out.");
19588ada3bcSV-Sanjana             return;
19688ada3bcSV-Sanjana         }
19788ada3bcSV-Sanjana 
1986fde95faSEd Tanous         dataFormat(id, msg);
19988ada3bcSV-Sanjana 
20088ada3bcSV-Sanjana         doWrite();
20188ada3bcSV-Sanjana     }
20288ada3bcSV-Sanjana 
dataFormat(std::string_view id,std::string_view msg)2036fde95faSEd Tanous     void dataFormat(std::string_view id, std::string_view msg)
20488ada3bcSV-Sanjana     {
2058f79c5b6SEd Tanous         constexpr size_t bufferLimit = 10485760U; // 10MB
2068f79c5b6SEd Tanous         if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit)
2078f79c5b6SEd Tanous         {
2088f79c5b6SEd Tanous             BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client");
2098f79c5b6SEd Tanous             close("Buffer overflow");
2108f79c5b6SEd Tanous             return;
2118f79c5b6SEd Tanous         }
21288ada3bcSV-Sanjana         std::string rawData;
21388ada3bcSV-Sanjana         if (!id.empty())
21488ada3bcSV-Sanjana         {
21588ada3bcSV-Sanjana             rawData += "id: ";
2166fde95faSEd Tanous             rawData.append(id);
21788ada3bcSV-Sanjana             rawData += "\n";
21888ada3bcSV-Sanjana         }
21988ada3bcSV-Sanjana 
22088ada3bcSV-Sanjana         rawData += "data: ";
22188ada3bcSV-Sanjana         for (char character : msg)
22288ada3bcSV-Sanjana         {
22388ada3bcSV-Sanjana             rawData += character;
22488ada3bcSV-Sanjana             if (character == '\n')
22588ada3bcSV-Sanjana             {
22688ada3bcSV-Sanjana                 rawData += "data: ";
22788ada3bcSV-Sanjana             }
22888ada3bcSV-Sanjana         }
22988ada3bcSV-Sanjana         rawData += "\n\n";
23088ada3bcSV-Sanjana 
231*44106f34SEd Tanous         size_t copied = boost::asio::buffer_copy(
232*44106f34SEd Tanous             inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData));
233*44106f34SEd Tanous         inputBuffer.commit(copied);
23488ada3bcSV-Sanjana     }
23588ada3bcSV-Sanjana 
startTimeout()2366fde95faSEd Tanous     void startTimeout()
23788ada3bcSV-Sanjana     {
23888ada3bcSV-Sanjana         std::weak_ptr<Connection> weakSelf = weak_from_this();
23988ada3bcSV-Sanjana         timer.expires_after(std::chrono::seconds(30));
24088ada3bcSV-Sanjana         timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
24188ada3bcSV-Sanjana                                          this, weak_from_this()));
24288ada3bcSV-Sanjana     }
24388ada3bcSV-Sanjana 
onTimeoutCallback(const std::weak_ptr<Connection> & weakSelf,const boost::system::error_code & ec)24488ada3bcSV-Sanjana     void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
2456fde95faSEd Tanous                            const boost::system::error_code& ec)
24688ada3bcSV-Sanjana     {
24788ada3bcSV-Sanjana         std::shared_ptr<Connection> self = weakSelf.lock();
24888ada3bcSV-Sanjana         if (!self)
24988ada3bcSV-Sanjana         {
25062598e31SEd Tanous             BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
25162598e31SEd Tanous                                 logPtr(self.get()));
25288ada3bcSV-Sanjana             return;
25388ada3bcSV-Sanjana         }
25488ada3bcSV-Sanjana 
25588ada3bcSV-Sanjana         if (ec == boost::asio::error::operation_aborted)
25688ada3bcSV-Sanjana         {
2578f79c5b6SEd Tanous             BMCWEB_LOG_DEBUG("Timer operation aborted");
2588ece0e45SEd Tanous             // Canceled wait means the path succeeded.
25988ada3bcSV-Sanjana             return;
26088ada3bcSV-Sanjana         }
26188ada3bcSV-Sanjana         if (ec)
26288ada3bcSV-Sanjana         {
26362598e31SEd Tanous             BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
26488ada3bcSV-Sanjana         }
26588ada3bcSV-Sanjana 
26662598e31SEd Tanous         BMCWEB_LOG_WARNING("{} Connection timed out, closing",
26762598e31SEd Tanous                            logPtr(self.get()));
26888ada3bcSV-Sanjana 
26988ada3bcSV-Sanjana         self->close("closing connection");
27088ada3bcSV-Sanjana     }
27188ada3bcSV-Sanjana 
27288ada3bcSV-Sanjana   private:
2738f79c5b6SEd Tanous     std::array<char, 1> buffer{};
27488ada3bcSV-Sanjana     boost::beast::multi_buffer inputBuffer;
27588ada3bcSV-Sanjana 
2768f79c5b6SEd Tanous     Adaptor adaptor;
2778f79c5b6SEd Tanous 
278b2896149SEd Tanous     using BodyType = bmcweb::HttpBody;
2798f79c5b6SEd Tanous     boost::beast::http::response<BodyType> res;
2808f79c5b6SEd Tanous     std::optional<boost::beast::http::response_serializer<BodyType>> serializer;
2816fde95faSEd Tanous     boost::asio::steady_timer timer;
28288ada3bcSV-Sanjana     bool doingWrite = false;
28388ada3bcSV-Sanjana 
2846fde95faSEd Tanous     std::function<void(Connection&)> openHandler;
2856fde95faSEd Tanous     std::function<void(Connection&)> closeHandler;
28688ada3bcSV-Sanjana };
28788ada3bcSV-Sanjana } // namespace sse_socket
28888ada3bcSV-Sanjana } // namespace crow
289