1 /****************************************************************************** 2 ******************************************************************************* 3 ** 4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 5 ** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved. 6 ** 7 ** This copyrighted material is made available to anyone wishing to use, 8 ** modify, copy, or redistribute it subject to the terms and conditions 9 ** of the GNU General Public License v.2. 10 ** 11 ******************************************************************************* 12 ******************************************************************************/ 13 14 #include "dlm_internal.h" 15 #include "lockspace.h" 16 #include "member.h" 17 #include "recoverd.h" 18 #include "dir.h" 19 #include "lowcomms.h" 20 #include "config.h" 21 #include "memory.h" 22 #include "lock.h" 23 #include "recover.h" 24 #include "requestqueue.h" 25 #include "user.h" 26 #include "ast.h" 27 28 static int ls_count; 29 static struct mutex ls_lock; 30 static struct list_head lslist; 31 static spinlock_t lslist_lock; 32 static struct task_struct * scand_task; 33 34 35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len) 36 { 37 ssize_t ret = len; 38 int n = simple_strtol(buf, NULL, 0); 39 40 ls = dlm_find_lockspace_local(ls->ls_local_handle); 41 if (!ls) 42 return -EINVAL; 43 44 switch (n) { 45 case 0: 46 dlm_ls_stop(ls); 47 break; 48 case 1: 49 dlm_ls_start(ls); 50 break; 51 default: 52 ret = -EINVAL; 53 } 54 dlm_put_lockspace(ls); 55 return ret; 56 } 57 58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len) 59 { 60 ls->ls_uevent_result = simple_strtol(buf, NULL, 0); 61 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags); 62 wake_up(&ls->ls_uevent_wait); 63 return len; 64 } 65 66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf) 67 { 68 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id); 69 } 70 71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len) 72 { 73 ls->ls_global_id = simple_strtoul(buf, NULL, 0); 74 return len; 75 } 76 77 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf) 78 { 79 return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls)); 80 } 81 82 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len) 83 { 84 int val = simple_strtoul(buf, NULL, 0); 85 if (val == 1) 86 set_bit(LSFL_NODIR, &ls->ls_flags); 87 return len; 88 } 89 90 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf) 91 { 92 uint32_t status = dlm_recover_status(ls); 93 return snprintf(buf, PAGE_SIZE, "%x\n", status); 94 } 95 96 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf) 97 { 98 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid); 99 } 100 101 struct dlm_attr { 102 struct attribute attr; 103 ssize_t (*show)(struct dlm_ls *, char *); 104 ssize_t (*store)(struct dlm_ls *, const char *, size_t); 105 }; 106 107 static struct dlm_attr dlm_attr_control = { 108 .attr = {.name = "control", .mode = S_IWUSR}, 109 .store = dlm_control_store 110 }; 111 112 static struct dlm_attr dlm_attr_event = { 113 .attr = {.name = "event_done", .mode = S_IWUSR}, 114 .store = dlm_event_store 115 }; 116 117 static struct dlm_attr dlm_attr_id = { 118 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR}, 119 .show = dlm_id_show, 120 .store = dlm_id_store 121 }; 122 123 static struct dlm_attr dlm_attr_nodir = { 124 .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR}, 125 .show = dlm_nodir_show, 126 .store = dlm_nodir_store 127 }; 128 129 static struct dlm_attr dlm_attr_recover_status = { 130 .attr = {.name = "recover_status", .mode = S_IRUGO}, 131 .show = dlm_recover_status_show 132 }; 133 134 static struct dlm_attr dlm_attr_recover_nodeid = { 135 .attr = {.name = "recover_nodeid", .mode = S_IRUGO}, 136 .show = dlm_recover_nodeid_show 137 }; 138 139 static struct attribute *dlm_attrs[] = { 140 &dlm_attr_control.attr, 141 &dlm_attr_event.attr, 142 &dlm_attr_id.attr, 143 &dlm_attr_nodir.attr, 144 &dlm_attr_recover_status.attr, 145 &dlm_attr_recover_nodeid.attr, 146 NULL, 147 }; 148 149 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr, 150 char *buf) 151 { 152 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 153 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr); 154 return a->show ? a->show(ls, buf) : 0; 155 } 156 157 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr, 158 const char *buf, size_t len) 159 { 160 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 161 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr); 162 return a->store ? a->store(ls, buf, len) : len; 163 } 164 165 static void lockspace_kobj_release(struct kobject *k) 166 { 167 struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj); 168 kfree(ls); 169 } 170 171 static const struct sysfs_ops dlm_attr_ops = { 172 .show = dlm_attr_show, 173 .store = dlm_attr_store, 174 }; 175 176 static struct kobj_type dlm_ktype = { 177 .default_attrs = dlm_attrs, 178 .sysfs_ops = &dlm_attr_ops, 179 .release = lockspace_kobj_release, 180 }; 181 182 static struct kset *dlm_kset; 183 184 static int do_uevent(struct dlm_ls *ls, int in) 185 { 186 int error; 187 188 if (in) 189 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE); 190 else 191 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE); 192 193 log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving"); 194 195 /* dlm_controld will see the uevent, do the necessary group management 196 and then write to sysfs to wake us */ 197 198 error = wait_event_interruptible(ls->ls_uevent_wait, 199 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags)); 200 201 log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result); 202 203 if (error) 204 goto out; 205 206 error = ls->ls_uevent_result; 207 out: 208 if (error) 209 log_error(ls, "group %s failed %d %d", in ? "join" : "leave", 210 error, ls->ls_uevent_result); 211 return error; 212 } 213 214 static int dlm_uevent(struct kset *kset, struct kobject *kobj, 215 struct kobj_uevent_env *env) 216 { 217 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 218 219 add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name); 220 return 0; 221 } 222 223 static struct kset_uevent_ops dlm_uevent_ops = { 224 .uevent = dlm_uevent, 225 }; 226 227 int __init dlm_lockspace_init(void) 228 { 229 ls_count = 0; 230 mutex_init(&ls_lock); 231 INIT_LIST_HEAD(&lslist); 232 spin_lock_init(&lslist_lock); 233 234 dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj); 235 if (!dlm_kset) { 236 printk(KERN_WARNING "%s: can not create kset\n", __func__); 237 return -ENOMEM; 238 } 239 return 0; 240 } 241 242 void dlm_lockspace_exit(void) 243 { 244 kset_unregister(dlm_kset); 245 } 246 247 static struct dlm_ls *find_ls_to_scan(void) 248 { 249 struct dlm_ls *ls; 250 251 spin_lock(&lslist_lock); 252 list_for_each_entry(ls, &lslist, ls_list) { 253 if (time_after_eq(jiffies, ls->ls_scan_time + 254 dlm_config.ci_scan_secs * HZ)) { 255 spin_unlock(&lslist_lock); 256 return ls; 257 } 258 } 259 spin_unlock(&lslist_lock); 260 return NULL; 261 } 262 263 static int dlm_scand(void *data) 264 { 265 struct dlm_ls *ls; 266 267 while (!kthread_should_stop()) { 268 ls = find_ls_to_scan(); 269 if (ls) { 270 if (dlm_lock_recovery_try(ls)) { 271 ls->ls_scan_time = jiffies; 272 dlm_scan_rsbs(ls); 273 dlm_scan_timeout(ls); 274 dlm_scan_waiters(ls); 275 dlm_unlock_recovery(ls); 276 } else { 277 ls->ls_scan_time += HZ; 278 } 279 continue; 280 } 281 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ); 282 } 283 return 0; 284 } 285 286 static int dlm_scand_start(void) 287 { 288 struct task_struct *p; 289 int error = 0; 290 291 p = kthread_run(dlm_scand, NULL, "dlm_scand"); 292 if (IS_ERR(p)) 293 error = PTR_ERR(p); 294 else 295 scand_task = p; 296 return error; 297 } 298 299 static void dlm_scand_stop(void) 300 { 301 kthread_stop(scand_task); 302 } 303 304 struct dlm_ls *dlm_find_lockspace_global(uint32_t id) 305 { 306 struct dlm_ls *ls; 307 308 spin_lock(&lslist_lock); 309 310 list_for_each_entry(ls, &lslist, ls_list) { 311 if (ls->ls_global_id == id) { 312 ls->ls_count++; 313 goto out; 314 } 315 } 316 ls = NULL; 317 out: 318 spin_unlock(&lslist_lock); 319 return ls; 320 } 321 322 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace) 323 { 324 struct dlm_ls *ls; 325 326 spin_lock(&lslist_lock); 327 list_for_each_entry(ls, &lslist, ls_list) { 328 if (ls->ls_local_handle == lockspace) { 329 ls->ls_count++; 330 goto out; 331 } 332 } 333 ls = NULL; 334 out: 335 spin_unlock(&lslist_lock); 336 return ls; 337 } 338 339 struct dlm_ls *dlm_find_lockspace_device(int minor) 340 { 341 struct dlm_ls *ls; 342 343 spin_lock(&lslist_lock); 344 list_for_each_entry(ls, &lslist, ls_list) { 345 if (ls->ls_device.minor == minor) { 346 ls->ls_count++; 347 goto out; 348 } 349 } 350 ls = NULL; 351 out: 352 spin_unlock(&lslist_lock); 353 return ls; 354 } 355 356 void dlm_put_lockspace(struct dlm_ls *ls) 357 { 358 spin_lock(&lslist_lock); 359 ls->ls_count--; 360 spin_unlock(&lslist_lock); 361 } 362 363 static void remove_lockspace(struct dlm_ls *ls) 364 { 365 for (;;) { 366 spin_lock(&lslist_lock); 367 if (ls->ls_count == 0) { 368 WARN_ON(ls->ls_create_count != 0); 369 list_del(&ls->ls_list); 370 spin_unlock(&lslist_lock); 371 return; 372 } 373 spin_unlock(&lslist_lock); 374 ssleep(1); 375 } 376 } 377 378 static int threads_start(void) 379 { 380 int error; 381 382 error = dlm_scand_start(); 383 if (error) { 384 log_print("cannot start dlm_scand thread %d", error); 385 goto fail; 386 } 387 388 /* Thread for sending/receiving messages for all lockspace's */ 389 error = dlm_lowcomms_start(); 390 if (error) { 391 log_print("cannot start dlm lowcomms %d", error); 392 goto scand_fail; 393 } 394 395 return 0; 396 397 scand_fail: 398 dlm_scand_stop(); 399 fail: 400 return error; 401 } 402 403 static void threads_stop(void) 404 { 405 dlm_scand_stop(); 406 dlm_lowcomms_stop(); 407 } 408 409 static int new_lockspace(const char *name, const char *cluster, 410 uint32_t flags, int lvblen, 411 const struct dlm_lockspace_ops *ops, void *ops_arg, 412 int *ops_result, dlm_lockspace_t **lockspace) 413 { 414 struct dlm_ls *ls; 415 int i, size, error; 416 int do_unreg = 0; 417 int namelen = strlen(name); 418 419 if (namelen > DLM_LOCKSPACE_LEN) 420 return -EINVAL; 421 422 if (!lvblen || (lvblen % 8)) 423 return -EINVAL; 424 425 if (!try_module_get(THIS_MODULE)) 426 return -EINVAL; 427 428 if (!dlm_user_daemon_available()) { 429 log_print("dlm user daemon not available"); 430 error = -EUNATCH; 431 goto out; 432 } 433 434 if (ops && ops_result) { 435 if (!dlm_config.ci_recover_callbacks) 436 *ops_result = -EOPNOTSUPP; 437 else 438 *ops_result = 0; 439 } 440 441 if (dlm_config.ci_recover_callbacks && cluster && 442 strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) { 443 log_print("dlm cluster name %s mismatch %s", 444 dlm_config.ci_cluster_name, cluster); 445 error = -EBADR; 446 goto out; 447 } 448 449 error = 0; 450 451 spin_lock(&lslist_lock); 452 list_for_each_entry(ls, &lslist, ls_list) { 453 WARN_ON(ls->ls_create_count <= 0); 454 if (ls->ls_namelen != namelen) 455 continue; 456 if (memcmp(ls->ls_name, name, namelen)) 457 continue; 458 if (flags & DLM_LSFL_NEWEXCL) { 459 error = -EEXIST; 460 break; 461 } 462 ls->ls_create_count++; 463 *lockspace = ls; 464 error = 1; 465 break; 466 } 467 spin_unlock(&lslist_lock); 468 469 if (error) 470 goto out; 471 472 error = -ENOMEM; 473 474 ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS); 475 if (!ls) 476 goto out; 477 memcpy(ls->ls_name, name, namelen); 478 ls->ls_namelen = namelen; 479 ls->ls_lvblen = lvblen; 480 ls->ls_count = 0; 481 ls->ls_flags = 0; 482 ls->ls_scan_time = jiffies; 483 484 if (ops && dlm_config.ci_recover_callbacks) { 485 ls->ls_ops = ops; 486 ls->ls_ops_arg = ops_arg; 487 } 488 489 if (flags & DLM_LSFL_TIMEWARN) 490 set_bit(LSFL_TIMEWARN, &ls->ls_flags); 491 492 /* ls_exflags are forced to match among nodes, and we don't 493 need to require all nodes to have some flags set */ 494 ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS | 495 DLM_LSFL_NEWEXCL)); 496 497 size = dlm_config.ci_rsbtbl_size; 498 ls->ls_rsbtbl_size = size; 499 500 ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size); 501 if (!ls->ls_rsbtbl) 502 goto out_lsfree; 503 for (i = 0; i < size; i++) { 504 ls->ls_rsbtbl[i].keep.rb_node = NULL; 505 ls->ls_rsbtbl[i].toss.rb_node = NULL; 506 spin_lock_init(&ls->ls_rsbtbl[i].lock); 507 } 508 509 idr_init(&ls->ls_lkbidr); 510 spin_lock_init(&ls->ls_lkbidr_spin); 511 512 INIT_LIST_HEAD(&ls->ls_waiters); 513 mutex_init(&ls->ls_waiters_mutex); 514 INIT_LIST_HEAD(&ls->ls_orphans); 515 mutex_init(&ls->ls_orphans_mutex); 516 INIT_LIST_HEAD(&ls->ls_timeout); 517 mutex_init(&ls->ls_timeout_mutex); 518 519 INIT_LIST_HEAD(&ls->ls_new_rsb); 520 spin_lock_init(&ls->ls_new_rsb_spin); 521 522 INIT_LIST_HEAD(&ls->ls_nodes); 523 INIT_LIST_HEAD(&ls->ls_nodes_gone); 524 ls->ls_num_nodes = 0; 525 ls->ls_low_nodeid = 0; 526 ls->ls_total_weight = 0; 527 ls->ls_node_array = NULL; 528 529 memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb)); 530 ls->ls_stub_rsb.res_ls = ls; 531 532 ls->ls_debug_rsb_dentry = NULL; 533 ls->ls_debug_waiters_dentry = NULL; 534 535 init_waitqueue_head(&ls->ls_uevent_wait); 536 ls->ls_uevent_result = 0; 537 init_completion(&ls->ls_members_done); 538 ls->ls_members_result = -1; 539 540 mutex_init(&ls->ls_cb_mutex); 541 INIT_LIST_HEAD(&ls->ls_cb_delay); 542 543 ls->ls_recoverd_task = NULL; 544 mutex_init(&ls->ls_recoverd_active); 545 spin_lock_init(&ls->ls_recover_lock); 546 spin_lock_init(&ls->ls_rcom_spin); 547 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t)); 548 ls->ls_recover_status = 0; 549 ls->ls_recover_seq = 0; 550 ls->ls_recover_args = NULL; 551 init_rwsem(&ls->ls_in_recovery); 552 init_rwsem(&ls->ls_recv_active); 553 INIT_LIST_HEAD(&ls->ls_requestqueue); 554 mutex_init(&ls->ls_requestqueue_mutex); 555 mutex_init(&ls->ls_clear_proc_locks); 556 557 ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS); 558 if (!ls->ls_recover_buf) 559 goto out_lkbfree; 560 561 ls->ls_slot = 0; 562 ls->ls_num_slots = 0; 563 ls->ls_slots_size = 0; 564 ls->ls_slots = NULL; 565 566 INIT_LIST_HEAD(&ls->ls_recover_list); 567 spin_lock_init(&ls->ls_recover_list_lock); 568 idr_init(&ls->ls_recover_idr); 569 spin_lock_init(&ls->ls_recover_idr_lock); 570 ls->ls_recover_list_count = 0; 571 ls->ls_local_handle = ls; 572 init_waitqueue_head(&ls->ls_wait_general); 573 INIT_LIST_HEAD(&ls->ls_root_list); 574 init_rwsem(&ls->ls_root_sem); 575 576 down_write(&ls->ls_in_recovery); 577 578 spin_lock(&lslist_lock); 579 ls->ls_create_count = 1; 580 list_add(&ls->ls_list, &lslist); 581 spin_unlock(&lslist_lock); 582 583 if (flags & DLM_LSFL_FS) { 584 error = dlm_callback_start(ls); 585 if (error) { 586 log_error(ls, "can't start dlm_callback %d", error); 587 goto out_delist; 588 } 589 } 590 591 /* needs to find ls in lslist */ 592 error = dlm_recoverd_start(ls); 593 if (error) { 594 log_error(ls, "can't start dlm_recoverd %d", error); 595 goto out_callback; 596 } 597 598 ls->ls_kobj.kset = dlm_kset; 599 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL, 600 "%s", ls->ls_name); 601 if (error) 602 goto out_recoverd; 603 kobject_uevent(&ls->ls_kobj, KOBJ_ADD); 604 605 /* let kobject handle freeing of ls if there's an error */ 606 do_unreg = 1; 607 608 /* This uevent triggers dlm_controld in userspace to add us to the 609 group of nodes that are members of this lockspace (managed by the 610 cluster infrastructure.) Once it's done that, it tells us who the 611 current lockspace members are (via configfs) and then tells the 612 lockspace to start running (via sysfs) in dlm_ls_start(). */ 613 614 error = do_uevent(ls, 1); 615 if (error) 616 goto out_recoverd; 617 618 wait_for_completion(&ls->ls_members_done); 619 error = ls->ls_members_result; 620 if (error) 621 goto out_members; 622 623 dlm_create_debug_file(ls); 624 625 log_debug(ls, "join complete"); 626 *lockspace = ls; 627 return 0; 628 629 out_members: 630 do_uevent(ls, 0); 631 dlm_clear_members(ls); 632 kfree(ls->ls_node_array); 633 out_recoverd: 634 dlm_recoverd_stop(ls); 635 out_callback: 636 dlm_callback_stop(ls); 637 out_delist: 638 spin_lock(&lslist_lock); 639 list_del(&ls->ls_list); 640 spin_unlock(&lslist_lock); 641 idr_destroy(&ls->ls_recover_idr); 642 kfree(ls->ls_recover_buf); 643 out_lkbfree: 644 idr_destroy(&ls->ls_lkbidr); 645 vfree(ls->ls_rsbtbl); 646 out_lsfree: 647 if (do_unreg) 648 kobject_put(&ls->ls_kobj); 649 else 650 kfree(ls); 651 out: 652 module_put(THIS_MODULE); 653 return error; 654 } 655 656 int dlm_new_lockspace(const char *name, const char *cluster, 657 uint32_t flags, int lvblen, 658 const struct dlm_lockspace_ops *ops, void *ops_arg, 659 int *ops_result, dlm_lockspace_t **lockspace) 660 { 661 int error = 0; 662 663 mutex_lock(&ls_lock); 664 if (!ls_count) 665 error = threads_start(); 666 if (error) 667 goto out; 668 669 error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg, 670 ops_result, lockspace); 671 if (!error) 672 ls_count++; 673 if (error > 0) 674 error = 0; 675 if (!ls_count) 676 threads_stop(); 677 out: 678 mutex_unlock(&ls_lock); 679 return error; 680 } 681 682 static int lkb_idr_is_local(int id, void *p, void *data) 683 { 684 struct dlm_lkb *lkb = p; 685 686 if (!lkb->lkb_nodeid) 687 return 1; 688 return 0; 689 } 690 691 static int lkb_idr_is_any(int id, void *p, void *data) 692 { 693 return 1; 694 } 695 696 static int lkb_idr_free(int id, void *p, void *data) 697 { 698 struct dlm_lkb *lkb = p; 699 700 if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY) 701 dlm_free_lvb(lkb->lkb_lvbptr); 702 703 dlm_free_lkb(lkb); 704 return 0; 705 } 706 707 /* NOTE: We check the lkbidr here rather than the resource table. 708 This is because there may be LKBs queued as ASTs that have been unlinked 709 from their RSBs and are pending deletion once the AST has been delivered */ 710 711 static int lockspace_busy(struct dlm_ls *ls, int force) 712 { 713 int rv; 714 715 spin_lock(&ls->ls_lkbidr_spin); 716 if (force == 0) { 717 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls); 718 } else if (force == 1) { 719 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls); 720 } else { 721 rv = 0; 722 } 723 spin_unlock(&ls->ls_lkbidr_spin); 724 return rv; 725 } 726 727 static int release_lockspace(struct dlm_ls *ls, int force) 728 { 729 struct dlm_rsb *rsb; 730 struct rb_node *n; 731 int i, busy, rv; 732 733 busy = lockspace_busy(ls, force); 734 735 spin_lock(&lslist_lock); 736 if (ls->ls_create_count == 1) { 737 if (busy) { 738 rv = -EBUSY; 739 } else { 740 /* remove_lockspace takes ls off lslist */ 741 ls->ls_create_count = 0; 742 rv = 0; 743 } 744 } else if (ls->ls_create_count > 1) { 745 rv = --ls->ls_create_count; 746 } else { 747 rv = -EINVAL; 748 } 749 spin_unlock(&lslist_lock); 750 751 if (rv) { 752 log_debug(ls, "release_lockspace no remove %d", rv); 753 return rv; 754 } 755 756 dlm_device_deregister(ls); 757 758 if (force < 3 && dlm_user_daemon_available()) 759 do_uevent(ls, 0); 760 761 dlm_recoverd_stop(ls); 762 763 dlm_callback_stop(ls); 764 765 remove_lockspace(ls); 766 767 dlm_delete_debug_file(ls); 768 769 kfree(ls->ls_recover_buf); 770 771 /* 772 * Free all lkb's in idr 773 */ 774 775 idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls); 776 idr_remove_all(&ls->ls_lkbidr); 777 idr_destroy(&ls->ls_lkbidr); 778 779 /* 780 * Free all rsb's on rsbtbl[] lists 781 */ 782 783 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 784 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) { 785 rsb = rb_entry(n, struct dlm_rsb, res_hashnode); 786 rb_erase(n, &ls->ls_rsbtbl[i].keep); 787 dlm_free_rsb(rsb); 788 } 789 790 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) { 791 rsb = rb_entry(n, struct dlm_rsb, res_hashnode); 792 rb_erase(n, &ls->ls_rsbtbl[i].toss); 793 dlm_free_rsb(rsb); 794 } 795 } 796 797 vfree(ls->ls_rsbtbl); 798 799 while (!list_empty(&ls->ls_new_rsb)) { 800 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, 801 res_hashchain); 802 list_del(&rsb->res_hashchain); 803 dlm_free_rsb(rsb); 804 } 805 806 /* 807 * Free structures on any other lists 808 */ 809 810 dlm_purge_requestqueue(ls); 811 kfree(ls->ls_recover_args); 812 dlm_clear_members(ls); 813 dlm_clear_members_gone(ls); 814 kfree(ls->ls_node_array); 815 log_debug(ls, "release_lockspace final free"); 816 kobject_put(&ls->ls_kobj); 817 /* The ls structure will be freed when the kobject is done with */ 818 819 module_put(THIS_MODULE); 820 return 0; 821 } 822 823 /* 824 * Called when a system has released all its locks and is not going to use the 825 * lockspace any longer. We free everything we're managing for this lockspace. 826 * Remaining nodes will go through the recovery process as if we'd died. The 827 * lockspace must continue to function as usual, participating in recoveries, 828 * until this returns. 829 * 830 * Force has 4 possible values: 831 * 0 - don't destroy locksapce if it has any LKBs 832 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs 833 * 2 - destroy lockspace regardless of LKBs 834 * 3 - destroy lockspace as part of a forced shutdown 835 */ 836 837 int dlm_release_lockspace(void *lockspace, int force) 838 { 839 struct dlm_ls *ls; 840 int error; 841 842 ls = dlm_find_lockspace_local(lockspace); 843 if (!ls) 844 return -EINVAL; 845 dlm_put_lockspace(ls); 846 847 mutex_lock(&ls_lock); 848 error = release_lockspace(ls, force); 849 if (!error) 850 ls_count--; 851 if (!ls_count) 852 threads_stop(); 853 mutex_unlock(&ls_lock); 854 855 return error; 856 } 857 858 void dlm_stop_lockspaces(void) 859 { 860 struct dlm_ls *ls; 861 862 restart: 863 spin_lock(&lslist_lock); 864 list_for_each_entry(ls, &lslist, ls_list) { 865 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) 866 continue; 867 spin_unlock(&lslist_lock); 868 log_error(ls, "no userland control daemon, stopping lockspace"); 869 dlm_ls_stop(ls); 870 goto restart; 871 } 872 spin_unlock(&lslist_lock); 873 } 874 875