1 #include <sdbusplus/async/match.hpp> 2 3 namespace sdbusplus::async 4 { 5 6 match::match(context& ctx, const std::string_view& pattern) 7 { 8 // C-style callback to redirect into this::handle_match. 9 static auto match_cb = [](message::msgp_t msg, void* ctx, 10 sd_bus_error*) noexcept { 11 static_cast<match*>(ctx)->handle_match(message_t{msg}); 12 return 0; 13 }; 14 15 sd_bus_slot* s = nullptr; 16 auto r = sd_bus_add_match(get_busp(ctx.get_bus()), &s, pattern.data(), 17 match_cb, this); 18 if (r < 0) 19 { 20 throw exception::SdBusError(-r, "sd_bus_add_match (async::match)"); 21 } 22 23 slot = std::move(s); 24 } 25 26 void match_ns::match_completion::arm() noexcept 27 { 28 // Set ourselves as the awaiting Receiver and see if there is a message 29 // to immediately complete on. 30 31 std::unique_lock lock{m.lock}; 32 33 if (std::exchange(m.complete, this) != nullptr) 34 { 35 // We do not support two awaiters; throw exception. Since we are in 36 // a noexcept context this will std::terminate anyhow, which is 37 // approximately the same as 'assert' but with better information. 38 try 39 { 40 throw std::logic_error( 41 "match_completion started with another await already pending!"); 42 } 43 catch (...) 44 { 45 std::terminate(); 46 } 47 } 48 49 m.handle_completion(std::move(lock)); 50 } 51 52 void match::handle_match(message_t&& msg) noexcept 53 { 54 // Insert the message into the queue and see if there is a pair ready for 55 // completion (Receiver + message). 56 std::unique_lock l{lock}; 57 queue.emplace(std::move(msg)); 58 handle_completion(std::move(l)); 59 } 60 61 void match::handle_completion(std::unique_lock<std::mutex>&& l) noexcept 62 { 63 auto lock = std::move(l); 64 65 // If there is no match_completion, there is no awaiting Receiver. 66 // If the queue is empty, there is no message waiting, so the waiting 67 // Receiver isn't complete. 68 if ((complete == nullptr) || queue.empty()) 69 { 70 return; 71 } 72 73 // Get the waiting completion and message. 74 auto c = std::exchange(complete, nullptr); 75 auto msg = std::move(queue.front()); 76 queue.pop(); 77 78 // Unlock before calling complete because the completed task may run and 79 // attempt to complete on the next event (and thus deadlock). 80 lock.unlock(); 81 82 // Signal completion. 83 c->complete(std::move(msg)); 84 } 85 86 } // namespace sdbusplus::async 87