xref: /openbmc/bmcweb/http/server_sent_event.hpp (revision 6fde95fa)
188ada3bcSV-Sanjana #pragma once
2*6fde95faSEd Tanous #include "dbus_singleton.hpp"
388ada3bcSV-Sanjana #include "http_request.hpp"
488ada3bcSV-Sanjana #include "http_response.hpp"
588ada3bcSV-Sanjana 
688ada3bcSV-Sanjana #include <boost/algorithm/string/predicate.hpp>
788ada3bcSV-Sanjana #include <boost/asio/buffer.hpp>
888ada3bcSV-Sanjana #include <boost/asio/steady_timer.hpp>
988ada3bcSV-Sanjana #include <boost/beast/core/multi_buffer.hpp>
1088ada3bcSV-Sanjana #include <boost/beast/http/buffer_body.hpp>
1188ada3bcSV-Sanjana #include <boost/beast/websocket.hpp>
1288ada3bcSV-Sanjana 
1388ada3bcSV-Sanjana #include <array>
1488ada3bcSV-Sanjana #include <functional>
1588ada3bcSV-Sanjana 
1688ada3bcSV-Sanjana #ifdef BMCWEB_ENABLE_SSL
1788ada3bcSV-Sanjana #include <boost/beast/websocket/ssl.hpp>
1888ada3bcSV-Sanjana #endif
1988ada3bcSV-Sanjana 
2088ada3bcSV-Sanjana namespace crow
2188ada3bcSV-Sanjana {
2288ada3bcSV-Sanjana 
2388ada3bcSV-Sanjana namespace sse_socket
2488ada3bcSV-Sanjana {
2588ada3bcSV-Sanjana struct Connection : std::enable_shared_from_this<Connection>
2688ada3bcSV-Sanjana {
2788ada3bcSV-Sanjana   public:
28*6fde95faSEd Tanous     Connection() = default;
2988ada3bcSV-Sanjana 
3088ada3bcSV-Sanjana     Connection(const Connection&) = delete;
3188ada3bcSV-Sanjana     Connection(Connection&&) = delete;
3288ada3bcSV-Sanjana     Connection& operator=(const Connection&) = delete;
3388ada3bcSV-Sanjana     Connection& operator=(const Connection&&) = delete;
3488ada3bcSV-Sanjana     virtual ~Connection() = default;
3588ada3bcSV-Sanjana 
3688ada3bcSV-Sanjana     virtual boost::asio::io_context& getIoContext() = 0;
3788ada3bcSV-Sanjana     virtual void close(std::string_view msg = "quit") = 0;
3888ada3bcSV-Sanjana     virtual void sendEvent(std::string_view id, std::string_view msg) = 0;
3988ada3bcSV-Sanjana };
4088ada3bcSV-Sanjana 
4188ada3bcSV-Sanjana template <typename Adaptor>
4288ada3bcSV-Sanjana class ConnectionImpl : public Connection
4388ada3bcSV-Sanjana {
4488ada3bcSV-Sanjana   public:
45*6fde95faSEd Tanous     ConnectionImpl(Adaptor&& adaptorIn,
46*6fde95faSEd Tanous                    std::function<void(Connection&)> openHandlerIn,
47*6fde95faSEd Tanous                    std::function<void(Connection&)> closeHandlerIn) :
48*6fde95faSEd Tanous         adaptor(std::move(adaptorIn)),
49*6fde95faSEd Tanous         timer(ioc), openHandler(std::move(openHandlerIn)),
5088ada3bcSV-Sanjana         closeHandler(std::move(closeHandlerIn))
5188ada3bcSV-Sanjana     {
5288ada3bcSV-Sanjana         BMCWEB_LOG_DEBUG << "SseConnectionImpl: SSE constructor " << this;
5388ada3bcSV-Sanjana     }
5488ada3bcSV-Sanjana 
5588ada3bcSV-Sanjana     ConnectionImpl(const ConnectionImpl&) = delete;
5688ada3bcSV-Sanjana     ConnectionImpl(const ConnectionImpl&&) = delete;
5788ada3bcSV-Sanjana     ConnectionImpl& operator=(const ConnectionImpl&) = delete;
5888ada3bcSV-Sanjana     ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
5988ada3bcSV-Sanjana 
6088ada3bcSV-Sanjana     ~ConnectionImpl() override
6188ada3bcSV-Sanjana     {
6288ada3bcSV-Sanjana         BMCWEB_LOG_DEBUG << "SSE ConnectionImpl: SSE destructor " << this;
6388ada3bcSV-Sanjana     }
6488ada3bcSV-Sanjana 
6588ada3bcSV-Sanjana     boost::asio::io_context& getIoContext() override
6688ada3bcSV-Sanjana     {
6788ada3bcSV-Sanjana         return static_cast<boost::asio::io_context&>(
6888ada3bcSV-Sanjana             adaptor.get_executor().context());
6988ada3bcSV-Sanjana     }
7088ada3bcSV-Sanjana 
7188ada3bcSV-Sanjana     void start()
7288ada3bcSV-Sanjana     {
73*6fde95faSEd Tanous         if (!openHandler)
7488ada3bcSV-Sanjana         {
75*6fde95faSEd Tanous             BMCWEB_LOG_CRITICAL << "No open handler???";
76*6fde95faSEd Tanous             return;
7788ada3bcSV-Sanjana         }
78*6fde95faSEd Tanous         openHandler(*this);
7988ada3bcSV-Sanjana     }
8088ada3bcSV-Sanjana 
8188ada3bcSV-Sanjana     void close(const std::string_view msg) override
8288ada3bcSV-Sanjana     {
8388ada3bcSV-Sanjana         // send notification to handler for cleanup
8488ada3bcSV-Sanjana         if (closeHandler)
8588ada3bcSV-Sanjana         {
86*6fde95faSEd Tanous             closeHandler(*this);
8788ada3bcSV-Sanjana         }
88*6fde95faSEd Tanous         BMCWEB_LOG_DEBUG << "Closing SSE connection " << this << " - " << msg;
89*6fde95faSEd Tanous         boost::beast::get_lowest_layer(adaptor).close();
9088ada3bcSV-Sanjana     }
9188ada3bcSV-Sanjana 
92*6fde95faSEd Tanous     void sendSSEHeader()
9388ada3bcSV-Sanjana     {
9488ada3bcSV-Sanjana         BMCWEB_LOG_DEBUG << "Starting SSE connection";
9588ada3bcSV-Sanjana         using BodyType = boost::beast::http::buffer_body;
96*6fde95faSEd Tanous         boost::beast::http::response<BodyType> res(
97*6fde95faSEd Tanous             boost::beast::http::status::ok, 11, BodyType{});
98*6fde95faSEd Tanous         res.set(boost::beast::http::field::content_type, "text/event-stream");
99*6fde95faSEd Tanous         res.body().more = true;
100*6fde95faSEd Tanous         boost::beast::http::response_serializer<BodyType>& ser =
101*6fde95faSEd Tanous             serializer.emplace(std::move(res));
10288ada3bcSV-Sanjana 
10388ada3bcSV-Sanjana         boost::beast::http::async_write_header(
104*6fde95faSEd Tanous             adaptor, ser,
10588ada3bcSV-Sanjana             std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
10688ada3bcSV-Sanjana                             shared_from_this()));
10788ada3bcSV-Sanjana     }
10888ada3bcSV-Sanjana 
10988ada3bcSV-Sanjana     void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
110*6fde95faSEd Tanous                                const boost::system::error_code& ec,
111*6fde95faSEd Tanous                                size_t /*bytesSent*/)
11288ada3bcSV-Sanjana     {
113*6fde95faSEd Tanous         serializer.reset();
11488ada3bcSV-Sanjana         if (ec)
11588ada3bcSV-Sanjana         {
11688ada3bcSV-Sanjana             BMCWEB_LOG_ERROR << "Error sending header" << ec;
11788ada3bcSV-Sanjana             close("async_write_header failed");
11888ada3bcSV-Sanjana             return;
11988ada3bcSV-Sanjana         }
12088ada3bcSV-Sanjana         BMCWEB_LOG_DEBUG << "SSE header sent - Connection established";
12188ada3bcSV-Sanjana 
12288ada3bcSV-Sanjana         serializer.reset();
12388ada3bcSV-Sanjana 
12488ada3bcSV-Sanjana         // SSE stream header sent, So let us setup monitor.
12588ada3bcSV-Sanjana         // Any read data on this stream will be error in case of SSE.
126*6fde95faSEd Tanous 
127*6fde95faSEd Tanous         adaptor.async_wait(boost::asio::ip::tcp::socket::wait_error,
128*6fde95faSEd Tanous                            std::bind_front(&ConnectionImpl::afterReadError,
129*6fde95faSEd Tanous                                            this, shared_from_this()));
13088ada3bcSV-Sanjana     }
13188ada3bcSV-Sanjana 
132*6fde95faSEd Tanous     void afterReadError(const std::shared_ptr<Connection>& /*self*/,
133*6fde95faSEd Tanous                         const boost::system::error_code& ec)
13488ada3bcSV-Sanjana     {
135*6fde95faSEd Tanous         if (ec == boost::asio::error::operation_aborted)
136*6fde95faSEd Tanous         {
137*6fde95faSEd Tanous             return;
13888ada3bcSV-Sanjana         }
13988ada3bcSV-Sanjana         if (ec)
14088ada3bcSV-Sanjana         {
14188ada3bcSV-Sanjana             BMCWEB_LOG_ERROR << "Read error: " << ec;
14288ada3bcSV-Sanjana         }
14388ada3bcSV-Sanjana 
144*6fde95faSEd Tanous         close("Close SSE connection");
14588ada3bcSV-Sanjana     }
14688ada3bcSV-Sanjana 
14788ada3bcSV-Sanjana     void doWrite()
14888ada3bcSV-Sanjana     {
14988ada3bcSV-Sanjana         if (doingWrite)
15088ada3bcSV-Sanjana         {
15188ada3bcSV-Sanjana             return;
15288ada3bcSV-Sanjana         }
15388ada3bcSV-Sanjana         if (inputBuffer.size() == 0)
15488ada3bcSV-Sanjana         {
15588ada3bcSV-Sanjana             BMCWEB_LOG_DEBUG << "inputBuffer is empty... Bailing out";
15688ada3bcSV-Sanjana             return;
15788ada3bcSV-Sanjana         }
158*6fde95faSEd Tanous         startTimeout();
15988ada3bcSV-Sanjana         doingWrite = true;
16088ada3bcSV-Sanjana 
16188ada3bcSV-Sanjana         adaptor.async_write_some(
16288ada3bcSV-Sanjana             inputBuffer.data(),
16388ada3bcSV-Sanjana             std::bind_front(&ConnectionImpl::doWriteCallback, this,
164*6fde95faSEd Tanous                             weak_from_this()));
16588ada3bcSV-Sanjana     }
16688ada3bcSV-Sanjana 
167*6fde95faSEd Tanous     void doWriteCallback(const std::weak_ptr<Connection>& weak,
16888ada3bcSV-Sanjana                          const boost::beast::error_code& ec,
169*6fde95faSEd Tanous                          size_t bytesTransferred)
17088ada3bcSV-Sanjana     {
171*6fde95faSEd Tanous         auto self = weak.lock();
172*6fde95faSEd Tanous         if (self == nullptr)
173*6fde95faSEd Tanous         {
174*6fde95faSEd Tanous             return;
175*6fde95faSEd Tanous         }
176*6fde95faSEd Tanous         timer.cancel();
17788ada3bcSV-Sanjana         doingWrite = false;
17888ada3bcSV-Sanjana         inputBuffer.consume(bytesTransferred);
17988ada3bcSV-Sanjana 
18088ada3bcSV-Sanjana         if (ec == boost::asio::error::eof)
18188ada3bcSV-Sanjana         {
18288ada3bcSV-Sanjana             BMCWEB_LOG_ERROR << "async_write_some() SSE stream closed";
18388ada3bcSV-Sanjana             close("SSE stream closed");
18488ada3bcSV-Sanjana             return;
18588ada3bcSV-Sanjana         }
18688ada3bcSV-Sanjana 
18788ada3bcSV-Sanjana         if (ec)
18888ada3bcSV-Sanjana         {
18988ada3bcSV-Sanjana             BMCWEB_LOG_ERROR << "async_write_some() failed: " << ec.message();
19088ada3bcSV-Sanjana             close("async_write_some failed");
19188ada3bcSV-Sanjana             return;
19288ada3bcSV-Sanjana         }
19388ada3bcSV-Sanjana         BMCWEB_LOG_DEBUG << "async_write_some() bytes transferred: "
19488ada3bcSV-Sanjana                          << bytesTransferred;
19588ada3bcSV-Sanjana 
19688ada3bcSV-Sanjana         doWrite();
19788ada3bcSV-Sanjana     }
19888ada3bcSV-Sanjana 
19988ada3bcSV-Sanjana     void sendEvent(std::string_view id, std::string_view msg) override
20088ada3bcSV-Sanjana     {
20188ada3bcSV-Sanjana         if (msg.empty())
20288ada3bcSV-Sanjana         {
20388ada3bcSV-Sanjana             BMCWEB_LOG_DEBUG << "Empty data, bailing out.";
20488ada3bcSV-Sanjana             return;
20588ada3bcSV-Sanjana         }
20688ada3bcSV-Sanjana 
207*6fde95faSEd Tanous         dataFormat(id, msg);
20888ada3bcSV-Sanjana 
20988ada3bcSV-Sanjana         doWrite();
21088ada3bcSV-Sanjana     }
21188ada3bcSV-Sanjana 
212*6fde95faSEd Tanous     void dataFormat(std::string_view id, std::string_view msg)
21388ada3bcSV-Sanjana     {
21488ada3bcSV-Sanjana         std::string rawData;
21588ada3bcSV-Sanjana         if (!id.empty())
21688ada3bcSV-Sanjana         {
21788ada3bcSV-Sanjana             rawData += "id: ";
218*6fde95faSEd Tanous             rawData.append(id);
21988ada3bcSV-Sanjana             rawData += "\n";
22088ada3bcSV-Sanjana         }
22188ada3bcSV-Sanjana 
22288ada3bcSV-Sanjana         rawData += "data: ";
22388ada3bcSV-Sanjana         for (char character : msg)
22488ada3bcSV-Sanjana         {
22588ada3bcSV-Sanjana             rawData += character;
22688ada3bcSV-Sanjana             if (character == '\n')
22788ada3bcSV-Sanjana             {
22888ada3bcSV-Sanjana                 rawData += "data: ";
22988ada3bcSV-Sanjana             }
23088ada3bcSV-Sanjana         }
23188ada3bcSV-Sanjana         rawData += "\n\n";
23288ada3bcSV-Sanjana 
23388ada3bcSV-Sanjana         boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()),
23488ada3bcSV-Sanjana                                  boost::asio::buffer(rawData));
23588ada3bcSV-Sanjana         inputBuffer.commit(rawData.size());
23688ada3bcSV-Sanjana     }
23788ada3bcSV-Sanjana 
238*6fde95faSEd Tanous     void startTimeout()
23988ada3bcSV-Sanjana     {
24088ada3bcSV-Sanjana         std::weak_ptr<Connection> weakSelf = weak_from_this();
24188ada3bcSV-Sanjana         timer.expires_after(std::chrono::seconds(30));
24288ada3bcSV-Sanjana         timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
24388ada3bcSV-Sanjana                                          this, weak_from_this()));
24488ada3bcSV-Sanjana     }
24588ada3bcSV-Sanjana 
24688ada3bcSV-Sanjana     void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
247*6fde95faSEd Tanous                            const boost::system::error_code& ec)
24888ada3bcSV-Sanjana     {
24988ada3bcSV-Sanjana         std::shared_ptr<Connection> self = weakSelf.lock();
25088ada3bcSV-Sanjana         if (!self)
25188ada3bcSV-Sanjana         {
25288ada3bcSV-Sanjana             BMCWEB_LOG_CRITICAL << self << " Failed to capture connection";
25388ada3bcSV-Sanjana             return;
25488ada3bcSV-Sanjana         }
25588ada3bcSV-Sanjana 
25688ada3bcSV-Sanjana         if (ec == boost::asio::error::operation_aborted)
25788ada3bcSV-Sanjana         {
25888ada3bcSV-Sanjana             BMCWEB_LOG_DEBUG << "operation aborted";
25988ada3bcSV-Sanjana             // Canceled wait means the path succeeeded.
26088ada3bcSV-Sanjana             return;
26188ada3bcSV-Sanjana         }
26288ada3bcSV-Sanjana         if (ec)
26388ada3bcSV-Sanjana         {
26488ada3bcSV-Sanjana             BMCWEB_LOG_CRITICAL << self << " timer failed " << ec;
26588ada3bcSV-Sanjana         }
26688ada3bcSV-Sanjana 
26788ada3bcSV-Sanjana         BMCWEB_LOG_WARNING << self << "Connection timed out, closing";
26888ada3bcSV-Sanjana 
26988ada3bcSV-Sanjana         self->close("closing connection");
27088ada3bcSV-Sanjana     }
27188ada3bcSV-Sanjana 
27288ada3bcSV-Sanjana   private:
27388ada3bcSV-Sanjana     Adaptor adaptor;
27488ada3bcSV-Sanjana 
27588ada3bcSV-Sanjana     boost::beast::multi_buffer inputBuffer;
27688ada3bcSV-Sanjana 
27788ada3bcSV-Sanjana     std::optional<boost::beast::http::response_serializer<
278*6fde95faSEd Tanous         boost::beast::http::buffer_body>>
27988ada3bcSV-Sanjana         serializer;
28088ada3bcSV-Sanjana     boost::asio::io_context& ioc =
28188ada3bcSV-Sanjana         crow::connections::systemBus->get_io_context();
282*6fde95faSEd Tanous     boost::asio::steady_timer timer;
28388ada3bcSV-Sanjana     bool doingWrite = false;
28488ada3bcSV-Sanjana 
285*6fde95faSEd Tanous     std::function<void(Connection&)> openHandler;
286*6fde95faSEd Tanous     std::function<void(Connection&)> closeHandler;
28788ada3bcSV-Sanjana };
28888ada3bcSV-Sanjana } // namespace sse_socket
28988ada3bcSV-Sanjana } // namespace crow
290