/* QEMU util/rcu.c (revision 2822c1b6) */
/*
 * urcu-mb.c
 *
 * Userspace RCU library with explicit memory barriers
 *
 * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
 * Copyright 2015 Red Hat, Inc.
 *
 * Ported to QEMU by Paolo Bonzini  <pbonzini@redhat.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * IBM's contributions to this file may be relicensed under LGPLv2 or later.
 */

#include "qemu-common.h"
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include <stdint.h>
#include <errno.h>
#include "qemu/rcu.h"
#include "qemu/atomic.h"
#include "qemu/thread.h"

/*
 * Global grace period counter.  Bit 0 is always one in rcu_gp_ctr.
 * Bits 1 and above are defined in synchronize_rcu.
 */
#define RCU_GP_LOCKED           (1UL << 0)
#define RCU_GP_CTR              (1UL << 1)

unsigned long rcu_gp_ctr = RCU_GP_LOCKED;

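/*
 * Illustration (derived from the definitions above): rcu_gp_ctr always has
 * bit 0 set, so a reader's counter snapshot is zero only while the reader
 * is outside a read-side critical section.  With 64-bit longs each grace
 * period adds RCU_GP_CTR, giving the sequence 1, 3, 5, ...  With 32-bit
 * longs only bit 1 is toggled, so the counter merely alternates between
 * 1 and 3; synchronize_rcu() compensates by flipping the parity and
 * waiting twice, as described below.
 */
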
QemuEvent rcu_gp_event;
static QemuMutex rcu_gp_lock;

/*
 * Check whether the grace period is still ongoing for a given reader:
 * returns nonzero if the reader is still inside a read-side critical
 * section that started before rcu_gp_ctr was last updated, i.e. it has
 * not crossed a quiescent state since wait_for_readers() began.
 */
static inline int rcu_gp_ongoing(unsigned long *ctr)
{
    unsigned long v;

    v = atomic_read(ctr);
    return v && (v != rcu_gp_ctr);
}

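/*
 * A concrete example of the check above, assuming a 64-bit host where
 * rcu_gp_ctr has just been advanced from 3 to 5 by synchronize_rcu():
 *
 *   reader A: ctr == 0  -> not in a critical section, quiescent
 *   reader B: ctr == 5  -> entered after the update, quiescent
 *   reader C: ctr == 3  -> entered before the update, grace period
 *                          still ongoing; the writer must wait for it
 */
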
/* Written to only by each individual reader. Read by both the reader and the
 * writers.
 */
__thread struct rcu_reader_data rcu_reader;

/* Protected by rcu_gp_lock.  */
typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);

/* Wait for previous parity/grace period to be empty of readers.  */
static void wait_for_readers(void)
{
    ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
    struct rcu_reader_data *index, *tmp;

    for (;;) {
        /* We want to be notified of changes made to rcu_gp_ongoing
         * while we walk the list.
         */
        qemu_event_reset(&rcu_gp_event);

        /* Instead of using atomic_mb_set for index->waiting, and
         * atomic_mb_read for index->ctr, memory barriers are placed
         * manually since writes to different threads are independent.
         * atomic_mb_set has a smp_wmb before...
         */
        smp_wmb();
        QLIST_FOREACH(index, &registry, node) {
            atomic_set(&index->waiting, true);
        }

        /* ... and a smp_mb after.  */
        smp_mb();

        QLIST_FOREACH_SAFE(index, &registry, node, tmp) {
            if (!rcu_gp_ongoing(&index->ctr)) {
                QLIST_REMOVE(index, node);
                QLIST_INSERT_HEAD(&qsreaders, index, node);

                /* No need for atomic_mb_set here; at worst we get a few
                 * extra futex wakeups.
                 */
                atomic_set(&index->waiting, false);
            }
        }

        /* atomic_mb_read has smp_rmb after.  */
        smp_rmb();

        if (QLIST_EMPTY(&registry)) {
            break;
        }

        /* Wait for one thread to report a quiescent state and
         * try again.
         */
        qemu_event_wait(&rcu_gp_event);
    }

    /* Put the reader list back into the registry.  */
    QLIST_SWAP(&registry, &qsreaders, node);
}
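
/*
 * Sketch of how the handshake above is expected to complete (the read
 * side itself lives in qemu/rcu.h, not in this file): wait_for_readers()
 * sets index->waiting and then re-checks index->ctr; a reader leaving
 * its critical section is expected to notice the waiting flag and signal
 * rcu_gp_event with qemu_event_set(), waking the loop so it can walk the
 * registry again.  Readers that are already quiescent are moved to
 * qsreaders right away and are not waited for.
 */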

void synchronize_rcu(void)
{
    qemu_mutex_lock(&rcu_gp_lock);

    if (!QLIST_EMPTY(&registry)) {
        /* In either case, the atomic_mb_set below blocks stores that free
         * old RCU-protected pointers.
         */
        if (sizeof(rcu_gp_ctr) < 8) {
            /* For architectures with 32-bit longs, a two-phase algorithm
             * ensures we do not encounter overflow bugs.
             *
             * Switch parity: 0 -> 1, 1 -> 0.
             */
            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
            wait_for_readers();
            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
        } else {
            /* Increment current grace period.  */
            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
        }

        wait_for_readers();
    }

    qemu_mutex_unlock(&rcu_gp_lock);
}
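
/*
 * A minimal writer-side usage sketch.  The names Foo, foo_global, new_foo
 * and foo_free() are hypothetical; atomic_rcu_set() is assumed to come
 * from qemu/atomic.h:
 *
 *     Foo *old = foo_global;
 *     atomic_rcu_set(&foo_global, new_foo);   // publish the new version
 *     synchronize_rcu();                      // wait for all readers
 *     foo_free(old);                          // now safe to reclaim
 *
 * For callers that must not block, call_rcu1() below defers the
 * reclamation to the call_rcu thread instead.
 */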


#define RCU_CALL_MIN_SIZE        30

/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h
 * from liburcu.  Note that head is only used by the consumer.
 */
static struct rcu_head dummy;
static struct rcu_head *head = &dummy, **tail = &dummy.next;
static int rcu_call_count;
static QemuEvent rcu_call_ready_event;

static void enqueue(struct rcu_head *node)
{
    struct rcu_head **old_tail;

    node->next = NULL;
    old_tail = atomic_xchg(&tail, &node->next);
    atomic_mb_set(old_tail, node);
}
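
/*
 * Sketch of two producers A and B racing in enqueue() (illustration only):
 * both set node->next = NULL; the atomic_xchg on tail serializes them, so
 * A obtains the previous tail and B obtains &A->next.  Until each producer
 * performs its final atomic_mb_set, the predecessor's next pointer is
 * still NULL; this is the transient inconsistency that try_dequeue()
 * below handles by returning NULL and letting the caller wait on
 * rcu_call_ready_event.
 */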

static struct rcu_head *try_dequeue(void)
{
    struct rcu_head *node, *next;

retry:
    /* Test for an empty list, which we do not expect.  Note that, for
     * the consumer, head and tail are always consistent: the head because
     * only the consumer reads/writes it, and the tail because updating it
     * is the first step of an enqueue.  Only the next pointers might be
     * momentarily inconsistent.
     */
    if (head == &dummy && atomic_mb_read(&tail) == &dummy.next) {
        abort();
    }

    /* If the head node has NULL in its next pointer, the value is
     * wrong and we need to wait until its enqueuer finishes the update.
     */
    node = head;
    next = atomic_mb_read(&head->next);
    if (!next) {
        return NULL;
    }

    /* Since we are the sole consumer, and we excluded the empty case
     * above, the queue will always have at least two nodes: the
     * dummy node, and the one being removed.  So we do not need to update
     * the tail pointer.
     */
    head = next;

    /* If we dequeued the dummy node, add it back at the end and retry.  */
    if (node == &dummy) {
        enqueue(node);
        goto retry;
    }

    return node;
}
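
/*
 * Example (illustration only): with the queue dummy -> A -> B, a call to
 * try_dequeue() first moves the dummy to the end (giving A -> B -> dummy)
 * and then returns A.  Keeping the dummy in the queue means the queue
 * never becomes completely empty, so the consumer never has to fix up
 * the tail pointer.
 */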

static void *call_rcu_thread(void *opaque)
{
    struct rcu_head *node;

    for (;;) {
        int tries = 0;
        int n = atomic_read(&rcu_call_count);

        /* Heuristically wait for a decent number of callbacks to pile up.
         * Fetch rcu_call_count now; we must only process elements that
         * were added before synchronize_rcu() starts.
         */
        while (n < RCU_CALL_MIN_SIZE && ++tries <= 5) {
            g_usleep(100000);
            qemu_event_reset(&rcu_call_ready_event);
            n = atomic_read(&rcu_call_count);
            if (n < RCU_CALL_MIN_SIZE) {
                qemu_event_wait(&rcu_call_ready_event);
                n = atomic_read(&rcu_call_count);
            }
        }

        atomic_sub(&rcu_call_count, n);
        synchronize_rcu();
        while (n > 0) {
            node = try_dequeue();
            while (!node) {
                qemu_event_reset(&rcu_call_ready_event);
                node = try_dequeue();
                if (!node) {
                    qemu_event_wait(&rcu_call_ready_event);
                    node = try_dequeue();
                }
            }

            n--;
            node->func(node);
        }
    }
    abort();
}
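
/*
 * Batching behaviour (derived from the constants above): the thread
 * prefers to let RCU_CALL_MIN_SIZE (30) callbacks pile up, re-checking
 * after 100 ms sleeps and event wakeups, but gives up after five tries
 * so that a slow trickle of callbacks still gets processed; the single
 * synchronize_rcu() call then covers the whole batch.
 */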

void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
{
    node->func = func;
    enqueue(node);
    atomic_inc(&rcu_call_count);
    qemu_event_set(&rcu_call_ready_event);
}
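
/*
 * Typical call_rcu1() usage sketch (hypothetical names; the embedded
 * rcu_head must stay valid until the callback runs):
 *
 *     struct Foo {
 *         struct rcu_head rcu;
 *         int payload;
 *     };
 *
 *     static void foo_reclaim(struct rcu_head *head)
 *     {
 *         struct Foo *foo = container_of(head, struct Foo, rcu);
 *         g_free(foo);
 *     }
 *
 *     ...
 *     call_rcu1(&old_foo->rcu, foo_reclaim);   // returns immediately
 *
 * The callback runs in the call_rcu thread after a grace period, so no
 * reader can still hold a reference obtained before the replacement.
 */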

void rcu_register_thread(void)
{
    assert(rcu_reader.ctr == 0);
    qemu_mutex_lock(&rcu_gp_lock);
    QLIST_INSERT_HEAD(&registry, &rcu_reader, node);
    qemu_mutex_unlock(&rcu_gp_lock);
}

void rcu_unregister_thread(void)
{
    qemu_mutex_lock(&rcu_gp_lock);
    QLIST_REMOVE(&rcu_reader, node);
    qemu_mutex_unlock(&rcu_gp_lock);
}
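
/*
 * Reader-side usage sketch (hypothetical names; rcu_read_lock(),
 * rcu_read_unlock() and atomic_rcu_read() are assumed to be provided by
 * qemu/rcu.h and qemu/atomic.h).  A thread must register before its
 * first read-side critical section and unregister before it exits:
 *
 *     rcu_register_thread();
 *     ...
 *     rcu_read_lock();
 *     Foo *foo = atomic_rcu_read(&foo_global);
 *     use(foo);               // foo cannot be reclaimed while we hold the lock
 *     rcu_read_unlock();
 *     ...
 *     rcu_unregister_thread();
 *
 * Unregistered threads are invisible to wait_for_readers(), so grace
 * periods do not wait for them.
 */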

static void __attribute__((__constructor__)) rcu_init(void)
{
    QemuThread thread;

    qemu_mutex_init(&rcu_gp_lock);
    qemu_event_init(&rcu_gp_event, true);

    qemu_event_init(&rcu_call_ready_event, false);
    qemu_thread_create(&thread, "call_rcu", call_rcu_thread,
                       NULL, QEMU_THREAD_DETACHED);

    rcu_register_thread();
}