1 /* 2 * urcu-mb.c 3 * 4 * Userspace RCU library with explicit memory barriers 5 * 6 * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com> 7 * Copyright (c) 2009 Paul E. McKenney, IBM Corporation. 8 * Copyright 2015 Red Hat, Inc. 9 * 10 * Ported to QEMU by Paolo Bonzini <pbonzini@redhat.com> 11 * 12 * This library is free software; you can redistribute it and/or 13 * modify it under the terms of the GNU Lesser General Public 14 * License as published by the Free Software Foundation; either 15 * version 2.1 of the License, or (at your option) any later version. 16 * 17 * This library is distributed in the hope that it will be useful, 18 * but WITHOUT ANY WARRANTY; without even the implied warranty of 19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 20 * Lesser General Public License for more details. 21 * 22 * You should have received a copy of the GNU Lesser General Public 23 * License along with this library; if not, write to the Free Software 24 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 25 * 26 * IBM's contributions to this file may be relicensed under LGPLv2 or later. 27 */ 28 29 #include "qemu/osdep.h" 30 #include "qemu/rcu.h" 31 #include "qemu/atomic.h" 32 #include "qemu/thread.h" 33 #include "qemu/main-loop.h" 34 #include "qemu/lockable.h" 35 #if defined(CONFIG_MALLOC_TRIM) 36 #include <malloc.h> 37 #endif 38 39 /* 40 * Global grace period counter. Bit 0 is always one in rcu_gp_ctr. 41 * Bits 1 and above are defined in synchronize_rcu. 42 */ 43 #define RCU_GP_LOCKED (1UL << 0) 44 #define RCU_GP_CTR (1UL << 1) 45 46 unsigned long rcu_gp_ctr = RCU_GP_LOCKED; 47 48 QemuEvent rcu_gp_event; 49 static int in_drain_call_rcu; 50 static QemuMutex rcu_registry_lock; 51 static QemuMutex rcu_sync_lock; 52 53 /* 54 * Check whether a quiescent state was crossed between the beginning of 55 * update_counter_and_wait and now. 56 */ 57 static inline int rcu_gp_ongoing(unsigned long *ctr) 58 { 59 unsigned long v; 60 61 v = qatomic_read(ctr); 62 return v && (v != rcu_gp_ctr); 63 } 64 65 /* Written to only by each individual reader. Read by both the reader and the 66 * writers. 67 */ 68 QEMU_DEFINE_CO_TLS(struct rcu_reader_data, rcu_reader) 69 70 /* Protected by rcu_registry_lock. */ 71 typedef QLIST_HEAD(, rcu_reader_data) ThreadList; 72 static ThreadList registry = QLIST_HEAD_INITIALIZER(registry); 73 74 /* Wait for previous parity/grace period to be empty of readers. */ 75 static void wait_for_readers(void) 76 { 77 ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders); 78 struct rcu_reader_data *index, *tmp; 79 80 for (;;) { 81 /* We want to be notified of changes made to rcu_gp_ongoing 82 * while we walk the list. 83 */ 84 qemu_event_reset(&rcu_gp_event); 85 86 QLIST_FOREACH(index, ®istry, node) { 87 qatomic_set(&index->waiting, true); 88 } 89 90 /* Here, order the stores to index->waiting before the loads of 91 * index->ctr. Pairs with smp_mb_placeholder() in rcu_read_unlock(), 92 * ensuring that the loads of index->ctr are sequentially consistent. 93 * 94 * If this is the last iteration, this barrier also prevents 95 * frees from seeping upwards, and orders the two wait phases 96 * on architectures with 32-bit longs; see synchronize_rcu(). 97 */ 98 smp_mb_global(); 99 100 QLIST_FOREACH_SAFE(index, ®istry, node, tmp) { 101 if (!rcu_gp_ongoing(&index->ctr)) { 102 QLIST_REMOVE(index, node); 103 QLIST_INSERT_HEAD(&qsreaders, index, node); 104 105 /* No need for memory barriers here, worst of all we 106 * get some extra futex wakeups. 107 */ 108 qatomic_set(&index->waiting, false); 109 } else if (qatomic_read(&in_drain_call_rcu)) { 110 notifier_list_notify(&index->force_rcu, NULL); 111 } 112 } 113 114 if (QLIST_EMPTY(®istry)) { 115 break; 116 } 117 118 /* Wait for one thread to report a quiescent state and try again. 119 * Release rcu_registry_lock, so rcu_(un)register_thread() doesn't 120 * wait too much time. 121 * 122 * rcu_register_thread() may add nodes to ®istry; it will not 123 * wake up synchronize_rcu, but that is okay because at least another 124 * thread must exit its RCU read-side critical section before 125 * synchronize_rcu is done. The next iteration of the loop will 126 * move the new thread's rcu_reader from ®istry to &qsreaders, 127 * because rcu_gp_ongoing() will return false. 128 * 129 * rcu_unregister_thread() may remove nodes from &qsreaders instead 130 * of ®istry if it runs during qemu_event_wait. That's okay; 131 * the node then will not be added back to ®istry by QLIST_SWAP 132 * below. The invariant is that the node is part of one list when 133 * rcu_registry_lock is released. 134 */ 135 qemu_mutex_unlock(&rcu_registry_lock); 136 qemu_event_wait(&rcu_gp_event); 137 qemu_mutex_lock(&rcu_registry_lock); 138 } 139 140 /* put back the reader list in the registry */ 141 QLIST_SWAP(®istry, &qsreaders, node); 142 } 143 144 void synchronize_rcu(void) 145 { 146 QEMU_LOCK_GUARD(&rcu_sync_lock); 147 148 /* Write RCU-protected pointers before reading p_rcu_reader->ctr. 149 * Pairs with smp_mb_placeholder() in rcu_read_lock(). 150 * 151 * Also orders write to RCU-protected pointers before 152 * write to rcu_gp_ctr. 153 */ 154 smp_mb_global(); 155 156 QEMU_LOCK_GUARD(&rcu_registry_lock); 157 if (!QLIST_EMPTY(®istry)) { 158 if (sizeof(rcu_gp_ctr) < 8) { 159 /* For architectures with 32-bit longs, a two-subphases algorithm 160 * ensures we do not encounter overflow bugs. 161 * 162 * Switch parity: 0 -> 1, 1 -> 0. 163 */ 164 qatomic_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR); 165 wait_for_readers(); 166 qatomic_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR); 167 } else { 168 /* Increment current grace period. */ 169 qatomic_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR); 170 } 171 172 wait_for_readers(); 173 } 174 } 175 176 177 #define RCU_CALL_MIN_SIZE 30 178 179 /* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h 180 * from liburcu. Note that head is only used by the consumer. 181 */ 182 static struct rcu_head dummy; 183 static struct rcu_head *head = &dummy, **tail = &dummy.next; 184 static int rcu_call_count; 185 static QemuEvent rcu_call_ready_event; 186 187 static void enqueue(struct rcu_head *node) 188 { 189 struct rcu_head **old_tail; 190 191 node->next = NULL; 192 193 /* 194 * Make this node the tail of the list. The node will be 195 * used by further enqueue operations, but it will not 196 * be dequeued yet... 197 */ 198 old_tail = qatomic_xchg(&tail, &node->next); 199 200 /* 201 * ... until it is pointed to from another item in the list. 202 * In the meantime, try_dequeue() will find a NULL next pointer 203 * and loop. 204 * 205 * Synchronizes with qatomic_load_acquire() in try_dequeue(). 206 */ 207 qatomic_store_release(old_tail, node); 208 } 209 210 static struct rcu_head *try_dequeue(void) 211 { 212 struct rcu_head *node, *next; 213 214 retry: 215 /* Head is only written by this thread, so no need for barriers. */ 216 node = head; 217 218 /* 219 * If the head node has NULL in its next pointer, the value is 220 * wrong and we need to wait until its enqueuer finishes the update. 221 */ 222 next = qatomic_load_acquire(&node->next); 223 if (!next) { 224 return NULL; 225 } 226 227 /* 228 * Test for an empty list, which we do not expect. Note that for 229 * the consumer head and tail are always consistent. The head 230 * is consistent because only the consumer reads/writes it. 231 * The tail, because it is the first step in the enqueuing. 232 * It is only the next pointers that might be inconsistent. 233 */ 234 if (head == &dummy && qatomic_read(&tail) == &dummy.next) { 235 abort(); 236 } 237 238 /* 239 * Since we are the sole consumer, and we excluded the empty case 240 * above, the queue will always have at least two nodes: the 241 * dummy node, and the one being removed. So we do not need to update 242 * the tail pointer. 243 */ 244 head = next; 245 246 /* If we dequeued the dummy node, add it back at the end and retry. */ 247 if (node == &dummy) { 248 enqueue(node); 249 goto retry; 250 } 251 252 return node; 253 } 254 255 static void *call_rcu_thread(void *opaque) 256 { 257 struct rcu_head *node; 258 259 rcu_register_thread(); 260 261 for (;;) { 262 int tries = 0; 263 int n = qatomic_read(&rcu_call_count); 264 265 /* Heuristically wait for a decent number of callbacks to pile up. 266 * Fetch rcu_call_count now, we only must process elements that were 267 * added before synchronize_rcu() starts. 268 */ 269 while (n == 0 || (n < RCU_CALL_MIN_SIZE && ++tries <= 5)) { 270 g_usleep(10000); 271 if (n == 0) { 272 qemu_event_reset(&rcu_call_ready_event); 273 n = qatomic_read(&rcu_call_count); 274 if (n == 0) { 275 #if defined(CONFIG_MALLOC_TRIM) 276 malloc_trim(4 * 1024 * 1024); 277 #endif 278 qemu_event_wait(&rcu_call_ready_event); 279 } 280 } 281 n = qatomic_read(&rcu_call_count); 282 } 283 284 qatomic_sub(&rcu_call_count, n); 285 synchronize_rcu(); 286 qemu_mutex_lock_iothread(); 287 while (n > 0) { 288 node = try_dequeue(); 289 while (!node) { 290 qemu_mutex_unlock_iothread(); 291 qemu_event_reset(&rcu_call_ready_event); 292 node = try_dequeue(); 293 if (!node) { 294 qemu_event_wait(&rcu_call_ready_event); 295 node = try_dequeue(); 296 } 297 qemu_mutex_lock_iothread(); 298 } 299 300 n--; 301 node->func(node); 302 } 303 qemu_mutex_unlock_iothread(); 304 } 305 abort(); 306 } 307 308 void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node)) 309 { 310 node->func = func; 311 enqueue(node); 312 qatomic_inc(&rcu_call_count); 313 qemu_event_set(&rcu_call_ready_event); 314 } 315 316 317 struct rcu_drain { 318 struct rcu_head rcu; 319 QemuEvent drain_complete_event; 320 }; 321 322 static void drain_rcu_callback(struct rcu_head *node) 323 { 324 struct rcu_drain *event = (struct rcu_drain *)node; 325 qemu_event_set(&event->drain_complete_event); 326 } 327 328 /* 329 * This function ensures that all pending RCU callbacks 330 * on the current thread are done executing 331 332 * drops big qemu lock during the wait to allow RCU thread 333 * to process the callbacks 334 * 335 */ 336 337 void drain_call_rcu(void) 338 { 339 struct rcu_drain rcu_drain; 340 bool locked = qemu_mutex_iothread_locked(); 341 342 memset(&rcu_drain, 0, sizeof(struct rcu_drain)); 343 qemu_event_init(&rcu_drain.drain_complete_event, false); 344 345 if (locked) { 346 qemu_mutex_unlock_iothread(); 347 } 348 349 350 /* 351 * RCU callbacks are invoked in the same order as in which they 352 * are registered, thus we can be sure that when 'drain_rcu_callback' 353 * is called, all RCU callbacks that were registered on this thread 354 * prior to calling this function are completed. 355 * 356 * Note that since we have only one global queue of the RCU callbacks, 357 * we also end up waiting for most of RCU callbacks that were registered 358 * on the other threads, but this is a side effect that shouldn't be 359 * assumed. 360 */ 361 362 qatomic_inc(&in_drain_call_rcu); 363 call_rcu1(&rcu_drain.rcu, drain_rcu_callback); 364 qemu_event_wait(&rcu_drain.drain_complete_event); 365 qatomic_dec(&in_drain_call_rcu); 366 367 if (locked) { 368 qemu_mutex_lock_iothread(); 369 } 370 371 } 372 373 void rcu_register_thread(void) 374 { 375 assert(get_ptr_rcu_reader()->ctr == 0); 376 qemu_mutex_lock(&rcu_registry_lock); 377 QLIST_INSERT_HEAD(®istry, get_ptr_rcu_reader(), node); 378 qemu_mutex_unlock(&rcu_registry_lock); 379 } 380 381 void rcu_unregister_thread(void) 382 { 383 qemu_mutex_lock(&rcu_registry_lock); 384 QLIST_REMOVE(get_ptr_rcu_reader(), node); 385 qemu_mutex_unlock(&rcu_registry_lock); 386 } 387 388 void rcu_add_force_rcu_notifier(Notifier *n) 389 { 390 qemu_mutex_lock(&rcu_registry_lock); 391 notifier_list_add(&get_ptr_rcu_reader()->force_rcu, n); 392 qemu_mutex_unlock(&rcu_registry_lock); 393 } 394 395 void rcu_remove_force_rcu_notifier(Notifier *n) 396 { 397 qemu_mutex_lock(&rcu_registry_lock); 398 notifier_remove(n); 399 qemu_mutex_unlock(&rcu_registry_lock); 400 } 401 402 static void rcu_init_complete(void) 403 { 404 QemuThread thread; 405 406 qemu_mutex_init(&rcu_registry_lock); 407 qemu_mutex_init(&rcu_sync_lock); 408 qemu_event_init(&rcu_gp_event, true); 409 410 qemu_event_init(&rcu_call_ready_event, false); 411 412 /* The caller is assumed to have iothread lock, so the call_rcu thread 413 * must have been quiescent even after forking, just recreate it. 414 */ 415 qemu_thread_create(&thread, "call_rcu", call_rcu_thread, 416 NULL, QEMU_THREAD_DETACHED); 417 418 rcu_register_thread(); 419 } 420 421 static int atfork_depth = 1; 422 423 void rcu_enable_atfork(void) 424 { 425 atfork_depth++; 426 } 427 428 void rcu_disable_atfork(void) 429 { 430 atfork_depth--; 431 } 432 433 #ifdef CONFIG_POSIX 434 static void rcu_init_lock(void) 435 { 436 if (atfork_depth < 1) { 437 return; 438 } 439 440 qemu_mutex_lock(&rcu_sync_lock); 441 qemu_mutex_lock(&rcu_registry_lock); 442 } 443 444 static void rcu_init_unlock(void) 445 { 446 if (atfork_depth < 1) { 447 return; 448 } 449 450 qemu_mutex_unlock(&rcu_registry_lock); 451 qemu_mutex_unlock(&rcu_sync_lock); 452 } 453 454 static void rcu_init_child(void) 455 { 456 if (atfork_depth < 1) { 457 return; 458 } 459 460 memset(®istry, 0, sizeof(registry)); 461 rcu_init_complete(); 462 } 463 #endif 464 465 static void __attribute__((__constructor__)) rcu_init(void) 466 { 467 smp_mb_global_init(); 468 #ifdef CONFIG_POSIX 469 pthread_atfork(rcu_init_lock, rcu_init_unlock, rcu_init_child); 470 #endif 471 rcu_init_complete(); 472 } 473