xref: /openbmc/sdbusplus/src/async/match.cpp (revision 77b8aac3abf614331e5ec2b50d6f5bf465cb4433)
#include <sdbusplus/async/match.hpp>

#include <string>
2 
3 namespace sdbusplus::async
4 {
5 
6 match::match(context& ctx, const std::string_view& pattern)
7 {
8     // C-style callback to redirect into this::handle_match.
9     static auto match_cb = [](message::msgp_t msg, void* ctx,
10                               sd_bus_error*) noexcept {
11         static_cast<match*>(ctx)->handle_match(message_t{msg});
12         return 0;
13     };
14 
15     sd_bus_slot* s = nullptr;
16     auto r = sd_bus_add_match(get_busp(ctx.get_bus()), &s, pattern.data(),
17                               match_cb, this);
18     if (r < 0)
19     {
20         throw exception::SdBusError(-r, "sd_bus_add_match (async::match)");
21     }
22 
23     slot = std::move(s);
24 }
25 
26 match::~match()
27 {
28     match_ns::match_completion* c = nullptr;
29 
30     {
31         std::lock_guard l{lock};
32         c = std::exchange(complete, nullptr);
33     }
34 
35     if (c)
36     {
37         c->stop();
38     }
39 }
40 
41 void match_ns::match_completion::arm() noexcept
42 {
43     // Set ourselves as the awaiting Receiver and see if there is a message
44     // to immediately complete on.
45 
46     std::unique_lock lock{m.lock};
47 
48     if (std::exchange(m.complete, this) != nullptr)
49     {
50         // We do not support two awaiters; throw exception.  Since we are in
51         // a noexcept context this will std::terminate anyhow, which is
52         // approximately the same as 'assert' but with better information.
53         try
54         {
55             throw std::logic_error(
56                 "match_completion started with another await already pending!");
57         }
58         catch (...)
59         {
60             std::terminate();
61         }
62     }
63 
64     m.handle_completion(std::move(lock));
65 }
66 
67 void match::handle_match(message_t&& msg) noexcept
68 {
69     // Insert the message into the queue and see if there is a pair ready for
70     // completion (Receiver + message).
71     std::unique_lock l{lock};
72     queue.emplace(std::move(msg));
73     handle_completion(std::move(l));
74 }
75 
76 void match::handle_completion(std::unique_lock<std::mutex>&& l) noexcept
77 {
78     auto lock = std::move(l);
79 
80     // If there is no match_completion, there is no awaiting Receiver.
81     // If the queue is empty, there is no message waiting, so the waiting
82     // Receiver isn't complete.
83     if ((complete == nullptr) || queue.empty())
84     {
85         return;
86     }
87 
88     // Get the waiting completion and message.
89     auto c = std::exchange(complete, nullptr);
90     auto msg = std::move(queue.front());
91     queue.pop();
92 
93     // Unlock before calling complete because the completed task may run and
94     // attempt to complete on the next event (and thus deadlock).
95     lock.unlock();
96 
97     // Signal completion.
98     c->complete(std::move(msg));
99 }
100 
101 } // namespace sdbusplus::async
102