// xref: /openbmc/sdbusplus/include/sdbusplus/async/stdexec/__detail/__intrusive_mpsc_queue.hpp (revision 10d0b4b7d1498cfd5c3d37edea271a54d1984e41)
/*
 * Copyright (c) Dmitriy V'jukov
 * Copyright (c) 2024 Maikel Nadolski
 * Copyright (c) 2024 NVIDIA Corporation
 *
 * Licensed under the Apache License Version 2.0 with LLVM Exceptions
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *   https://llvm.org/LICENSE.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// general design of this MPSC queue is taken from
// https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue

#pragma once

#include <atomic>

#include <sdbusplus/async/stdexec/__spin_loop_pause.hpp>

namespace stdexec {
  template <auto _Ptr>
  class __intrusive_mpsc_queue;

  // Intrusive, lock-free multi-producer/single-consumer queue.
  //
  // Each _Node carries its own link field -- the std::atomic<void*> member
  // selected by the _Next member-pointer -- so the queue never allocates.
  //
  // A single stub ("nil") node marks the empty state. Because the stub node
  // only ever needs a "next" pointer, the atomic member __nil_ does double
  // duty: &__nil_ is used as the stub node's address, and the value stored
  // in __nil_ is the stub node's next pointer.
  template <class _Node, std::atomic<void*> _Node::* _Next>
  class __intrusive_mpsc_queue<_Next> {
    // Tail of the chain; every producer claims it with an exchange.
    std::atomic<void*> __back_{&__nil_};
    // Head of the chain; read and written only by the single consumer,
    // hence non-atomic.
    void* __front_{&__nil_};
    // Stub node / the stub node's next pointer (see class comment).
    std::atomic<_Node*> __nil_ = nullptr;

    // Consumer-side helper: re-enqueue the stub node when the last real
    // node is about to be handed out, so the chain is never left empty.
    void push_back_nil() {
      __nil_.store(nullptr, std::memory_order_relaxed);
      auto* __prev = static_cast<_Node*>(__back_.exchange(&__nil_, std::memory_order_acq_rel));
      // Publish the link from the previous tail to the stub.
      (__prev->*_Next).store(&__nil_, std::memory_order_release);
    }

   public:
    // Enqueue __new_node. Safe to call concurrently from any number of
    // producer threads. Returns true iff the previous tail was the stub
    // node, i.e. the queue was observed empty immediately before this push.
    auto push_back(_Node* __new_node) noexcept -> bool {
      (__new_node->*_Next).store(nullptr, std::memory_order_relaxed);
      // Claim the tail. Between this exchange and the release-store below,
      // the previous tail's next pointer is still null; pop_front() spins
      // over exactly that window.
      void* __prev_back = __back_.exchange(__new_node, std::memory_order_acq_rel);
      bool __is_nil = __prev_back == static_cast<void*>(&__nil_);
      if (__is_nil) {
        // Previous tail was the stub: its next pointer is __nil_ itself.
        __nil_.store(__new_node, std::memory_order_release);
      } else {
        (static_cast<_Node*>(__prev_back)->*_Next).store(__new_node, std::memory_order_release);
      }
      return __is_nil;
    }

    // Dequeue one node, or return nullptr if the queue is empty.
    // Must be called from the single consumer thread only.
    auto pop_front() noexcept -> _Node* {
      if (__front_ == static_cast<void*>(&__nil_)) {
        // Front is the stub: step over it to the first real node, if any.
        _Node* __next = __nil_.load(std::memory_order_acquire);
        if (!__next) {
          return nullptr; // queue is empty
        }
        __front_ = __next;
      }
      auto* __front = static_cast<_Node*>(__front_);
      void* __next = (__front->*_Next).load(std::memory_order_acquire);
      if (__next) {
        // Fast path: a successor is already linked in.
        __front_ = __next;
        return __front;
      }
      STDEXEC_ASSERT(!__next);
      // __front appears to be the last node. Re-insert the stub, then wait
      // for __front's next pointer to be published -- either by
      // push_back_nil() above or by a producer that raced us on __back_
      // and has not yet completed its link store.
      push_back_nil();
      do {
        __spin_loop_pause();
        __next = (__front->*_Next).load(std::memory_order_acquire);
      } while (!__next);
      __front_ = __next;
      return __front;
    }
  };
} // namespace stdexec