/*
 * urcu-mb.c
 *
 * Userspace RCU library with explicit memory barriers
 *
 * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
 * Copyright 2015 Red Hat, Inc.
 *
 * Ported to QEMU by Paolo Bonzini <pbonzini@redhat.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * IBM's contributions to this file may be relicensed under LGPLv2 or later.
 */

#include "qemu-common.h"
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include <stdint.h>
#include <errno.h>
#include "qemu/rcu.h"
#include "qemu/atomic.h"
#include "qemu/thread.h"

/*
 * Global grace period counter.  Bit 0 is always one in rcu_gp_ctr.
 * Bits 1 and above are defined in synchronize_rcu.
 */
#define RCU_GP_LOCKED           (1UL << 0)
#define RCU_GP_CTR              (1UL << 1)

unsigned long rcu_gp_ctr = RCU_GP_LOCKED;

QemuEvent rcu_gp_event;
static QemuMutex rcu_gp_lock;

/*
 * Check whether a quiescent state was crossed between the update of
 * rcu_gp_ctr in synchronize_rcu and now.
 */
static inline int rcu_gp_ongoing(unsigned long *ctr)
{
    unsigned long v;

    v = atomic_read(ctr);
    return v && (v != rcu_gp_ctr);
}

/* Written to only by each individual reader.  Read by both the reader and the
 * writers.
 */
__thread struct rcu_reader_data rcu_reader;

/* Protected by rcu_gp_lock. */
typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);

/* Wait for previous parity/grace period to be empty of readers. */
static void wait_for_readers(void)
{
    ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
    struct rcu_reader_data *index, *tmp;

    for (;;) {
        /* We want to be notified of changes made to rcu_gp_ongoing
         * while we walk the list.
         */
        qemu_event_reset(&rcu_gp_event);

        /* Instead of using atomic_mb_set for index->waiting, and
         * atomic_mb_read for index->ctr, memory barriers are placed
         * manually since writes to different threads are independent.
         * atomic_mb_set has a smp_wmb before...
         */
        smp_wmb();
        QLIST_FOREACH(index, &registry, node) {
            atomic_set(&index->waiting, true);
        }

        /* ... and a smp_mb after. */
        smp_mb();

        QLIST_FOREACH_SAFE(index, &registry, node, tmp) {
            if (!rcu_gp_ongoing(&index->ctr)) {
                QLIST_REMOVE(index, node);
                QLIST_INSERT_HEAD(&qsreaders, index, node);

                /* No need for mb_set here; at worst we
                 * get some extra futex wakeups.
                 */
                atomic_set(&index->waiting, false);
            }
        }

        /* atomic_mb_read has smp_rmb after. */
        smp_rmb();

        if (QLIST_EMPTY(&registry)) {
            break;
        }

        /* Wait for one thread to report a quiescent state and
         * try again.
         */
        qemu_event_wait(&rcu_gp_event);
    }

    /* Put the reader list back into the registry. */
    QLIST_SWAP(&registry, &qsreaders, node);
}

void synchronize_rcu(void)
{
    qemu_mutex_lock(&rcu_gp_lock);

    if (!QLIST_EMPTY(&registry)) {
        /* In either case, the atomic_mb_set below blocks stores that free
         * old RCU-protected pointers.
         */
        if (sizeof(rcu_gp_ctr) < 8) {
            /* For architectures with 32-bit longs, a two-subphase algorithm
             * ensures we do not encounter overflow bugs.
             *
             * Switch parity: 0 -> 1, 1 -> 0.
             */
            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
            wait_for_readers();
            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
        } else {
            /* Increment current grace period. */
            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
        }

        wait_for_readers();
    }

    qemu_mutex_unlock(&rcu_gp_lock);
}
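
/* The reader side of this protocol lives in qemu/rcu.h.  The sketch below is
 * only illustrative (the header's exact code may differ); it shows what
 * wait_for_readers and rcu_gp_ongoing rely on:
 *
 *     rcu_read_lock:
 *         rcu_reader.ctr = atomic_read(&rcu_gp_ctr);   // snapshot, bit 0 set
 *         smp_mb();                      // before the critical section's loads
 *
 *     rcu_read_unlock:
 *         smp_mb();                      // after the critical section's accesses
 *         atomic_set(&rcu_reader.ctr, 0);              // announce quiescence
 *         if (atomic_read(&rcu_reader.waiting)) {
 *             atomic_set(&rcu_reader.waiting, false);
 *             qemu_event_set(&rcu_gp_event);           // wake wait_for_readers
 *         }
 *
 * rcu_gp_ongoing thus sees either ctr == 0 (the reader is quiescent) or
 * ctr == rcu_gp_ctr (the reader started after the counter update); in either
 * case the reader cannot have picked up a pointer that was unlinked before
 * the counter update, so wait_for_readers may drop it from the walk.
 */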


#define RCU_CALL_MIN_SIZE        30

/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h
 * from liburcu.  Note that head is only used by the consumer.
 */
static struct rcu_head dummy;
static struct rcu_head *head = &dummy, **tail = &dummy.next;
static int rcu_call_count;
static QemuEvent rcu_call_ready_event;

static void enqueue(struct rcu_head *node)
{
    struct rcu_head **old_tail;

    node->next = NULL;
    old_tail = atomic_xchg(&tail, &node->next);
    atomic_mb_set(old_tail, node);
}

static struct rcu_head *try_dequeue(void)
{
    struct rcu_head *node, *next;

retry:
    /* Test for an empty list, which we do not expect.  Note that for
     * the consumer head and tail are always consistent.  The head
     * is consistent because only the consumer reads/writes it.
     * The tail is consistent because updating it is the first step
     * in the enqueuing.  It is only the next pointers that might be
     * inconsistent.
     */
    if (head == &dummy && atomic_mb_read(&tail) == &dummy.next) {
        abort();
    }

    /* If the head node has NULL in its next pointer, the value is
     * wrong and we need to wait until its enqueuer finishes the update.
     */
    node = head;
    next = atomic_mb_read(&head->next);
    if (!next) {
        return NULL;
    }

    /* Since we are the sole consumer, and we excluded the empty case
     * above, the queue will always have at least two nodes: the
     * dummy node, and the one being removed.  So we do not need to update
     * the tail pointer.
     */
    head = next;

    /* If we dequeued the dummy node, add it back at the end and retry. */
    if (node == &dummy) {
        enqueue(node);
        goto retry;
    }

    return node;
}
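
/* Illustration of the enqueue/dequeue race that try_dequeue tolerates (a
 * sketch with made-up node names, not code from this file).  Appending B
 * after A is a two-step update:
 *
 *     initially:       head -> A(next=NULL)            tail == &A->next
 *     after xchg:      head -> A(next=NULL)            tail == &B->next
 *     after mb_set:    head -> A(next=&B) -> B(NULL)   tail == &B->next
 *
 * Between the xchg and the final store, the consumer can observe
 * A->next == NULL even though the queue logically contains B; try_dequeue
 * returns NULL in that window and call_rcu_thread sleeps on
 * rcu_call_ready_event before retrying.  The dummy node keeps the list from
 * ever becoming physically empty, so reaching head == &dummy with
 * tail == &dummy.next means more nodes were dequeued than enqueued, which
 * is a bug and aborts.
 */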

static void *call_rcu_thread(void *opaque)
{
    struct rcu_head *node;

    for (;;) {
        int tries = 0;
        int n = atomic_read(&rcu_call_count);

        /* Heuristically wait for a decent number of callbacks to pile up.
         * Fetch rcu_call_count now; we only need to process elements that
         * were added before synchronize_rcu() starts.
         */
        while (n < RCU_CALL_MIN_SIZE && ++tries <= 5) {
            g_usleep(100000);
            qemu_event_reset(&rcu_call_ready_event);
            n = atomic_read(&rcu_call_count);
            if (n < RCU_CALL_MIN_SIZE) {
                qemu_event_wait(&rcu_call_ready_event);
                n = atomic_read(&rcu_call_count);
            }
        }

        atomic_sub(&rcu_call_count, n);
        synchronize_rcu();
        while (n > 0) {
            node = try_dequeue();
            while (!node) {
                qemu_event_reset(&rcu_call_ready_event);
                node = try_dequeue();
                if (!node) {
                    qemu_event_wait(&rcu_call_ready_event);
                    node = try_dequeue();
                }
            }

            n--;
            node->func(node);
        }
    }
    abort();
}

void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
{
    node->func = func;
    enqueue(node);
    atomic_inc(&rcu_call_count);
    qemu_event_set(&rcu_call_ready_event);
}

void rcu_register_thread(void)
{
    assert(rcu_reader.ctr == 0);
    qemu_mutex_lock(&rcu_gp_lock);
    QLIST_INSERT_HEAD(&registry, &rcu_reader, node);
    qemu_mutex_unlock(&rcu_gp_lock);
}

void rcu_unregister_thread(void)
{
    qemu_mutex_lock(&rcu_gp_lock);
    QLIST_REMOVE(&rcu_reader, node);
    qemu_mutex_unlock(&rcu_gp_lock);
}

static void __attribute__((__constructor__)) rcu_init(void)
{
    QemuThread thread;

    qemu_mutex_init(&rcu_gp_lock);
    qemu_event_init(&rcu_gp_event, true);

    qemu_event_init(&rcu_call_ready_event, false);
    qemu_thread_create(&thread, "call_rcu", call_rcu_thread,
                       NULL, QEMU_THREAD_DETACHED);

    rcu_register_thread();
}
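
/* Usage sketch.  Nothing below is part of this library: "struct Foo",
 * "foo_free" and "global_foo" are made up, the updater is assumed to be
 * serialized by its own lock, and atomic_rcu_set()/atomic_rcu_read() are
 * assumed to be the publication helpers from qemu/atomic.h.
 *
 *     struct Foo {
 *         struct rcu_head rcu;      // embedded in every RCU-freed object
 *         int value;
 *     };
 *
 *     static void foo_free(struct rcu_head *head)
 *     {
 *         struct Foo *foo = container_of(head, struct Foo, rcu);
 *         g_free(foo);
 *     }
 *
 *     // Updater: publish the new version, then defer freeing the old one
 *     // until all readers currently inside a critical section finish.
 *     struct Foo *old = global_foo;
 *     atomic_rcu_set(&global_foo, new);
 *     call_rcu1(&old->rcu, foo_free);
 *
 *     // Reader thread: register once, then bracket each access with the
 *     // lock/unlock pair declared in qemu/rcu.h.
 *     rcu_register_thread();
 *     rcu_read_lock();
 *     int v = atomic_rcu_read(&global_foo)->value;
 *     rcu_read_unlock();
 */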