1.. _list_rcu_doc: 2 3Using RCU to Protect Read-Mostly Linked Lists 4============================================= 5 6One of the best applications of RCU is to protect read-mostly linked lists 7(``struct list_head`` in list.h). One big advantage of this approach 8is that all of the required memory barriers are included for you in 9the list macros. This document describes several applications of RCU, 10with the best fits first. 11 12 13Example 1: Read-mostly list: Deferred Destruction 14------------------------------------------------- 15 16A widely used usecase for RCU lists in the kernel is lockless iteration over 17all processes in the system. ``task_struct::tasks`` represents the list node that 18links all the processes. The list can be traversed in parallel to any list 19additions or removals. 20 21The traversal of the list is done using ``for_each_process()`` which is defined 22by the 2 macros:: 23 24 #define next_task(p) \ 25 list_entry_rcu((p)->tasks.next, struct task_struct, tasks) 26 27 #define for_each_process(p) \ 28 for (p = &init_task ; (p = next_task(p)) != &init_task ; ) 29 30The code traversing the list of all processes typically looks like:: 31 32 rcu_read_lock(); 33 for_each_process(p) { 34 /* Do something with p */ 35 } 36 rcu_read_unlock(); 37 38The simplified code for removing a process from a task list is:: 39 40 void release_task(struct task_struct *p) 41 { 42 write_lock(&tasklist_lock); 43 list_del_rcu(&p->tasks); 44 write_unlock(&tasklist_lock); 45 call_rcu(&p->rcu, delayed_put_task_struct); 46 } 47 48When a process exits, ``release_task()`` calls ``list_del_rcu(&p->tasks)`` under 49``tasklist_lock`` writer lock protection, to remove the task from the list of 50all tasks. The ``tasklist_lock`` prevents concurrent list additions/removals 51from corrupting the list. Readers using ``for_each_process()`` are not protected 52with the ``tasklist_lock``. To prevent readers from noticing changes in the list 53pointers, the ``task_struct`` object is freed only after one or more grace 54periods elapse (with the help of call_rcu()). This deferring of destruction 55ensures that any readers traversing the list will see valid ``p->tasks.next`` 56pointers and deletion/freeing can happen in parallel with traversal of the list. 57This pattern is also called an **existence lock**, since RCU pins the object in 58memory until all existing readers finish. 59 60 61Example 2: Read-Side Action Taken Outside of Lock: No In-Place Updates 62---------------------------------------------------------------------- 63 64The best applications are cases where, if reader-writer locking were 65used, the read-side lock would be dropped before taking any action 66based on the results of the search. The most celebrated example is 67the routing table. Because the routing table is tracking the state of 68equipment outside of the computer, it will at times contain stale data. 69Therefore, once the route has been computed, there is no need to hold 70the routing table static during transmission of the packet. After all, 71you can hold the routing table static all you want, but that won't keep 72the external Internet from changing, and it is the state of the external 73Internet that really matters. In addition, routing entries are typically 74added or deleted, rather than being modified in place. 75 76A straightforward example of this use of RCU may be found in the 77system-call auditing support. For example, a reader-writer locked 78implementation of ``audit_filter_task()`` might be as follows:: 79 80 static enum audit_state audit_filter_task(struct task_struct *tsk) 81 { 82 struct audit_entry *e; 83 enum audit_state state; 84 85 read_lock(&auditsc_lock); 86 /* Note: audit_filter_mutex held by caller. */ 87 list_for_each_entry(e, &audit_tsklist, list) { 88 if (audit_filter_rules(tsk, &e->rule, NULL, &state)) { 89 read_unlock(&auditsc_lock); 90 return state; 91 } 92 } 93 read_unlock(&auditsc_lock); 94 return AUDIT_BUILD_CONTEXT; 95 } 96 97Here the list is searched under the lock, but the lock is dropped before 98the corresponding value is returned. By the time that this value is acted 99on, the list may well have been modified. This makes sense, since if 100you are turning auditing off, it is OK to audit a few extra system calls. 101 102This means that RCU can be easily applied to the read side, as follows:: 103 104 static enum audit_state audit_filter_task(struct task_struct *tsk) 105 { 106 struct audit_entry *e; 107 enum audit_state state; 108 109 rcu_read_lock(); 110 /* Note: audit_filter_mutex held by caller. */ 111 list_for_each_entry_rcu(e, &audit_tsklist, list) { 112 if (audit_filter_rules(tsk, &e->rule, NULL, &state)) { 113 rcu_read_unlock(); 114 return state; 115 } 116 } 117 rcu_read_unlock(); 118 return AUDIT_BUILD_CONTEXT; 119 } 120 121The ``read_lock()`` and ``read_unlock()`` calls have become rcu_read_lock() 122and rcu_read_unlock(), respectively, and the list_for_each_entry() has 123become list_for_each_entry_rcu(). The **_rcu()** list-traversal primitives 124insert the read-side memory barriers that are required on DEC Alpha CPUs. 125 126The changes to the update side are also straightforward. A reader-writer lock 127might be used as follows for deletion and insertion:: 128 129 static inline int audit_del_rule(struct audit_rule *rule, 130 struct list_head *list) 131 { 132 struct audit_entry *e; 133 134 write_lock(&auditsc_lock); 135 list_for_each_entry(e, list, list) { 136 if (!audit_compare_rule(rule, &e->rule)) { 137 list_del(&e->list); 138 write_unlock(&auditsc_lock); 139 return 0; 140 } 141 } 142 write_unlock(&auditsc_lock); 143 return -EFAULT; /* No matching rule */ 144 } 145 146 static inline int audit_add_rule(struct audit_entry *entry, 147 struct list_head *list) 148 { 149 write_lock(&auditsc_lock); 150 if (entry->rule.flags & AUDIT_PREPEND) { 151 entry->rule.flags &= ~AUDIT_PREPEND; 152 list_add(&entry->list, list); 153 } else { 154 list_add_tail(&entry->list, list); 155 } 156 write_unlock(&auditsc_lock); 157 return 0; 158 } 159 160Following are the RCU equivalents for these two functions:: 161 162 static inline int audit_del_rule(struct audit_rule *rule, 163 struct list_head *list) 164 { 165 struct audit_entry *e; 166 167 /* No need to use the _rcu iterator here, since this is the only 168 * deletion routine. */ 169 list_for_each_entry(e, list, list) { 170 if (!audit_compare_rule(rule, &e->rule)) { 171 list_del_rcu(&e->list); 172 call_rcu(&e->rcu, audit_free_rule); 173 return 0; 174 } 175 } 176 return -EFAULT; /* No matching rule */ 177 } 178 179 static inline int audit_add_rule(struct audit_entry *entry, 180 struct list_head *list) 181 { 182 if (entry->rule.flags & AUDIT_PREPEND) { 183 entry->rule.flags &= ~AUDIT_PREPEND; 184 list_add_rcu(&entry->list, list); 185 } else { 186 list_add_tail_rcu(&entry->list, list); 187 } 188 return 0; 189 } 190 191Normally, the ``write_lock()`` and ``write_unlock()`` would be replaced by a 192spin_lock() and a spin_unlock(). But in this case, all callers hold 193``audit_filter_mutex``, so no additional locking is required. The 194``auditsc_lock`` can therefore be eliminated, since use of RCU eliminates the 195need for writers to exclude readers. 196 197The list_del(), list_add(), and list_add_tail() primitives have been 198replaced by list_del_rcu(), list_add_rcu(), and list_add_tail_rcu(). 199The **_rcu()** list-manipulation primitives add memory barriers that are needed on 200weakly ordered CPUs (most of them!). The list_del_rcu() primitive omits the 201pointer poisoning debug-assist code that would otherwise cause concurrent 202readers to fail spectacularly. 203 204So, when readers can tolerate stale data and when entries are either added or 205deleted, without in-place modification, it is very easy to use RCU! 206 207 208Example 3: Handling In-Place Updates 209------------------------------------ 210 211The system-call auditing code does not update auditing rules in place. However, 212if it did, the reader-writer-locked code to do so might look as follows 213(assuming only ``field_count`` is updated, otherwise, the added fields would 214need to be filled in):: 215 216 static inline int audit_upd_rule(struct audit_rule *rule, 217 struct list_head *list, 218 __u32 newaction, 219 __u32 newfield_count) 220 { 221 struct audit_entry *e; 222 struct audit_entry *ne; 223 224 write_lock(&auditsc_lock); 225 /* Note: audit_filter_mutex held by caller. */ 226 list_for_each_entry(e, list, list) { 227 if (!audit_compare_rule(rule, &e->rule)) { 228 e->rule.action = newaction; 229 e->rule.field_count = newfield_count; 230 write_unlock(&auditsc_lock); 231 return 0; 232 } 233 } 234 write_unlock(&auditsc_lock); 235 return -EFAULT; /* No matching rule */ 236 } 237 238The RCU version creates a copy, updates the copy, then replaces the old 239entry with the newly updated entry. This sequence of actions, allowing 240concurrent reads while making a copy to perform an update, is what gives 241RCU (*read-copy update*) its name. The RCU code is as follows:: 242 243 static inline int audit_upd_rule(struct audit_rule *rule, 244 struct list_head *list, 245 __u32 newaction, 246 __u32 newfield_count) 247 { 248 struct audit_entry *e; 249 struct audit_entry *ne; 250 251 list_for_each_entry(e, list, list) { 252 if (!audit_compare_rule(rule, &e->rule)) { 253 ne = kmalloc(sizeof(*entry), GFP_ATOMIC); 254 if (ne == NULL) 255 return -ENOMEM; 256 audit_copy_rule(&ne->rule, &e->rule); 257 ne->rule.action = newaction; 258 ne->rule.field_count = newfield_count; 259 list_replace_rcu(&e->list, &ne->list); 260 call_rcu(&e->rcu, audit_free_rule); 261 return 0; 262 } 263 } 264 return -EFAULT; /* No matching rule */ 265 } 266 267Again, this assumes that the caller holds ``audit_filter_mutex``. Normally, the 268writer lock would become a spinlock in this sort of code. 269 270Another use of this pattern can be found in the openswitch driver's *connection 271tracking table* code in ``ct_limit_set()``. The table holds connection tracking 272entries and has a limit on the maximum entries. There is one such table 273per-zone and hence one *limit* per zone. The zones are mapped to their limits 274through a hashtable using an RCU-managed hlist for the hash chains. When a new 275limit is set, a new limit object is allocated and ``ct_limit_set()`` is called 276to replace the old limit object with the new one using list_replace_rcu(). 277The old limit object is then freed after a grace period using kfree_rcu(). 278 279 280Example 4: Eliminating Stale Data 281--------------------------------- 282 283The auditing example above tolerates stale data, as do most algorithms 284that are tracking external state. Because there is a delay from the 285time the external state changes before Linux becomes aware of the change, 286additional RCU-induced staleness is generally not a problem. 287 288However, there are many examples where stale data cannot be tolerated. 289One example in the Linux kernel is the System V IPC (see the shm_lock() 290function in ipc/shm.c). This code checks a *deleted* flag under a 291per-entry spinlock, and, if the *deleted* flag is set, pretends that the 292entry does not exist. For this to be helpful, the search function must 293return holding the per-entry spinlock, as shm_lock() does in fact do. 294 295.. _quick_quiz: 296 297Quick Quiz: 298 For the deleted-flag technique to be helpful, why is it necessary 299 to hold the per-entry lock while returning from the search function? 300 301:ref:`Answer to Quick Quiz <quick_quiz_answer>` 302 303If the system-call audit module were to ever need to reject stale data, one way 304to accomplish this would be to add a ``deleted`` flag and a ``lock`` spinlock to the 305audit_entry structure, and modify ``audit_filter_task()`` as follows:: 306 307 static enum audit_state audit_filter_task(struct task_struct *tsk) 308 { 309 struct audit_entry *e; 310 enum audit_state state; 311 312 rcu_read_lock(); 313 list_for_each_entry_rcu(e, &audit_tsklist, list) { 314 if (audit_filter_rules(tsk, &e->rule, NULL, &state)) { 315 spin_lock(&e->lock); 316 if (e->deleted) { 317 spin_unlock(&e->lock); 318 rcu_read_unlock(); 319 return AUDIT_BUILD_CONTEXT; 320 } 321 rcu_read_unlock(); 322 return state; 323 } 324 } 325 rcu_read_unlock(); 326 return AUDIT_BUILD_CONTEXT; 327 } 328 329Note that this example assumes that entries are only added and deleted. 330Additional mechanism is required to deal correctly with the update-in-place 331performed by ``audit_upd_rule()``. For one thing, ``audit_upd_rule()`` would 332need additional memory barriers to ensure that the list_add_rcu() was really 333executed before the list_del_rcu(). 334 335The ``audit_del_rule()`` function would need to set the ``deleted`` flag under the 336spinlock as follows:: 337 338 static inline int audit_del_rule(struct audit_rule *rule, 339 struct list_head *list) 340 { 341 struct audit_entry *e; 342 343 /* No need to use the _rcu iterator here, since this 344 * is the only deletion routine. */ 345 list_for_each_entry(e, list, list) { 346 if (!audit_compare_rule(rule, &e->rule)) { 347 spin_lock(&e->lock); 348 list_del_rcu(&e->list); 349 e->deleted = 1; 350 spin_unlock(&e->lock); 351 call_rcu(&e->rcu, audit_free_rule); 352 return 0; 353 } 354 } 355 return -EFAULT; /* No matching rule */ 356 } 357 358This too assumes that the caller holds ``audit_filter_mutex``. 359 360 361Example 5: Skipping Stale Objects 362--------------------------------- 363 364For some usecases, reader performance can be improved by skipping stale objects 365during read-side list traversal if the object in concern is pending destruction 366after one or more grace periods. One such example can be found in the timerfd 367subsystem. When a ``CLOCK_REALTIME`` clock is reprogrammed - for example due to 368setting of the system time, then all programmed timerfds that depend on this 369clock get triggered and processes waiting on them to expire are woken up in 370advance of their scheduled expiry. To facilitate this, all such timers are added 371to an RCU-managed ``cancel_list`` when they are setup in 372``timerfd_setup_cancel()``:: 373 374 static void timerfd_setup_cancel(struct timerfd_ctx *ctx, int flags) 375 { 376 spin_lock(&ctx->cancel_lock); 377 if ((ctx->clockid == CLOCK_REALTIME && 378 (flags & TFD_TIMER_ABSTIME) && (flags & TFD_TIMER_CANCEL_ON_SET)) { 379 if (!ctx->might_cancel) { 380 ctx->might_cancel = true; 381 spin_lock(&cancel_lock); 382 list_add_rcu(&ctx->clist, &cancel_list); 383 spin_unlock(&cancel_lock); 384 } 385 } 386 spin_unlock(&ctx->cancel_lock); 387 } 388 389When a timerfd is freed (fd is closed), then the ``might_cancel`` flag of the 390timerfd object is cleared, the object removed from the ``cancel_list`` and 391destroyed:: 392 393 int timerfd_release(struct inode *inode, struct file *file) 394 { 395 struct timerfd_ctx *ctx = file->private_data; 396 397 spin_lock(&ctx->cancel_lock); 398 if (ctx->might_cancel) { 399 ctx->might_cancel = false; 400 spin_lock(&cancel_lock); 401 list_del_rcu(&ctx->clist); 402 spin_unlock(&cancel_lock); 403 } 404 spin_unlock(&ctx->cancel_lock); 405 406 hrtimer_cancel(&ctx->t.tmr); 407 kfree_rcu(ctx, rcu); 408 return 0; 409 } 410 411If the ``CLOCK_REALTIME`` clock is set, for example by a time server, the 412hrtimer framework calls ``timerfd_clock_was_set()`` which walks the 413``cancel_list`` and wakes up processes waiting on the timerfd. While iterating 414the ``cancel_list``, the ``might_cancel`` flag is consulted to skip stale 415objects:: 416 417 void timerfd_clock_was_set(void) 418 { 419 struct timerfd_ctx *ctx; 420 unsigned long flags; 421 422 rcu_read_lock(); 423 list_for_each_entry_rcu(ctx, &cancel_list, clist) { 424 if (!ctx->might_cancel) 425 continue; 426 spin_lock_irqsave(&ctx->wqh.lock, flags); 427 if (ctx->moffs != ktime_mono_to_real(0)) { 428 ctx->moffs = KTIME_MAX; 429 ctx->ticks++; 430 wake_up_locked_poll(&ctx->wqh, EPOLLIN); 431 } 432 spin_unlock_irqrestore(&ctx->wqh.lock, flags); 433 } 434 rcu_read_unlock(); 435 } 436 437The key point here is, because RCU-traversal of the ``cancel_list`` happens 438while objects are being added and removed to the list, sometimes the traversal 439can step on an object that has been removed from the list. In this example, it 440is seen that it is better to skip such objects using a flag. 441 442 443Summary 444------- 445 446Read-mostly list-based data structures that can tolerate stale data are 447the most amenable to use of RCU. The simplest case is where entries are 448either added or deleted from the data structure (or atomically modified 449in place), but non-atomic in-place modifications can be handled by making 450a copy, updating the copy, then replacing the original with the copy. 451If stale data cannot be tolerated, then a *deleted* flag may be used 452in conjunction with a per-entry spinlock in order to allow the search 453function to reject newly deleted data. 454 455.. _quick_quiz_answer: 456 457Answer to Quick Quiz: 458 For the deleted-flag technique to be helpful, why is it necessary 459 to hold the per-entry lock while returning from the search function? 460 461 If the search function drops the per-entry lock before returning, 462 then the caller will be processing stale data in any case. If it 463 is really OK to be processing stale data, then you don't need a 464 *deleted* flag. If processing stale data really is a problem, 465 then you need to hold the per-entry lock across all of the code 466 that uses the value that was returned. 467 468:ref:`Back to Quick Quiz <quick_quiz>` 469