// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (C) 2007 Oracle. All rights reserved.
 * Copyright (C) 2014 Fujitsu. All rights reserved.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"
#include "ctree.h"

#define WORK_DONE_BIT 0
#define WORK_ORDER_DONE_BIT 1
#define WORK_HIGH_PRIO_BIT 2

#define NO_THRESHOLD (-1)
#define DFT_THRESHOLD (32)

struct __btrfs_workqueue {
        struct workqueue_struct *normal_wq;

        /* File system this workqueue services */
        struct btrfs_fs_info *fs_info;

        /* List head pointing to ordered work list */
        struct list_head ordered_list;

        /* Spinlock for ordered_list */
        spinlock_t list_lock;

        /* Thresholding related variables */
        atomic_t pending;

        /* Upper limit of concurrent workers */
        int limit_active;

        /* Current number of concurrent workers */
        int current_active;

        /* Threshold to change current_active */
        int thresh;
        unsigned int count;
        spinlock_t thres_lock;
};

struct btrfs_workqueue {
        struct __btrfs_workqueue *normal;
        struct __btrfs_workqueue *high;
};

static void normal_work_helper(struct btrfs_work *work);

#define BTRFS_WORK_HELPER(name)                                         \
noinline_for_stack void btrfs_##name(struct work_struct *arg)           \
{                                                                       \
        struct btrfs_work *work = container_of(arg, struct btrfs_work,  \
                                               normal_work);            \
        normal_work_helper(work);                                       \
}

struct btrfs_fs_info *
btrfs_workqueue_owner(const struct __btrfs_workqueue *wq)
{
        return wq->fs_info;
}

struct btrfs_fs_info *
btrfs_work_owner(const struct btrfs_work *work)
{
        return work->wq->fs_info;
}

bool btrfs_workqueue_normal_congested(const struct btrfs_workqueue *wq)
{
        /*
         * We could compare wq->normal->pending with num_online_cpus()
         * to support the "thresh == NO_THRESHOLD" case, but it requires
         * moving up atomic_inc/dec in thresh_queue/exec_hook. Let's
         * postpone it until someone needs the support of that case.
         */
        if (wq->normal->thresh == NO_THRESHOLD)
                return false;

        return atomic_read(&wq->normal->pending) > wq->normal->thresh * 2;
}
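
/*
 * Illustrative sketch: each BTRFS_WORK_HELPER(name) line below stamps out
 * one thin wrapper from the macro above, e.g. BTRFS_WORK_HELPER(endio_helper)
 * expands to roughly:
 *
 *      noinline_for_stack void btrfs_endio_helper(struct work_struct *arg)
 *      {
 *              struct btrfs_work *work = container_of(arg,
 *                              struct btrfs_work, normal_work);
 *              normal_work_helper(work);
 *      }
 *
 * All wrappers funnel into normal_work_helper(). Keeping a distinct
 * function per queue type keeps work->func unique, which helps the
 * workqueue core (it compares the function pointer when checking whether
 * a work item is already running) tell recycled work items apart.
 */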
BTRFS_WORK_HELPER(worker_helper);
BTRFS_WORK_HELPER(delalloc_helper);
BTRFS_WORK_HELPER(flush_delalloc_helper);
BTRFS_WORK_HELPER(cache_helper);
BTRFS_WORK_HELPER(submit_helper);
BTRFS_WORK_HELPER(fixup_helper);
BTRFS_WORK_HELPER(endio_helper);
BTRFS_WORK_HELPER(endio_meta_helper);
BTRFS_WORK_HELPER(endio_meta_write_helper);
BTRFS_WORK_HELPER(endio_raid56_helper);
BTRFS_WORK_HELPER(endio_repair_helper);
BTRFS_WORK_HELPER(rmw_helper);
BTRFS_WORK_HELPER(endio_write_helper);
BTRFS_WORK_HELPER(freespace_write_helper);
BTRFS_WORK_HELPER(delayed_meta_helper);
BTRFS_WORK_HELPER(readahead_helper);
BTRFS_WORK_HELPER(qgroup_rescan_helper);
BTRFS_WORK_HELPER(extent_refs_helper);
BTRFS_WORK_HELPER(scrub_helper);
BTRFS_WORK_HELPER(scrubwrc_helper);
BTRFS_WORK_HELPER(scrubnc_helper);
BTRFS_WORK_HELPER(scrubparity_helper);

static struct __btrfs_workqueue *
__btrfs_alloc_workqueue(struct btrfs_fs_info *fs_info, const char *name,
                        unsigned int flags, int limit_active, int thresh)
{
        struct __btrfs_workqueue *ret = kzalloc(sizeof(*ret), GFP_KERNEL);

        if (!ret)
                return NULL;

        ret->fs_info = fs_info;
        ret->limit_active = limit_active;
        atomic_set(&ret->pending, 0);
        if (thresh == 0)
                thresh = DFT_THRESHOLD;
        /* For low threshold, disabling threshold is a better choice */
        if (thresh < DFT_THRESHOLD) {
                ret->current_active = limit_active;
                ret->thresh = NO_THRESHOLD;
        } else {
                /*
                 * For threshold-able wq, let its concurrency grow on demand.
                 * Use minimal max_active at alloc time to reduce resource
                 * usage.
                 */
                ret->current_active = 1;
                ret->thresh = thresh;
        }

        if (flags & WQ_HIGHPRI)
                ret->normal_wq = alloc_workqueue("btrfs-%s-high", flags,
                                                 ret->current_active, name);
        else
                ret->normal_wq = alloc_workqueue("btrfs-%s", flags,
                                                 ret->current_active, name);
        if (!ret->normal_wq) {
                kfree(ret);
                return NULL;
        }

        INIT_LIST_HEAD(&ret->ordered_list);
        spin_lock_init(&ret->list_lock);
        spin_lock_init(&ret->thres_lock);
        trace_btrfs_workqueue_alloc(ret, name, flags & WQ_HIGHPRI);
        return ret;
}

static inline void
__btrfs_destroy_workqueue(struct __btrfs_workqueue *wq);

struct btrfs_workqueue *btrfs_alloc_workqueue(struct btrfs_fs_info *fs_info,
                                              const char *name,
                                              unsigned int flags,
                                              int limit_active,
                                              int thresh)
{
        struct btrfs_workqueue *ret = kzalloc(sizeof(*ret), GFP_KERNEL);

        if (!ret)
                return NULL;

        ret->normal = __btrfs_alloc_workqueue(fs_info, name,
                                              flags & ~WQ_HIGHPRI,
                                              limit_active, thresh);
        if (!ret->normal) {
                kfree(ret);
                return NULL;
        }

        if (flags & WQ_HIGHPRI) {
                ret->high = __btrfs_alloc_workqueue(fs_info, name, flags,
                                                    limit_active, thresh);
                if (!ret->high) {
                        __btrfs_destroy_workqueue(ret->normal);
                        kfree(ret);
                        return NULL;
                }
        }
        return ret;
}
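
/*
 * Illustrative sketch of a typical caller (names mirror the btrfs mount
 * path, e.g. disk-io.c):
 *
 *      fs_info->workers =
 *              btrfs_alloc_workqueue(fs_info, "worker",
 *                                    flags | WQ_HIGHPRI, max_active, 16);
 *
 * Passing WQ_HIGHPRI yields both a normal and a high-priority queue;
 * a thresh of 16 (< DFT_THRESHOLD) disables thresholding, so max_active
 * is applied directly instead of growing on demand.
 */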
/*
 * Hook for threshold which will be called in btrfs_queue_work.
 * This hook WILL be called in IRQ handler context,
 * so workqueue_set_max_active MUST NOT be called in this hook.
 */
static inline void thresh_queue_hook(struct __btrfs_workqueue *wq)
{
        if (wq->thresh == NO_THRESHOLD)
                return;
        atomic_inc(&wq->pending);
}

/*
 * Hook for threshold which will be called before executing the work.
 * This hook is called in kthread context, so workqueue_set_max_active
 * may be called here.
 */
static inline void thresh_exec_hook(struct __btrfs_workqueue *wq)
{
        int new_current_active;
        long pending;
        int need_change = 0;

        if (wq->thresh == NO_THRESHOLD)
                return;

        atomic_dec(&wq->pending);
        spin_lock(&wq->thres_lock);
        /*
         * Use wq->count to limit the calling frequency of
         * workqueue_set_max_active.
         */
        wq->count++;
        wq->count %= (wq->thresh / 4);
        if (!wq->count)
                goto out;
        new_current_active = wq->current_active;

        /*
         * pending may change later, but that's OK: it doesn't need to be
         * accurate for calculating new_current_active.
         */
        pending = atomic_read(&wq->pending);
        if (pending > wq->thresh)
                new_current_active++;
        if (pending < wq->thresh / 2)
                new_current_active--;
        new_current_active = clamp_val(new_current_active, 1, wq->limit_active);
        if (new_current_active != wq->current_active) {
                need_change = 1;
                wq->current_active = new_current_active;
        }
out:
        spin_unlock(&wq->thres_lock);

        if (need_change)
                workqueue_set_max_active(wq->normal_wq, wq->current_active);
}

static void run_ordered_work(struct __btrfs_workqueue *wq)
{
        struct list_head *list = &wq->ordered_list;
        struct btrfs_work *work;
        spinlock_t *lock = &wq->list_lock;
        unsigned long flags;

        while (1) {
                void *wtag;

                spin_lock_irqsave(lock, flags);
                if (list_empty(list))
                        break;
                work = list_entry(list->next, struct btrfs_work,
                                  ordered_list);
                if (!test_bit(WORK_DONE_BIT, &work->flags))
                        break;

                /*
                 * We are going to call the ordered done function, but
                 * we leave the work item on the list as a barrier so
                 * that later work items that are done don't have their
                 * functions called before this one returns.
                 */
                if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
                        break;
                trace_btrfs_ordered_sched(work);
                spin_unlock_irqrestore(lock, flags);
                work->ordered_func(work);

                /* Now take the lock again and drop our item from the list */
                spin_lock_irqsave(lock, flags);
                list_del(&work->ordered_list);
                spin_unlock_irqrestore(lock, flags);

                /*
                 * We don't want to call the ordered free functions with the
                 * lock held though. Save the work as tag for the trace event,
                 * because the callback could free the structure.
                 */
                wtag = work;
                work->ordered_free(work);
                trace_btrfs_all_work_done(wq->fs_info, wtag);
        }
        spin_unlock_irqrestore(lock, flags);
}
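
/*
 * Illustrative timeline for the ordering guarantee above: queue items
 * A, B, C in that order with ordered_func set. If B's work->func()
 * finishes first, run_ordered_work() still stops at A (its WORK_DONE_BIT
 * is not yet set) and leaves B untouched on the list. Only after A's
 * func() completes and its ordered_func() returns does B's ordered_func()
 * run, so the ordered callbacks always fire in queue order.
 */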
static void normal_work_helper(struct btrfs_work *work)
{
        struct __btrfs_workqueue *wq;
        void *wtag;
        int need_order = 0;

        /*
         * We should not touch things inside the work item in the
         * following cases:
         * 1) after work->func(), if it has no ordered_free, since the
         *    struct is freed in work->func();
         * 2) after setting WORK_DONE_BIT, since the work may be freed
         *    in other threads almost instantly.
         * So we save the needed things here.
         */
        if (work->ordered_func)
                need_order = 1;
        wq = work->wq;
        /* Safe for tracepoints in case work gets freed by the callback */
        wtag = work;

        trace_btrfs_work_sched(work);
        thresh_exec_hook(wq);
        work->func(work);
        if (need_order) {
                set_bit(WORK_DONE_BIT, &work->flags);
                run_ordered_work(wq);
        }
        if (!need_order)
                trace_btrfs_all_work_done(wq->fs_info, wtag);
}

void btrfs_init_work(struct btrfs_work *work, btrfs_work_func_t uniq_func,
                     btrfs_func_t func,
                     btrfs_func_t ordered_func,
                     btrfs_func_t ordered_free)
{
        work->func = func;
        work->ordered_func = ordered_func;
        work->ordered_free = ordered_free;
        INIT_WORK(&work->normal_work, uniq_func);
        INIT_LIST_HEAD(&work->ordered_list);
        work->flags = 0;
}

static inline void __btrfs_queue_work(struct __btrfs_workqueue *wq,
                                      struct btrfs_work *work)
{
        unsigned long flags;

        work->wq = wq;
        thresh_queue_hook(wq);
        if (work->ordered_func) {
                spin_lock_irqsave(&wq->list_lock, flags);
                list_add_tail(&work->ordered_list, &wq->ordered_list);
                spin_unlock_irqrestore(&wq->list_lock, flags);
        }
        trace_btrfs_work_queued(work);
        queue_work(wq->normal_wq, &work->normal_work);
}

void btrfs_queue_work(struct btrfs_workqueue *wq,
                      struct btrfs_work *work)
{
        struct __btrfs_workqueue *dest_wq;

        if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags) && wq->high)
                dest_wq = wq->high;
        else
                dest_wq = wq->normal;
        __btrfs_queue_work(dest_wq, work);
}

static inline void
__btrfs_destroy_workqueue(struct __btrfs_workqueue *wq)
{
        destroy_workqueue(wq->normal_wq);
        trace_btrfs_workqueue_destroy(wq);
        kfree(wq);
}

void btrfs_destroy_workqueue(struct btrfs_workqueue *wq)
{
        if (!wq)
                return;
        if (wq->high)
                __btrfs_destroy_workqueue(wq->high);
        __btrfs_destroy_workqueue(wq->normal);
        kfree(wq);
}

void btrfs_workqueue_set_max(struct btrfs_workqueue *wq, int limit_active)
{
        if (!wq)
                return;
        wq->normal->limit_active = limit_active;
        if (wq->high)
                wq->high->limit_active = limit_active;
}

void btrfs_set_work_high_priority(struct btrfs_work *work)
{
        set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}
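
/*
 * Illustrative end-to-end usage sketch. The caller-side names below
 * (my_async, my_start, my_done, my_free) are hypothetical; the btrfs_*
 * calls and btrfs_worker_helper are the API defined in this file:
 *
 *      struct my_async {
 *              struct btrfs_work work;
 *              // caller-private state
 *      };
 *
 *      static void my_start(struct btrfs_work *work) { ... }
 *      static void my_done(struct btrfs_work *work) { ... }
 *      static void my_free(struct btrfs_work *work) { kfree(...); }
 *
 *      btrfs_init_work(&async->work, btrfs_worker_helper,
 *                      my_start, my_done, my_free);
 *      // Optional: route to the high-priority queue, if one was allocated
 *      btrfs_set_work_high_priority(&async->work);
 *      btrfs_queue_work(fs_info->workers, &async->work);
 *
 * my_start() runs first; my_done() runs in queue order relative to other
 * ordered items on the same queue; my_free() then releases the item.
 */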