1 /*
2  * Copyright (c) Dmitiy V'jukov
3  * Copyright (c) 2024 Maikel Nadolski
4  * Copyright (c) 2024 NVIDIA Corporation
5  *
6  * Licensed under the Apache License Version 2.0 with LLVM Exceptions
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  *   https://llvm.org/LICENSE.txt
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 // general design of this MPSC queue is taken from
20 // https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
21 
22 #pragma once
23 
24 #include <sdbusplus/async/stdexec/__spin_loop_pause.hpp>
25 
26 #include <atomic>
27 
28 namespace stdexec
29 {
template <auto _Ptr>
class __intrusive_mpsc_queue;

// An intrusive, lock-free multi-producer single-consumer (MPSC) queue.
//
// Nodes carry their own link: `_Next` is a pointer-to-member naming an
// `std::atomic<void*>` field inside `_Node` that the queue uses as the
// "next" pointer. Any number of threads may call push_back() concurrently;
// pop_front() must only ever be called by one consumer thread at a time.
//
// The design follows the Vyukov intrusive MPSC queue referenced at the top
// of this file: a stub (sentinel) node breaks the empty/non-empty ambiguity,
// and the consumer may briefly spin when it observes a producer that has
// swung `__back_` but has not yet published the link to its new node.
template <class _Node, std::atomic<void*> _Node::*_Next>
class __intrusive_mpsc_queue<_Next>
{
    // Producer side: points at the most recently pushed node (or at the
    // stub `__nil_` when the queue is logically empty). Updated with an
    // atomic exchange by every producer.
    std::atomic<void*> __back_{&__nil_};
    // Consumer side: the node to be returned next. Plain (non-atomic)
    // because only the single consumer reads or writes it.
    void* __front_{&__nil_};
    // The stub node. Its atomic value doubles as the stub's own "next"
    // pointer: nullptr while the stub is the last element, otherwise the
    // first real node that was pushed after it.
    std::atomic<_Node*> __nil_ = nullptr;

    // Re-insert the stub at the back of the queue. Called by the consumer
    // (from pop_front) when it is about to remove the last real node, so
    // that the queue never becomes entirely empty of nodes. Mirrors the
    // producer protocol in push_back(): reset the stub's next, swing
    // `__back_`, then publish the predecessor's link with release so the
    // consumer's acquire load observes it.
    void push_back_nil()
    {
        __nil_.store(nullptr, std::memory_order_relaxed);
        _Node* __prev = static_cast<_Node*>(
            __back_.exchange(&__nil_, std::memory_order_acq_rel));
        (__prev->*_Next).store(&__nil_, std::memory_order_release);
    }

  public:
    // Push `__new_node` onto the back of the queue. Safe to call from any
    // number of threads concurrently. Returns true if the queue was empty
    // (i.e. the previous back was the stub) — callers typically use this
    // to decide whether the consumer needs to be woken.
    bool push_back(_Node* __new_node) noexcept
    {
        (__new_node->*_Next).store(nullptr, std::memory_order_relaxed);
        // acq_rel: release publishes __new_node's initialization to the
        // consumer; acquire orders us after the previous producer's push.
        void* __prev_back =
            __back_.exchange(__new_node, std::memory_order_acq_rel);
        bool __is_nil = __prev_back == static_cast<void*>(&__nil_);
        if (__is_nil)
        {
            // Previous back was the stub: link via the stub's embedded
            // "next" slot (its atomic value) rather than a node's _Next.
            __nil_.store(__new_node, std::memory_order_release);
        }
        else
        {
            // Link the previous real back node to the new one. Between the
            // exchange above and this store, the queue is momentarily
            // "inconsistent" — pop_front() spins across that window.
            (static_cast<_Node*>(__prev_back)->*_Next)
                .store(__new_node, std::memory_order_release);
        }
        return __is_nil;
    }

    // Pop the node at the front of the queue, or return nullptr if the
    // queue is empty. Must be called by a single consumer thread only.
    _Node* pop_front() noexcept
    {
        if (__front_ == static_cast<void*>(&__nil_))
        {
            // Front is the stub: its value is the first real node pushed
            // after it, or nullptr if nothing has been pushed yet.
            _Node* __next = __nil_.load(std::memory_order_acquire);
            if (!__next)
            {
                return nullptr;
            }
            // Skip past the stub to the first real node.
            __front_ = __next;
        }
        _Node* __front = static_cast<_Node*>(__front_);
        void* __next = (__front->*_Next).load(std::memory_order_acquire);
        if (__next)
        {
            // Fast path: successor already linked; detach and return.
            __front_ = __next;
            return __front;
        }
        STDEXEC_ASSERT(!__next);
        // `__front` looks like the last node. Re-insert the stub behind it
        // so `__front` gains a successor we can advance to. If a producer
        // is concurrently mid-push (back already swung, link not yet
        // stored), the stub lands after that producer's node instead, and
        // we spin below until the producer publishes its link.
        push_back_nil();
        do
        {
            __spin_loop_pause();
            __next = (__front->*_Next).load(std::memory_order_acquire);
        } while (!__next);
        __front_ = __next;
        return __front;
    }
};
96 } // namespace stdexec
97