1 /* 2 * padata.c - generic interface to process data streams in parallel 3 * 4 * Copyright (C) 2008, 2009 secunet Security Networks AG 5 * Copyright (C) 2008, 2009 Steffen Klassert <steffen.klassert@secunet.com> 6 * 7 * This program is free software; you can redistribute it and/or modify it 8 * under the terms and conditions of the GNU General Public License, 9 * version 2, as published by the Free Software Foundation. 10 * 11 * This program is distributed in the hope it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for 14 * more details. 15 * 16 * You should have received a copy of the GNU General Public License along with 17 * this program; if not, write to the Free Software Foundation, Inc., 18 * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA. 19 */ 20 21 #include <linux/module.h> 22 #include <linux/cpumask.h> 23 #include <linux/err.h> 24 #include <linux/cpu.h> 25 #include <linux/padata.h> 26 #include <linux/mutex.h> 27 #include <linux/sched.h> 28 #include <linux/rcupdate.h> 29 30 #define MAX_SEQ_NR INT_MAX - NR_CPUS 31 #define MAX_OBJ_NUM 10000 * NR_CPUS 32 33 static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index) 34 { 35 int cpu, target_cpu; 36 37 target_cpu = cpumask_first(pd->cpumask); 38 for (cpu = 0; cpu < cpu_index; cpu++) 39 target_cpu = cpumask_next(target_cpu, pd->cpumask); 40 41 return target_cpu; 42 } 43 44 static int padata_cpu_hash(struct padata_priv *padata) 45 { 46 int cpu_index; 47 struct parallel_data *pd; 48 49 pd = padata->pd; 50 51 /* 52 * Hash the sequence numbers to the cpus by taking 53 * seq_nr mod. number of cpus in use. 54 */ 55 cpu_index = padata->seq_nr % cpumask_weight(pd->cpumask); 56 57 return padata_index_to_cpu(pd, cpu_index); 58 } 59 60 static void padata_parallel_worker(struct work_struct *work) 61 { 62 struct padata_queue *queue; 63 struct parallel_data *pd; 64 struct padata_instance *pinst; 65 LIST_HEAD(local_list); 66 67 local_bh_disable(); 68 queue = container_of(work, struct padata_queue, pwork); 69 pd = queue->pd; 70 pinst = pd->pinst; 71 72 spin_lock(&queue->parallel.lock); 73 list_replace_init(&queue->parallel.list, &local_list); 74 spin_unlock(&queue->parallel.lock); 75 76 while (!list_empty(&local_list)) { 77 struct padata_priv *padata; 78 79 padata = list_entry(local_list.next, 80 struct padata_priv, list); 81 82 list_del_init(&padata->list); 83 84 padata->parallel(padata); 85 } 86 87 local_bh_enable(); 88 } 89 90 /* 91 * padata_do_parallel - padata parallelization function 92 * 93 * @pinst: padata instance 94 * @padata: object to be parallelized 95 * @cb_cpu: cpu the serialization callback function will run on, 96 * must be in the cpumask of padata. 97 * 98 * The parallelization callback function will run with BHs off. 99 * Note: Every object which is parallelized by padata_do_parallel 100 * must be seen by padata_do_serial. 101 */ 102 int padata_do_parallel(struct padata_instance *pinst, 103 struct padata_priv *padata, int cb_cpu) 104 { 105 int target_cpu, err; 106 struct padata_queue *queue; 107 struct parallel_data *pd; 108 109 rcu_read_lock_bh(); 110 111 pd = rcu_dereference(pinst->pd); 112 113 err = 0; 114 if (!(pinst->flags & PADATA_INIT)) 115 goto out; 116 117 err = -EBUSY; 118 if ((pinst->flags & PADATA_RESET)) 119 goto out; 120 121 if (atomic_read(&pd->refcnt) >= MAX_OBJ_NUM) 122 goto out; 123 124 err = -EINVAL; 125 if (!cpumask_test_cpu(cb_cpu, pd->cpumask)) 126 goto out; 127 128 err = -EINPROGRESS; 129 atomic_inc(&pd->refcnt); 130 padata->pd = pd; 131 padata->cb_cpu = cb_cpu; 132 133 if (unlikely(atomic_read(&pd->seq_nr) == pd->max_seq_nr)) 134 atomic_set(&pd->seq_nr, -1); 135 136 padata->seq_nr = atomic_inc_return(&pd->seq_nr); 137 138 target_cpu = padata_cpu_hash(padata); 139 queue = per_cpu_ptr(pd->queue, target_cpu); 140 141 spin_lock(&queue->parallel.lock); 142 list_add_tail(&padata->list, &queue->parallel.list); 143 spin_unlock(&queue->parallel.lock); 144 145 queue_work_on(target_cpu, pinst->wq, &queue->pwork); 146 147 out: 148 rcu_read_unlock_bh(); 149 150 return err; 151 } 152 EXPORT_SYMBOL(padata_do_parallel); 153 154 static struct padata_priv *padata_get_next(struct parallel_data *pd) 155 { 156 int cpu, num_cpus, empty, calc_seq_nr; 157 int seq_nr, next_nr, overrun, next_overrun; 158 struct padata_queue *queue, *next_queue; 159 struct padata_priv *padata; 160 struct padata_list *reorder; 161 162 empty = 0; 163 next_nr = -1; 164 next_overrun = 0; 165 next_queue = NULL; 166 167 num_cpus = cpumask_weight(pd->cpumask); 168 169 for_each_cpu(cpu, pd->cpumask) { 170 queue = per_cpu_ptr(pd->queue, cpu); 171 reorder = &queue->reorder; 172 173 /* 174 * Calculate the seq_nr of the object that should be 175 * next in this queue. 176 */ 177 overrun = 0; 178 calc_seq_nr = (atomic_read(&queue->num_obj) * num_cpus) 179 + queue->cpu_index; 180 181 if (unlikely(calc_seq_nr > pd->max_seq_nr)) { 182 calc_seq_nr = calc_seq_nr - pd->max_seq_nr - 1; 183 overrun = 1; 184 } 185 186 if (!list_empty(&reorder->list)) { 187 padata = list_entry(reorder->list.next, 188 struct padata_priv, list); 189 190 seq_nr = padata->seq_nr; 191 BUG_ON(calc_seq_nr != seq_nr); 192 } else { 193 seq_nr = calc_seq_nr; 194 empty++; 195 } 196 197 if (next_nr < 0 || seq_nr < next_nr 198 || (next_overrun && !overrun)) { 199 next_nr = seq_nr; 200 next_overrun = overrun; 201 next_queue = queue; 202 } 203 } 204 205 padata = NULL; 206 207 if (empty == num_cpus) 208 goto out; 209 210 reorder = &next_queue->reorder; 211 212 if (!list_empty(&reorder->list)) { 213 padata = list_entry(reorder->list.next, 214 struct padata_priv, list); 215 216 if (unlikely(next_overrun)) { 217 for_each_cpu(cpu, pd->cpumask) { 218 queue = per_cpu_ptr(pd->queue, cpu); 219 atomic_set(&queue->num_obj, 0); 220 } 221 } 222 223 spin_lock(&reorder->lock); 224 list_del_init(&padata->list); 225 atomic_dec(&pd->reorder_objects); 226 spin_unlock(&reorder->lock); 227 228 atomic_inc(&next_queue->num_obj); 229 230 goto out; 231 } 232 233 if (next_nr % num_cpus == next_queue->cpu_index) { 234 padata = ERR_PTR(-ENODATA); 235 goto out; 236 } 237 238 padata = ERR_PTR(-EINPROGRESS); 239 out: 240 return padata; 241 } 242 243 static void padata_reorder(struct parallel_data *pd) 244 { 245 struct padata_priv *padata; 246 struct padata_queue *queue; 247 struct padata_instance *pinst = pd->pinst; 248 249 try_again: 250 if (!spin_trylock_bh(&pd->lock)) 251 goto out; 252 253 while (1) { 254 padata = padata_get_next(pd); 255 256 if (!padata || PTR_ERR(padata) == -EINPROGRESS) 257 break; 258 259 if (PTR_ERR(padata) == -ENODATA) { 260 spin_unlock_bh(&pd->lock); 261 goto out; 262 } 263 264 queue = per_cpu_ptr(pd->queue, padata->cb_cpu); 265 266 spin_lock(&queue->serial.lock); 267 list_add_tail(&padata->list, &queue->serial.list); 268 spin_unlock(&queue->serial.lock); 269 270 queue_work_on(padata->cb_cpu, pinst->wq, &queue->swork); 271 } 272 273 spin_unlock_bh(&pd->lock); 274 275 if (atomic_read(&pd->reorder_objects)) 276 goto try_again; 277 278 out: 279 return; 280 } 281 282 static void padata_serial_worker(struct work_struct *work) 283 { 284 struct padata_queue *queue; 285 struct parallel_data *pd; 286 LIST_HEAD(local_list); 287 288 local_bh_disable(); 289 queue = container_of(work, struct padata_queue, swork); 290 pd = queue->pd; 291 292 spin_lock(&queue->serial.lock); 293 list_replace_init(&queue->serial.list, &local_list); 294 spin_unlock(&queue->serial.lock); 295 296 while (!list_empty(&local_list)) { 297 struct padata_priv *padata; 298 299 padata = list_entry(local_list.next, 300 struct padata_priv, list); 301 302 list_del_init(&padata->list); 303 304 padata->serial(padata); 305 atomic_dec(&pd->refcnt); 306 } 307 local_bh_enable(); 308 } 309 310 /* 311 * padata_do_serial - padata serialization function 312 * 313 * @padata: object to be serialized. 314 * 315 * padata_do_serial must be called for every parallelized object. 316 * The serialization callback function will run with BHs off. 317 */ 318 void padata_do_serial(struct padata_priv *padata) 319 { 320 int cpu; 321 struct padata_queue *queue; 322 struct parallel_data *pd; 323 324 pd = padata->pd; 325 326 cpu = get_cpu(); 327 queue = per_cpu_ptr(pd->queue, cpu); 328 329 spin_lock(&queue->reorder.lock); 330 atomic_inc(&pd->reorder_objects); 331 list_add_tail(&padata->list, &queue->reorder.list); 332 spin_unlock(&queue->reorder.lock); 333 334 put_cpu(); 335 336 padata_reorder(pd); 337 } 338 EXPORT_SYMBOL(padata_do_serial); 339 340 static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst, 341 const struct cpumask *cpumask) 342 { 343 int cpu, cpu_index, num_cpus; 344 struct padata_queue *queue; 345 struct parallel_data *pd; 346 347 cpu_index = 0; 348 349 pd = kzalloc(sizeof(struct parallel_data), GFP_KERNEL); 350 if (!pd) 351 goto err; 352 353 pd->queue = alloc_percpu(struct padata_queue); 354 if (!pd->queue) 355 goto err_free_pd; 356 357 if (!alloc_cpumask_var(&pd->cpumask, GFP_KERNEL)) 358 goto err_free_queue; 359 360 for_each_possible_cpu(cpu) { 361 queue = per_cpu_ptr(pd->queue, cpu); 362 363 queue->pd = pd; 364 365 if (cpumask_test_cpu(cpu, cpumask) 366 && cpumask_test_cpu(cpu, cpu_active_mask)) { 367 queue->cpu_index = cpu_index; 368 cpu_index++; 369 } else 370 queue->cpu_index = -1; 371 372 INIT_LIST_HEAD(&queue->reorder.list); 373 INIT_LIST_HEAD(&queue->parallel.list); 374 INIT_LIST_HEAD(&queue->serial.list); 375 spin_lock_init(&queue->reorder.lock); 376 spin_lock_init(&queue->parallel.lock); 377 spin_lock_init(&queue->serial.lock); 378 379 INIT_WORK(&queue->pwork, padata_parallel_worker); 380 INIT_WORK(&queue->swork, padata_serial_worker); 381 atomic_set(&queue->num_obj, 0); 382 } 383 384 cpumask_and(pd->cpumask, cpumask, cpu_active_mask); 385 386 num_cpus = cpumask_weight(pd->cpumask); 387 pd->max_seq_nr = (MAX_SEQ_NR / num_cpus) * num_cpus - 1; 388 389 atomic_set(&pd->seq_nr, -1); 390 atomic_set(&pd->reorder_objects, 0); 391 atomic_set(&pd->refcnt, 0); 392 pd->pinst = pinst; 393 spin_lock_init(&pd->lock); 394 395 return pd; 396 397 err_free_queue: 398 free_percpu(pd->queue); 399 err_free_pd: 400 kfree(pd); 401 err: 402 return NULL; 403 } 404 405 static void padata_free_pd(struct parallel_data *pd) 406 { 407 free_cpumask_var(pd->cpumask); 408 free_percpu(pd->queue); 409 kfree(pd); 410 } 411 412 static void padata_replace(struct padata_instance *pinst, 413 struct parallel_data *pd_new) 414 { 415 struct parallel_data *pd_old = pinst->pd; 416 417 pinst->flags |= PADATA_RESET; 418 419 rcu_assign_pointer(pinst->pd, pd_new); 420 421 synchronize_rcu(); 422 423 while (atomic_read(&pd_old->refcnt) != 0) 424 yield(); 425 426 flush_workqueue(pinst->wq); 427 428 padata_free_pd(pd_old); 429 430 pinst->flags &= ~PADATA_RESET; 431 } 432 433 /* 434 * padata_set_cpumask - set the cpumask that padata should use 435 * 436 * @pinst: padata instance 437 * @cpumask: the cpumask to use 438 */ 439 int padata_set_cpumask(struct padata_instance *pinst, 440 cpumask_var_t cpumask) 441 { 442 struct parallel_data *pd; 443 int err = 0; 444 445 might_sleep(); 446 447 mutex_lock(&pinst->lock); 448 449 pd = padata_alloc_pd(pinst, cpumask); 450 if (!pd) { 451 err = -ENOMEM; 452 goto out; 453 } 454 455 cpumask_copy(pinst->cpumask, cpumask); 456 457 padata_replace(pinst, pd); 458 459 out: 460 mutex_unlock(&pinst->lock); 461 462 return err; 463 } 464 EXPORT_SYMBOL(padata_set_cpumask); 465 466 static int __padata_add_cpu(struct padata_instance *pinst, int cpu) 467 { 468 struct parallel_data *pd; 469 470 if (cpumask_test_cpu(cpu, cpu_active_mask)) { 471 pd = padata_alloc_pd(pinst, pinst->cpumask); 472 if (!pd) 473 return -ENOMEM; 474 475 padata_replace(pinst, pd); 476 } 477 478 return 0; 479 } 480 481 /* 482 * padata_add_cpu - add a cpu to the padata cpumask 483 * 484 * @pinst: padata instance 485 * @cpu: cpu to add 486 */ 487 int padata_add_cpu(struct padata_instance *pinst, int cpu) 488 { 489 int err; 490 491 might_sleep(); 492 493 mutex_lock(&pinst->lock); 494 495 cpumask_set_cpu(cpu, pinst->cpumask); 496 err = __padata_add_cpu(pinst, cpu); 497 498 mutex_unlock(&pinst->lock); 499 500 return err; 501 } 502 EXPORT_SYMBOL(padata_add_cpu); 503 504 static int __padata_remove_cpu(struct padata_instance *pinst, int cpu) 505 { 506 struct parallel_data *pd; 507 508 if (cpumask_test_cpu(cpu, cpu_online_mask)) { 509 pd = padata_alloc_pd(pinst, pinst->cpumask); 510 if (!pd) 511 return -ENOMEM; 512 513 padata_replace(pinst, pd); 514 } 515 516 return 0; 517 } 518 519 /* 520 * padata_remove_cpu - remove a cpu from the padata cpumask 521 * 522 * @pinst: padata instance 523 * @cpu: cpu to remove 524 */ 525 int padata_remove_cpu(struct padata_instance *pinst, int cpu) 526 { 527 int err; 528 529 might_sleep(); 530 531 mutex_lock(&pinst->lock); 532 533 cpumask_clear_cpu(cpu, pinst->cpumask); 534 err = __padata_remove_cpu(pinst, cpu); 535 536 mutex_unlock(&pinst->lock); 537 538 return err; 539 } 540 EXPORT_SYMBOL(padata_remove_cpu); 541 542 /* 543 * padata_start - start the parallel processing 544 * 545 * @pinst: padata instance to start 546 */ 547 void padata_start(struct padata_instance *pinst) 548 { 549 might_sleep(); 550 551 mutex_lock(&pinst->lock); 552 pinst->flags |= PADATA_INIT; 553 mutex_unlock(&pinst->lock); 554 } 555 EXPORT_SYMBOL(padata_start); 556 557 /* 558 * padata_stop - stop the parallel processing 559 * 560 * @pinst: padata instance to stop 561 */ 562 void padata_stop(struct padata_instance *pinst) 563 { 564 might_sleep(); 565 566 mutex_lock(&pinst->lock); 567 pinst->flags &= ~PADATA_INIT; 568 mutex_unlock(&pinst->lock); 569 } 570 EXPORT_SYMBOL(padata_stop); 571 572 static int __cpuinit padata_cpu_callback(struct notifier_block *nfb, 573 unsigned long action, void *hcpu) 574 { 575 int err; 576 struct padata_instance *pinst; 577 int cpu = (unsigned long)hcpu; 578 579 pinst = container_of(nfb, struct padata_instance, cpu_notifier); 580 581 switch (action) { 582 case CPU_ONLINE: 583 case CPU_ONLINE_FROZEN: 584 if (!cpumask_test_cpu(cpu, pinst->cpumask)) 585 break; 586 mutex_lock(&pinst->lock); 587 err = __padata_add_cpu(pinst, cpu); 588 mutex_unlock(&pinst->lock); 589 if (err) 590 return NOTIFY_BAD; 591 break; 592 593 case CPU_DOWN_PREPARE: 594 case CPU_DOWN_PREPARE_FROZEN: 595 if (!cpumask_test_cpu(cpu, pinst->cpumask)) 596 break; 597 mutex_lock(&pinst->lock); 598 err = __padata_remove_cpu(pinst, cpu); 599 mutex_unlock(&pinst->lock); 600 if (err) 601 return NOTIFY_BAD; 602 break; 603 604 case CPU_UP_CANCELED: 605 case CPU_UP_CANCELED_FROZEN: 606 if (!cpumask_test_cpu(cpu, pinst->cpumask)) 607 break; 608 mutex_lock(&pinst->lock); 609 __padata_remove_cpu(pinst, cpu); 610 mutex_unlock(&pinst->lock); 611 612 case CPU_DOWN_FAILED: 613 case CPU_DOWN_FAILED_FROZEN: 614 if (!cpumask_test_cpu(cpu, pinst->cpumask)) 615 break; 616 mutex_lock(&pinst->lock); 617 __padata_add_cpu(pinst, cpu); 618 mutex_unlock(&pinst->lock); 619 } 620 621 return NOTIFY_OK; 622 } 623 624 /* 625 * padata_alloc - allocate and initialize a padata instance 626 * 627 * @cpumask: cpumask that padata uses for parallelization 628 * @wq: workqueue to use for the allocated padata instance 629 */ 630 struct padata_instance *padata_alloc(const struct cpumask *cpumask, 631 struct workqueue_struct *wq) 632 { 633 int err; 634 struct padata_instance *pinst; 635 struct parallel_data *pd; 636 637 pinst = kzalloc(sizeof(struct padata_instance), GFP_KERNEL); 638 if (!pinst) 639 goto err; 640 641 pd = padata_alloc_pd(pinst, cpumask); 642 if (!pd) 643 goto err_free_inst; 644 645 if (!alloc_cpumask_var(&pinst->cpumask, GFP_KERNEL)) 646 goto err_free_pd; 647 648 rcu_assign_pointer(pinst->pd, pd); 649 650 pinst->wq = wq; 651 652 cpumask_copy(pinst->cpumask, cpumask); 653 654 pinst->flags = 0; 655 656 pinst->cpu_notifier.notifier_call = padata_cpu_callback; 657 pinst->cpu_notifier.priority = 0; 658 err = register_hotcpu_notifier(&pinst->cpu_notifier); 659 if (err) 660 goto err_free_cpumask; 661 662 mutex_init(&pinst->lock); 663 664 return pinst; 665 666 err_free_cpumask: 667 free_cpumask_var(pinst->cpumask); 668 err_free_pd: 669 padata_free_pd(pd); 670 err_free_inst: 671 kfree(pinst); 672 err: 673 return NULL; 674 } 675 EXPORT_SYMBOL(padata_alloc); 676 677 /* 678 * padata_free - free a padata instance 679 * 680 * @ padata_inst: padata instance to free 681 */ 682 void padata_free(struct padata_instance *pinst) 683 { 684 padata_stop(pinst); 685 686 synchronize_rcu(); 687 688 while (atomic_read(&pinst->pd->refcnt) != 0) 689 yield(); 690 691 unregister_hotcpu_notifier(&pinst->cpu_notifier); 692 padata_free_pd(pinst->pd); 693 free_cpumask_var(pinst->cpumask); 694 kfree(pinst); 695 } 696 EXPORT_SYMBOL(padata_free); 697