1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6 ** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved. 7 ** 8 ** 9 ******************************************************************************* 10 ******************************************************************************/ 11 12 #include <linux/module.h> 13 14 #include "dlm_internal.h" 15 #include "lockspace.h" 16 #include "member.h" 17 #include "recoverd.h" 18 #include "dir.h" 19 #include "lowcomms.h" 20 #include "config.h" 21 #include "memory.h" 22 #include "lock.h" 23 #include "recover.h" 24 #include "requestqueue.h" 25 #include "user.h" 26 #include "ast.h" 27 28 static int ls_count; 29 static struct mutex ls_lock; 30 static struct list_head lslist; 31 static spinlock_t lslist_lock; 32 static struct task_struct * scand_task; 33 34 35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len) 36 { 37 ssize_t ret = len; 38 int n; 39 int rc = kstrtoint(buf, 0, &n); 40 41 if (rc) 42 return rc; 43 ls = dlm_find_lockspace_local(ls->ls_local_handle); 44 if (!ls) 45 return -EINVAL; 46 47 switch (n) { 48 case 0: 49 dlm_ls_stop(ls); 50 break; 51 case 1: 52 dlm_ls_start(ls); 53 break; 54 default: 55 ret = -EINVAL; 56 } 57 dlm_put_lockspace(ls); 58 return ret; 59 } 60 61 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len) 62 { 63 int rc = kstrtoint(buf, 0, &ls->ls_uevent_result); 64 65 if (rc) 66 return rc; 67 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags); 68 wake_up(&ls->ls_uevent_wait); 69 return len; 70 } 71 72 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf) 73 { 74 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id); 75 } 76 77 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len) 78 { 79 int rc = kstrtouint(buf, 0, &ls->ls_global_id); 80 81 if (rc) 82 return rc; 83 return len; 84 } 85 86 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf) 87 { 88 return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls)); 89 } 90 91 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len) 92 { 93 int val; 94 int rc = kstrtoint(buf, 0, &val); 95 96 if (rc) 97 return rc; 98 if (val == 1) 99 set_bit(LSFL_NODIR, &ls->ls_flags); 100 return len; 101 } 102 103 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf) 104 { 105 uint32_t status = dlm_recover_status(ls); 106 return snprintf(buf, PAGE_SIZE, "%x\n", status); 107 } 108 109 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf) 110 { 111 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid); 112 } 113 114 struct dlm_attr { 115 struct attribute attr; 116 ssize_t (*show)(struct dlm_ls *, char *); 117 ssize_t (*store)(struct dlm_ls *, const char *, size_t); 118 }; 119 120 static struct dlm_attr dlm_attr_control = { 121 .attr = {.name = "control", .mode = S_IWUSR}, 122 .store = dlm_control_store 123 }; 124 125 static struct dlm_attr dlm_attr_event = { 126 .attr = {.name = "event_done", .mode = S_IWUSR}, 127 .store = dlm_event_store 128 }; 129 130 static struct dlm_attr dlm_attr_id = { 131 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR}, 132 .show = dlm_id_show, 133 .store = dlm_id_store 134 }; 135 136 static struct dlm_attr dlm_attr_nodir = { 137 .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR}, 138 .show = dlm_nodir_show, 139 .store = dlm_nodir_store 140 }; 141 142 static struct dlm_attr dlm_attr_recover_status = { 143 .attr = {.name = "recover_status", .mode = S_IRUGO}, 144 .show = dlm_recover_status_show 145 }; 146 147 static struct dlm_attr dlm_attr_recover_nodeid = { 148 .attr = {.name = "recover_nodeid", .mode = S_IRUGO}, 149 .show = dlm_recover_nodeid_show 150 }; 151 152 static struct attribute *dlm_attrs[] = { 153 &dlm_attr_control.attr, 154 &dlm_attr_event.attr, 155 &dlm_attr_id.attr, 156 &dlm_attr_nodir.attr, 157 &dlm_attr_recover_status.attr, 158 &dlm_attr_recover_nodeid.attr, 159 NULL, 160 }; 161 ATTRIBUTE_GROUPS(dlm); 162 163 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr, 164 char *buf) 165 { 166 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 167 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr); 168 return a->show ? a->show(ls, buf) : 0; 169 } 170 171 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr, 172 const char *buf, size_t len) 173 { 174 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 175 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr); 176 return a->store ? a->store(ls, buf, len) : len; 177 } 178 179 static void lockspace_kobj_release(struct kobject *k) 180 { 181 struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj); 182 kfree(ls); 183 } 184 185 static const struct sysfs_ops dlm_attr_ops = { 186 .show = dlm_attr_show, 187 .store = dlm_attr_store, 188 }; 189 190 static struct kobj_type dlm_ktype = { 191 .default_groups = dlm_groups, 192 .sysfs_ops = &dlm_attr_ops, 193 .release = lockspace_kobj_release, 194 }; 195 196 static struct kset *dlm_kset; 197 198 static int do_uevent(struct dlm_ls *ls, int in) 199 { 200 if (in) 201 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE); 202 else 203 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE); 204 205 log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving"); 206 207 /* dlm_controld will see the uevent, do the necessary group management 208 and then write to sysfs to wake us */ 209 210 wait_event(ls->ls_uevent_wait, 211 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags)); 212 213 log_rinfo(ls, "group event done %d", ls->ls_uevent_result); 214 215 return ls->ls_uevent_result; 216 } 217 218 static int dlm_uevent(struct kset *kset, struct kobject *kobj, 219 struct kobj_uevent_env *env) 220 { 221 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 222 223 add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name); 224 return 0; 225 } 226 227 static const struct kset_uevent_ops dlm_uevent_ops = { 228 .uevent = dlm_uevent, 229 }; 230 231 int __init dlm_lockspace_init(void) 232 { 233 ls_count = 0; 234 mutex_init(&ls_lock); 235 INIT_LIST_HEAD(&lslist); 236 spin_lock_init(&lslist_lock); 237 238 dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj); 239 if (!dlm_kset) { 240 printk(KERN_WARNING "%s: can not create kset\n", __func__); 241 return -ENOMEM; 242 } 243 return 0; 244 } 245 246 void dlm_lockspace_exit(void) 247 { 248 kset_unregister(dlm_kset); 249 } 250 251 static struct dlm_ls *find_ls_to_scan(void) 252 { 253 struct dlm_ls *ls; 254 255 spin_lock(&lslist_lock); 256 list_for_each_entry(ls, &lslist, ls_list) { 257 if (time_after_eq(jiffies, ls->ls_scan_time + 258 dlm_config.ci_scan_secs * HZ)) { 259 spin_unlock(&lslist_lock); 260 return ls; 261 } 262 } 263 spin_unlock(&lslist_lock); 264 return NULL; 265 } 266 267 static int dlm_scand(void *data) 268 { 269 struct dlm_ls *ls; 270 271 while (!kthread_should_stop()) { 272 ls = find_ls_to_scan(); 273 if (ls) { 274 if (dlm_lock_recovery_try(ls)) { 275 ls->ls_scan_time = jiffies; 276 dlm_scan_rsbs(ls); 277 dlm_scan_timeout(ls); 278 dlm_scan_waiters(ls); 279 dlm_unlock_recovery(ls); 280 } else { 281 ls->ls_scan_time += HZ; 282 } 283 continue; 284 } 285 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ); 286 } 287 return 0; 288 } 289 290 static int dlm_scand_start(void) 291 { 292 struct task_struct *p; 293 int error = 0; 294 295 p = kthread_run(dlm_scand, NULL, "dlm_scand"); 296 if (IS_ERR(p)) 297 error = PTR_ERR(p); 298 else 299 scand_task = p; 300 return error; 301 } 302 303 static void dlm_scand_stop(void) 304 { 305 kthread_stop(scand_task); 306 } 307 308 struct dlm_ls *dlm_find_lockspace_global(uint32_t id) 309 { 310 struct dlm_ls *ls; 311 312 spin_lock(&lslist_lock); 313 314 list_for_each_entry(ls, &lslist, ls_list) { 315 if (ls->ls_global_id == id) { 316 ls->ls_count++; 317 goto out; 318 } 319 } 320 ls = NULL; 321 out: 322 spin_unlock(&lslist_lock); 323 return ls; 324 } 325 326 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace) 327 { 328 struct dlm_ls *ls; 329 330 spin_lock(&lslist_lock); 331 list_for_each_entry(ls, &lslist, ls_list) { 332 if (ls->ls_local_handle == lockspace) { 333 ls->ls_count++; 334 goto out; 335 } 336 } 337 ls = NULL; 338 out: 339 spin_unlock(&lslist_lock); 340 return ls; 341 } 342 343 struct dlm_ls *dlm_find_lockspace_device(int minor) 344 { 345 struct dlm_ls *ls; 346 347 spin_lock(&lslist_lock); 348 list_for_each_entry(ls, &lslist, ls_list) { 349 if (ls->ls_device.minor == minor) { 350 ls->ls_count++; 351 goto out; 352 } 353 } 354 ls = NULL; 355 out: 356 spin_unlock(&lslist_lock); 357 return ls; 358 } 359 360 void dlm_put_lockspace(struct dlm_ls *ls) 361 { 362 spin_lock(&lslist_lock); 363 ls->ls_count--; 364 spin_unlock(&lslist_lock); 365 } 366 367 static void remove_lockspace(struct dlm_ls *ls) 368 { 369 for (;;) { 370 spin_lock(&lslist_lock); 371 if (ls->ls_count == 0) { 372 WARN_ON(ls->ls_create_count != 0); 373 list_del(&ls->ls_list); 374 spin_unlock(&lslist_lock); 375 return; 376 } 377 spin_unlock(&lslist_lock); 378 ssleep(1); 379 } 380 } 381 382 static int threads_start(void) 383 { 384 int error; 385 386 error = dlm_scand_start(); 387 if (error) { 388 log_print("cannot start dlm_scand thread %d", error); 389 goto fail; 390 } 391 392 /* Thread for sending/receiving messages for all lockspace's */ 393 error = dlm_lowcomms_start(); 394 if (error) { 395 log_print("cannot start dlm lowcomms %d", error); 396 goto scand_fail; 397 } 398 399 return 0; 400 401 scand_fail: 402 dlm_scand_stop(); 403 fail: 404 return error; 405 } 406 407 static int new_lockspace(const char *name, const char *cluster, 408 uint32_t flags, int lvblen, 409 const struct dlm_lockspace_ops *ops, void *ops_arg, 410 int *ops_result, dlm_lockspace_t **lockspace) 411 { 412 struct dlm_ls *ls; 413 int i, size, error; 414 int do_unreg = 0; 415 int namelen = strlen(name); 416 417 if (namelen > DLM_LOCKSPACE_LEN || namelen == 0) 418 return -EINVAL; 419 420 if (!lvblen || (lvblen % 8)) 421 return -EINVAL; 422 423 if (!try_module_get(THIS_MODULE)) 424 return -EINVAL; 425 426 if (!dlm_user_daemon_available()) { 427 log_print("dlm user daemon not available"); 428 error = -EUNATCH; 429 goto out; 430 } 431 432 if (ops && ops_result) { 433 if (!dlm_config.ci_recover_callbacks) 434 *ops_result = -EOPNOTSUPP; 435 else 436 *ops_result = 0; 437 } 438 439 if (!cluster) 440 log_print("dlm cluster name '%s' is being used without an application provided cluster name", 441 dlm_config.ci_cluster_name); 442 443 if (dlm_config.ci_recover_callbacks && cluster && 444 strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) { 445 log_print("dlm cluster name '%s' does not match " 446 "the application cluster name '%s'", 447 dlm_config.ci_cluster_name, cluster); 448 error = -EBADR; 449 goto out; 450 } 451 452 error = 0; 453 454 spin_lock(&lslist_lock); 455 list_for_each_entry(ls, &lslist, ls_list) { 456 WARN_ON(ls->ls_create_count <= 0); 457 if (ls->ls_namelen != namelen) 458 continue; 459 if (memcmp(ls->ls_name, name, namelen)) 460 continue; 461 if (flags & DLM_LSFL_NEWEXCL) { 462 error = -EEXIST; 463 break; 464 } 465 ls->ls_create_count++; 466 *lockspace = ls; 467 error = 1; 468 break; 469 } 470 spin_unlock(&lslist_lock); 471 472 if (error) 473 goto out; 474 475 error = -ENOMEM; 476 477 ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS); 478 if (!ls) 479 goto out; 480 memcpy(ls->ls_name, name, namelen); 481 ls->ls_namelen = namelen; 482 ls->ls_lvblen = lvblen; 483 ls->ls_count = 0; 484 ls->ls_flags = 0; 485 ls->ls_scan_time = jiffies; 486 487 if (ops && dlm_config.ci_recover_callbacks) { 488 ls->ls_ops = ops; 489 ls->ls_ops_arg = ops_arg; 490 } 491 492 if (flags & DLM_LSFL_TIMEWARN) 493 set_bit(LSFL_TIMEWARN, &ls->ls_flags); 494 495 /* ls_exflags are forced to match among nodes, and we don't 496 need to require all nodes to have some flags set */ 497 ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS | 498 DLM_LSFL_NEWEXCL)); 499 500 size = dlm_config.ci_rsbtbl_size; 501 ls->ls_rsbtbl_size = size; 502 503 ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable))); 504 if (!ls->ls_rsbtbl) 505 goto out_lsfree; 506 for (i = 0; i < size; i++) { 507 ls->ls_rsbtbl[i].keep.rb_node = NULL; 508 ls->ls_rsbtbl[i].toss.rb_node = NULL; 509 spin_lock_init(&ls->ls_rsbtbl[i].lock); 510 } 511 512 spin_lock_init(&ls->ls_remove_spin); 513 514 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) { 515 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1, 516 GFP_KERNEL); 517 if (!ls->ls_remove_names[i]) 518 goto out_rsbtbl; 519 } 520 521 idr_init(&ls->ls_lkbidr); 522 spin_lock_init(&ls->ls_lkbidr_spin); 523 524 INIT_LIST_HEAD(&ls->ls_waiters); 525 mutex_init(&ls->ls_waiters_mutex); 526 INIT_LIST_HEAD(&ls->ls_orphans); 527 mutex_init(&ls->ls_orphans_mutex); 528 INIT_LIST_HEAD(&ls->ls_timeout); 529 mutex_init(&ls->ls_timeout_mutex); 530 531 INIT_LIST_HEAD(&ls->ls_new_rsb); 532 spin_lock_init(&ls->ls_new_rsb_spin); 533 534 INIT_LIST_HEAD(&ls->ls_nodes); 535 INIT_LIST_HEAD(&ls->ls_nodes_gone); 536 ls->ls_num_nodes = 0; 537 ls->ls_low_nodeid = 0; 538 ls->ls_total_weight = 0; 539 ls->ls_node_array = NULL; 540 541 memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb)); 542 ls->ls_stub_rsb.res_ls = ls; 543 544 ls->ls_debug_rsb_dentry = NULL; 545 ls->ls_debug_waiters_dentry = NULL; 546 547 init_waitqueue_head(&ls->ls_uevent_wait); 548 ls->ls_uevent_result = 0; 549 init_completion(&ls->ls_members_done); 550 ls->ls_members_result = -1; 551 552 mutex_init(&ls->ls_cb_mutex); 553 INIT_LIST_HEAD(&ls->ls_cb_delay); 554 555 ls->ls_recoverd_task = NULL; 556 mutex_init(&ls->ls_recoverd_active); 557 spin_lock_init(&ls->ls_recover_lock); 558 spin_lock_init(&ls->ls_rcom_spin); 559 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t)); 560 ls->ls_recover_status = 0; 561 ls->ls_recover_seq = 0; 562 ls->ls_recover_args = NULL; 563 init_rwsem(&ls->ls_in_recovery); 564 init_rwsem(&ls->ls_recv_active); 565 INIT_LIST_HEAD(&ls->ls_requestqueue); 566 mutex_init(&ls->ls_requestqueue_mutex); 567 mutex_init(&ls->ls_clear_proc_locks); 568 569 ls->ls_recover_buf = kmalloc(LOWCOMMS_MAX_TX_BUFFER_LEN, GFP_NOFS); 570 if (!ls->ls_recover_buf) 571 goto out_lkbidr; 572 573 ls->ls_slot = 0; 574 ls->ls_num_slots = 0; 575 ls->ls_slots_size = 0; 576 ls->ls_slots = NULL; 577 578 INIT_LIST_HEAD(&ls->ls_recover_list); 579 spin_lock_init(&ls->ls_recover_list_lock); 580 idr_init(&ls->ls_recover_idr); 581 spin_lock_init(&ls->ls_recover_idr_lock); 582 ls->ls_recover_list_count = 0; 583 ls->ls_local_handle = ls; 584 init_waitqueue_head(&ls->ls_wait_general); 585 INIT_LIST_HEAD(&ls->ls_root_list); 586 init_rwsem(&ls->ls_root_sem); 587 588 spin_lock(&lslist_lock); 589 ls->ls_create_count = 1; 590 list_add(&ls->ls_list, &lslist); 591 spin_unlock(&lslist_lock); 592 593 if (flags & DLM_LSFL_FS) { 594 error = dlm_callback_start(ls); 595 if (error) { 596 log_error(ls, "can't start dlm_callback %d", error); 597 goto out_delist; 598 } 599 } 600 601 init_waitqueue_head(&ls->ls_recover_lock_wait); 602 603 /* 604 * Once started, dlm_recoverd first looks for ls in lslist, then 605 * initializes ls_in_recovery as locked in "down" mode. We need 606 * to wait for the wakeup from dlm_recoverd because in_recovery 607 * has to start out in down mode. 608 */ 609 610 error = dlm_recoverd_start(ls); 611 if (error) { 612 log_error(ls, "can't start dlm_recoverd %d", error); 613 goto out_callback; 614 } 615 616 wait_event(ls->ls_recover_lock_wait, 617 test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags)); 618 619 /* let kobject handle freeing of ls if there's an error */ 620 do_unreg = 1; 621 622 ls->ls_kobj.kset = dlm_kset; 623 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL, 624 "%s", ls->ls_name); 625 if (error) 626 goto out_recoverd; 627 kobject_uevent(&ls->ls_kobj, KOBJ_ADD); 628 629 /* This uevent triggers dlm_controld in userspace to add us to the 630 group of nodes that are members of this lockspace (managed by the 631 cluster infrastructure.) Once it's done that, it tells us who the 632 current lockspace members are (via configfs) and then tells the 633 lockspace to start running (via sysfs) in dlm_ls_start(). */ 634 635 error = do_uevent(ls, 1); 636 if (error) 637 goto out_recoverd; 638 639 wait_for_completion(&ls->ls_members_done); 640 error = ls->ls_members_result; 641 if (error) 642 goto out_members; 643 644 dlm_create_debug_file(ls); 645 646 log_rinfo(ls, "join complete"); 647 *lockspace = ls; 648 return 0; 649 650 out_members: 651 do_uevent(ls, 0); 652 dlm_clear_members(ls); 653 kfree(ls->ls_node_array); 654 out_recoverd: 655 dlm_recoverd_stop(ls); 656 out_callback: 657 dlm_callback_stop(ls); 658 out_delist: 659 spin_lock(&lslist_lock); 660 list_del(&ls->ls_list); 661 spin_unlock(&lslist_lock); 662 idr_destroy(&ls->ls_recover_idr); 663 kfree(ls->ls_recover_buf); 664 out_lkbidr: 665 idr_destroy(&ls->ls_lkbidr); 666 out_rsbtbl: 667 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) 668 kfree(ls->ls_remove_names[i]); 669 vfree(ls->ls_rsbtbl); 670 out_lsfree: 671 if (do_unreg) 672 kobject_put(&ls->ls_kobj); 673 else 674 kfree(ls); 675 out: 676 module_put(THIS_MODULE); 677 return error; 678 } 679 680 int dlm_new_lockspace(const char *name, const char *cluster, 681 uint32_t flags, int lvblen, 682 const struct dlm_lockspace_ops *ops, void *ops_arg, 683 int *ops_result, dlm_lockspace_t **lockspace) 684 { 685 int error = 0; 686 687 mutex_lock(&ls_lock); 688 if (!ls_count) 689 error = threads_start(); 690 if (error) 691 goto out; 692 693 error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg, 694 ops_result, lockspace); 695 if (!error) 696 ls_count++; 697 if (error > 0) 698 error = 0; 699 if (!ls_count) { 700 dlm_scand_stop(); 701 dlm_lowcomms_shutdown(); 702 dlm_lowcomms_stop(); 703 } 704 out: 705 mutex_unlock(&ls_lock); 706 return error; 707 } 708 709 static int lkb_idr_is_local(int id, void *p, void *data) 710 { 711 struct dlm_lkb *lkb = p; 712 713 return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV; 714 } 715 716 static int lkb_idr_is_any(int id, void *p, void *data) 717 { 718 return 1; 719 } 720 721 static int lkb_idr_free(int id, void *p, void *data) 722 { 723 struct dlm_lkb *lkb = p; 724 725 if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY) 726 dlm_free_lvb(lkb->lkb_lvbptr); 727 728 dlm_free_lkb(lkb); 729 return 0; 730 } 731 732 /* NOTE: We check the lkbidr here rather than the resource table. 733 This is because there may be LKBs queued as ASTs that have been unlinked 734 from their RSBs and are pending deletion once the AST has been delivered */ 735 736 static int lockspace_busy(struct dlm_ls *ls, int force) 737 { 738 int rv; 739 740 spin_lock(&ls->ls_lkbidr_spin); 741 if (force == 0) { 742 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls); 743 } else if (force == 1) { 744 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls); 745 } else { 746 rv = 0; 747 } 748 spin_unlock(&ls->ls_lkbidr_spin); 749 return rv; 750 } 751 752 static int release_lockspace(struct dlm_ls *ls, int force) 753 { 754 struct dlm_rsb *rsb; 755 struct rb_node *n; 756 int i, busy, rv; 757 758 busy = lockspace_busy(ls, force); 759 760 spin_lock(&lslist_lock); 761 if (ls->ls_create_count == 1) { 762 if (busy) { 763 rv = -EBUSY; 764 } else { 765 /* remove_lockspace takes ls off lslist */ 766 ls->ls_create_count = 0; 767 rv = 0; 768 } 769 } else if (ls->ls_create_count > 1) { 770 rv = --ls->ls_create_count; 771 } else { 772 rv = -EINVAL; 773 } 774 spin_unlock(&lslist_lock); 775 776 if (rv) { 777 log_debug(ls, "release_lockspace no remove %d", rv); 778 return rv; 779 } 780 781 dlm_device_deregister(ls); 782 783 if (force < 3 && dlm_user_daemon_available()) 784 do_uevent(ls, 0); 785 786 dlm_recoverd_stop(ls); 787 788 if (ls_count == 1) { 789 dlm_scand_stop(); 790 dlm_lowcomms_shutdown(); 791 } 792 793 dlm_callback_stop(ls); 794 795 remove_lockspace(ls); 796 797 dlm_delete_debug_file(ls); 798 799 idr_destroy(&ls->ls_recover_idr); 800 kfree(ls->ls_recover_buf); 801 802 /* 803 * Free all lkb's in idr 804 */ 805 806 idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls); 807 idr_destroy(&ls->ls_lkbidr); 808 809 /* 810 * Free all rsb's on rsbtbl[] lists 811 */ 812 813 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 814 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) { 815 rsb = rb_entry(n, struct dlm_rsb, res_hashnode); 816 rb_erase(n, &ls->ls_rsbtbl[i].keep); 817 dlm_free_rsb(rsb); 818 } 819 820 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) { 821 rsb = rb_entry(n, struct dlm_rsb, res_hashnode); 822 rb_erase(n, &ls->ls_rsbtbl[i].toss); 823 dlm_free_rsb(rsb); 824 } 825 } 826 827 vfree(ls->ls_rsbtbl); 828 829 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) 830 kfree(ls->ls_remove_names[i]); 831 832 while (!list_empty(&ls->ls_new_rsb)) { 833 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, 834 res_hashchain); 835 list_del(&rsb->res_hashchain); 836 dlm_free_rsb(rsb); 837 } 838 839 /* 840 * Free structures on any other lists 841 */ 842 843 dlm_purge_requestqueue(ls); 844 kfree(ls->ls_recover_args); 845 dlm_clear_members(ls); 846 dlm_clear_members_gone(ls); 847 kfree(ls->ls_node_array); 848 log_rinfo(ls, "release_lockspace final free"); 849 kobject_put(&ls->ls_kobj); 850 /* The ls structure will be freed when the kobject is done with */ 851 852 module_put(THIS_MODULE); 853 return 0; 854 } 855 856 /* 857 * Called when a system has released all its locks and is not going to use the 858 * lockspace any longer. We free everything we're managing for this lockspace. 859 * Remaining nodes will go through the recovery process as if we'd died. The 860 * lockspace must continue to function as usual, participating in recoveries, 861 * until this returns. 862 * 863 * Force has 4 possible values: 864 * 0 - don't destroy locksapce if it has any LKBs 865 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs 866 * 2 - destroy lockspace regardless of LKBs 867 * 3 - destroy lockspace as part of a forced shutdown 868 */ 869 870 int dlm_release_lockspace(void *lockspace, int force) 871 { 872 struct dlm_ls *ls; 873 int error; 874 875 ls = dlm_find_lockspace_local(lockspace); 876 if (!ls) 877 return -EINVAL; 878 dlm_put_lockspace(ls); 879 880 mutex_lock(&ls_lock); 881 error = release_lockspace(ls, force); 882 if (!error) 883 ls_count--; 884 if (!ls_count) 885 dlm_lowcomms_stop(); 886 mutex_unlock(&ls_lock); 887 888 return error; 889 } 890 891 void dlm_stop_lockspaces(void) 892 { 893 struct dlm_ls *ls; 894 int count; 895 896 restart: 897 count = 0; 898 spin_lock(&lslist_lock); 899 list_for_each_entry(ls, &lslist, ls_list) { 900 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) { 901 count++; 902 continue; 903 } 904 spin_unlock(&lslist_lock); 905 log_error(ls, "no userland control daemon, stopping lockspace"); 906 dlm_ls_stop(ls); 907 goto restart; 908 } 909 spin_unlock(&lslist_lock); 910 911 if (count) 912 log_print("dlm user daemon left %d lockspaces", count); 913 } 914 915