// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (C) 2007 Oracle. All rights reserved.
 * Copyright (C) 2014 Fujitsu. All rights reserved.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"
#include "ctree.h"

enum {
	WORK_DONE_BIT,
	WORK_ORDER_DONE_BIT,
};

#define NO_THRESHOLD (-1)
#define DFT_THRESHOLD (32)

struct btrfs_workqueue {
	struct workqueue_struct *normal_wq;

	/* File system this workqueue services */
	struct btrfs_fs_info *fs_info;

	/* List head pointing to ordered work list */
	struct list_head ordered_list;

	/* Spinlock for ordered_list */
	spinlock_t list_lock;

	/* Thresholding related variables */
	atomic_t pending;

	/* Upper limit of concurrent workers */
	int limit_active;

	/* Current number of concurrent workers */
	int current_active;

	/* Threshold to change current_active */
	int thresh;
	unsigned int count;
	spinlock_t thres_lock;
};

struct btrfs_fs_info * __pure btrfs_workqueue_owner(const struct btrfs_workqueue *wq)
{
	return wq->fs_info;
}

struct btrfs_fs_info * __pure btrfs_work_owner(const struct btrfs_work *work)
{
	return work->wq->fs_info;
}

bool btrfs_workqueue_normal_congested(const struct btrfs_workqueue *wq)
{
	/*
	 * We could compare wq->pending with num_online_cpus() to support
	 * the "thresh == NO_THRESHOLD" case, but that requires moving the
	 * atomic_inc/dec up into thresh_queue/exec_hook. Let's postpone it
	 * until someone needs support for that case.
	 */
	if (wq->thresh == NO_THRESHOLD)
		return false;

	return atomic_read(&wq->pending) > wq->thresh * 2;
}

static void btrfs_init_workqueue(struct btrfs_workqueue *wq,
				 struct btrfs_fs_info *fs_info)
{
	wq->fs_info = fs_info;
	atomic_set(&wq->pending, 0);
	INIT_LIST_HEAD(&wq->ordered_list);
	spin_lock_init(&wq->list_lock);
	spin_lock_init(&wq->thres_lock);
}

struct btrfs_workqueue *btrfs_alloc_workqueue(struct btrfs_fs_info *fs_info,
					      const char *name, unsigned int flags,
					      int limit_active, int thresh)
{
	struct btrfs_workqueue *ret = kzalloc(sizeof(*ret), GFP_KERNEL);

	if (!ret)
		return NULL;

	btrfs_init_workqueue(ret, fs_info);

	ret->limit_active = limit_active;
	if (thresh == 0)
		thresh = DFT_THRESHOLD;
	/* For low threshold, disabling threshold is a better choice */
	if (thresh < DFT_THRESHOLD) {
		ret->current_active = limit_active;
		ret->thresh = NO_THRESHOLD;
	} else {
		/*
		 * For threshold-able wq, let its concurrency grow on demand.
		 * Use minimal max_active at alloc time to reduce resource
		 * usage.
		 */
		ret->current_active = 1;
		ret->thresh = thresh;
	}

	ret->normal_wq = alloc_workqueue("btrfs-%s", flags, ret->current_active,
					 name);
	if (!ret->normal_wq) {
		kfree(ret);
		return NULL;
	}

	trace_btrfs_workqueue_alloc(ret, name);
	return ret;
}
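/*
 * Example usage sketch: the "worker" name, the WQ_FREEZABLE flag and the
 * limit of 8 below are placeholder choices for illustration, not
 * requirements of the API.
 *
 *	struct btrfs_workqueue *wq;
 *
 *	wq = btrfs_alloc_workqueue(fs_info, "worker", WQ_FREEZABLE, 8,
 *				   DFT_THRESHOLD);
 *	if (!wq)
 *		return -ENOMEM;
 *	...
 *	btrfs_flush_workqueue(wq);
 *	btrfs_destroy_workqueue(wq);
 */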
struct btrfs_workqueue *btrfs_alloc_ordered_workqueue(
		struct btrfs_fs_info *fs_info, const char *name,
		unsigned int flags)
{
	struct btrfs_workqueue *ret;

	ret = kzalloc(sizeof(*ret), GFP_KERNEL);
	if (!ret)
		return NULL;

	btrfs_init_workqueue(ret, fs_info);

	/* Ordered workqueues don't allow @max_active adjustments. */
	ret->limit_active = 1;
	ret->current_active = 1;
	ret->thresh = NO_THRESHOLD;

	ret->normal_wq = alloc_ordered_workqueue("btrfs-%s", flags, name);
	if (!ret->normal_wq) {
		kfree(ret);
		return NULL;
	}

	trace_btrfs_workqueue_alloc(ret, name);
	return ret;
}

/*
 * Hook for threshold which will be called in btrfs_queue_work.
 * This hook WILL be called in IRQ handler context,
 * so workqueue_set_max_active MUST NOT be called in this hook.
 */
static inline void thresh_queue_hook(struct btrfs_workqueue *wq)
{
	if (wq->thresh == NO_THRESHOLD)
		return;
	atomic_inc(&wq->pending);
}

/*
 * Hook for threshold which will be called before executing the work.
 * This hook is called in kthread context, so workqueue_set_max_active
 * may be called here.
 */
static inline void thresh_exec_hook(struct btrfs_workqueue *wq)
{
	int new_current_active;
	long pending;
	int need_change = 0;

	if (wq->thresh == NO_THRESHOLD)
		return;

	atomic_dec(&wq->pending);
	spin_lock(&wq->thres_lock);
	/*
	 * Use wq->count to limit the calling frequency of
	 * workqueue_set_max_active.
	 */
	wq->count++;
	wq->count %= (wq->thresh / 4);
	if (!wq->count)
		goto out;
	new_current_active = wq->current_active;

	/*
	 * pending may change later, but that's OK since it doesn't need to
	 * be accurate to calculate new_current_active.
	 */
	pending = atomic_read(&wq->pending);
	if (pending > wq->thresh)
		new_current_active++;
	if (pending < wq->thresh / 2)
		new_current_active--;
	new_current_active = clamp_val(new_current_active, 1, wq->limit_active);
	if (new_current_active != wq->current_active) {
		need_change = 1;
		wq->current_active = new_current_active;
	}
out:
	spin_unlock(&wq->thres_lock);

	if (need_change)
		workqueue_set_max_active(wq->normal_wq, wq->current_active);
}
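/*
 * Worked example of the thresholding above, assuming the default
 * thresh = DFT_THRESHOLD (32) and a limit_active of 8:
 *
 *	pending > 32	-> new_current_active = current_active + 1
 *	pending < 16	-> new_current_active = current_active - 1
 *	otherwise	-> unchanged
 *
 * The result is clamped to [1, 8], and workqueue_set_max_active() is only
 * called when the clamped value actually differs from wq->current_active,
 * so the underlying workqueue's max_active tracks the backlog instead of
 * staying at its allocation-time value of 1.
 */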
static void run_ordered_work(struct btrfs_workqueue *wq,
			     struct btrfs_work *self)
{
	struct list_head *list = &wq->ordered_list;
	struct btrfs_work *work;
	spinlock_t *lock = &wq->list_lock;
	unsigned long flags;
	bool free_self = false;

	while (1) {
		spin_lock_irqsave(lock, flags);
		if (list_empty(list))
			break;
		work = list_entry(list->next, struct btrfs_work,
				  ordered_list);
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;
		/*
		 * Orders all subsequent loads after reading WORK_DONE_BIT;
		 * paired with the smp_mb__before_atomic in btrfs_work_helper,
		 * this guarantees that the ordered function will see all
		 * updates from the ordinary work function.
		 */
		smp_rmb();

		/*
		 * We are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns.
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;
		trace_btrfs_ordered_sched(work);
		spin_unlock_irqrestore(lock, flags);
		work->ordered_func(work);

		/* Now take the lock again and drop our item from the list. */
		spin_lock_irqsave(lock, flags);
		list_del(&work->ordered_list);
		spin_unlock_irqrestore(lock, flags);

		if (work == self) {
			/*
			 * This is the work item that the worker is currently
			 * executing.
			 *
			 * The kernel workqueue code guarantees non-reentrancy
			 * of work items. I.e., if a work item with the same
			 * address and work function is queued twice, the second
			 * execution is blocked until the first one finishes. A
			 * work item may be freed and recycled with the same
			 * work function; the workqueue code assumes that the
			 * original work item cannot depend on the recycled work
			 * item in that case (see find_worker_executing_work()).
			 *
			 * Note that different types of Btrfs work can depend on
			 * each other, and one type of work on one Btrfs
			 * filesystem may even depend on the same type of work
			 * on another Btrfs filesystem via, e.g., a loop device.
			 * Therefore, we must not allow the current work item to
			 * be recycled until we are really done, otherwise we
			 * break the above assumption and can deadlock.
			 */
			free_self = true;
		} else {
			/*
			 * We don't want to call the ordered free functions with
			 * the lock held.
			 */
			work->ordered_free(work);
			/* NB: work must not be dereferenced past this point. */
			trace_btrfs_all_work_done(wq->fs_info, work);
		}
	}
	spin_unlock_irqrestore(lock, flags);

	if (free_self) {
		self->ordered_free(self);
		/* NB: self must not be dereferenced past this point. */
		trace_btrfs_all_work_done(wq->fs_info, self);
	}
}

static void btrfs_work_helper(struct work_struct *normal_work)
{
	struct btrfs_work *work = container_of(normal_work, struct btrfs_work,
					       normal_work);
	struct btrfs_workqueue *wq = work->wq;
	int need_order = 0;

	/*
	 * We should not touch things inside work in the following cases:
	 * 1) after work->func() if it has no ordered_free
	 *    Since the struct is freed in work->func().
	 * 2) after setting WORK_DONE_BIT
	 *    The work may be freed in other threads almost instantly.
	 * So we save the needed things here.
	 */
	if (work->ordered_func)
		need_order = 1;

	trace_btrfs_work_sched(work);
	thresh_exec_hook(wq);
	work->func(work);
	if (need_order) {
		/*
		 * Ensures all memory accesses done in the work function are
		 * ordered before setting the WORK_DONE_BIT, so that the thread
		 * which is going to execute the ordered work sees them.
		 * Pairs with the smp_rmb in run_ordered_work.
		 */
		smp_mb__before_atomic();
		set_bit(WORK_DONE_BIT, &work->flags);
		run_ordered_work(wq, work);
	} else {
		/* NB: work must not be dereferenced past this point. */
		trace_btrfs_all_work_done(wq->fs_info, work);
	}
}
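/*
 * Summary of the flag handshake between btrfs_work_helper() and
 * run_ordered_work() above:
 *
 *	WORK_DONE_BIT        set by btrfs_work_helper() after work->func()
 *	                     has returned; run_ordered_work() stops walking
 *	                     the ordered list at the first item that does
 *	                     not have it set, so ordered_func() never runs
 *	                     before the normal function has finished.
 *	WORK_ORDER_DONE_BIT  set (test_and_set) by run_ordered_work() right
 *	                     before calling ordered_func(), so the ordered
 *	                     step runs at most once per item even though
 *	                     several workers may walk the list concurrently.
 */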
void btrfs_init_work(struct btrfs_work *work, btrfs_func_t func,
		     btrfs_func_t ordered_func, btrfs_func_t ordered_free)
{
	work->func = func;
	work->ordered_func = ordered_func;
	work->ordered_free = ordered_free;
	INIT_WORK(&work->normal_work, btrfs_work_helper);
	INIT_LIST_HEAD(&work->ordered_list);
	work->flags = 0;
}

void btrfs_queue_work(struct btrfs_workqueue *wq, struct btrfs_work *work)
{
	unsigned long flags;

	work->wq = wq;
	thresh_queue_hook(wq);
	if (work->ordered_func) {
		spin_lock_irqsave(&wq->list_lock, flags);
		list_add_tail(&work->ordered_list, &wq->ordered_list);
		spin_unlock_irqrestore(&wq->list_lock, flags);
	}
	trace_btrfs_work_queued(work);
	queue_work(wq->normal_wq, &work->normal_work);
}

void btrfs_destroy_workqueue(struct btrfs_workqueue *wq)
{
	if (!wq)
		return;
	destroy_workqueue(wq->normal_wq);
	trace_btrfs_workqueue_destroy(wq);
	kfree(wq);
}

void btrfs_workqueue_set_max(struct btrfs_workqueue *wq, int limit_active)
{
	if (wq)
		wq->limit_active = limit_active;
}

void btrfs_flush_workqueue(struct btrfs_workqueue *wq)
{
	flush_workqueue(wq->normal_wq);
}
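/*
 * Submission sketch for the API above. The work item type and the three
 * callbacks (struct my_work, my_func(), my_ordered(), my_free()) are
 * hypothetical placeholders; only the btrfs_* calls are from this file.
 *
 *	struct my_work *w = kmalloc(sizeof(*w), GFP_NOFS);
 *
 *	if (!w)
 *		return -ENOMEM;
 *	btrfs_init_work(&w->work, my_func, my_ordered, my_free);
 *	btrfs_queue_work(wq, &w->work);
 *
 * my_func() instances may run concurrently on several workers, while
 * my_ordered() is invoked from run_ordered_work() strictly in queueing
 * order, and my_free() releases the item once both have finished.
 */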