1 /****************************************************************************** 2 ******************************************************************************* 3 ** 4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 5 ** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. 6 ** 7 ** This copyrighted material is made available to anyone wishing to use, 8 ** modify, copy, or redistribute it subject to the terms and conditions 9 ** of the GNU General Public License v.2. 10 ** 11 ******************************************************************************* 12 ******************************************************************************/ 13 14 #include "dlm_internal.h" 15 #include "lockspace.h" 16 #include "dir.h" 17 #include "config.h" 18 #include "ast.h" 19 #include "memory.h" 20 #include "rcom.h" 21 #include "lock.h" 22 #include "lowcomms.h" 23 #include "member.h" 24 #include "recover.h" 25 26 27 /* 28 * Recovery waiting routines: these functions wait for a particular reply from 29 * a remote node, or for the remote node to report a certain status. They need 30 * to abort if the lockspace is stopped indicating a node has failed (perhaps 31 * the one being waited for). 32 */ 33 34 /* 35 * Wait until given function returns non-zero or lockspace is stopped 36 * (LS_RECOVERY_STOP set due to failure of a node in ls_nodes). When another 37 * function thinks it could have completed the waited-on task, they should wake 38 * up ls_wait_general to get an immediate response rather than waiting for the 39 * timeout. This uses a timeout so it can check periodically if the wait 40 * should abort due to node failure (which doesn't cause a wake_up). 41 * This should only be called by the dlm_recoverd thread. 42 */ 43 44 int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls)) 45 { 46 int error = 0; 47 int rv; 48 49 while (1) { 50 rv = wait_event_timeout(ls->ls_wait_general, 51 testfn(ls) || dlm_recovery_stopped(ls), 52 dlm_config.ci_recover_timer * HZ); 53 if (rv) 54 break; 55 } 56 57 if (dlm_recovery_stopped(ls)) { 58 log_debug(ls, "dlm_wait_function aborted"); 59 error = -EINTR; 60 } 61 return error; 62 } 63 64 /* 65 * An efficient way for all nodes to wait for all others to have a certain 66 * status. The node with the lowest nodeid polls all the others for their 67 * status (wait_status_all) and all the others poll the node with the low id 68 * for its accumulated result (wait_status_low). When all nodes have set 69 * status flag X, then status flag X_ALL will be set on the low nodeid. 70 */ 71 72 uint32_t dlm_recover_status(struct dlm_ls *ls) 73 { 74 uint32_t status; 75 spin_lock(&ls->ls_recover_lock); 76 status = ls->ls_recover_status; 77 spin_unlock(&ls->ls_recover_lock); 78 return status; 79 } 80 81 static void _set_recover_status(struct dlm_ls *ls, uint32_t status) 82 { 83 ls->ls_recover_status |= status; 84 } 85 86 void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status) 87 { 88 spin_lock(&ls->ls_recover_lock); 89 _set_recover_status(ls, status); 90 spin_unlock(&ls->ls_recover_lock); 91 } 92 93 static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status, 94 int save_slots) 95 { 96 struct dlm_rcom *rc = ls->ls_recover_buf; 97 struct dlm_member *memb; 98 int error = 0, delay; 99 100 list_for_each_entry(memb, &ls->ls_nodes, list) { 101 delay = 0; 102 for (;;) { 103 if (dlm_recovery_stopped(ls)) { 104 error = -EINTR; 105 goto out; 106 } 107 108 error = dlm_rcom_status(ls, memb->nodeid, 0); 109 if (error) 110 goto out; 111 112 if (save_slots) 113 dlm_slot_save(ls, rc, memb); 114 115 if (rc->rc_result & wait_status) 116 break; 117 if (delay < 1000) 118 delay += 20; 119 msleep(delay); 120 } 121 } 122 out: 123 return error; 124 } 125 126 static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status, 127 uint32_t status_flags) 128 { 129 struct dlm_rcom *rc = ls->ls_recover_buf; 130 int error = 0, delay = 0, nodeid = ls->ls_low_nodeid; 131 132 for (;;) { 133 if (dlm_recovery_stopped(ls)) { 134 error = -EINTR; 135 goto out; 136 } 137 138 error = dlm_rcom_status(ls, nodeid, status_flags); 139 if (error) 140 break; 141 142 if (rc->rc_result & wait_status) 143 break; 144 if (delay < 1000) 145 delay += 20; 146 msleep(delay); 147 } 148 out: 149 return error; 150 } 151 152 static int wait_status(struct dlm_ls *ls, uint32_t status) 153 { 154 uint32_t status_all = status << 1; 155 int error; 156 157 if (ls->ls_low_nodeid == dlm_our_nodeid()) { 158 error = wait_status_all(ls, status, 0); 159 if (!error) 160 dlm_set_recover_status(ls, status_all); 161 } else 162 error = wait_status_low(ls, status_all, 0); 163 164 return error; 165 } 166 167 int dlm_recover_members_wait(struct dlm_ls *ls) 168 { 169 struct dlm_member *memb; 170 struct dlm_slot *slots; 171 int num_slots, slots_size; 172 int error, rv; 173 uint32_t gen; 174 175 list_for_each_entry(memb, &ls->ls_nodes, list) { 176 memb->slot = -1; 177 memb->generation = 0; 178 } 179 180 if (ls->ls_low_nodeid == dlm_our_nodeid()) { 181 error = wait_status_all(ls, DLM_RS_NODES, 1); 182 if (error) 183 goto out; 184 185 /* slots array is sparse, slots_size may be > num_slots */ 186 187 rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen); 188 if (!rv) { 189 spin_lock(&ls->ls_recover_lock); 190 _set_recover_status(ls, DLM_RS_NODES_ALL); 191 ls->ls_num_slots = num_slots; 192 ls->ls_slots_size = slots_size; 193 ls->ls_slots = slots; 194 ls->ls_generation = gen; 195 spin_unlock(&ls->ls_recover_lock); 196 } else { 197 dlm_set_recover_status(ls, DLM_RS_NODES_ALL); 198 } 199 } else { 200 error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS); 201 if (error) 202 goto out; 203 204 dlm_slots_copy_in(ls); 205 } 206 out: 207 return error; 208 } 209 210 int dlm_recover_directory_wait(struct dlm_ls *ls) 211 { 212 return wait_status(ls, DLM_RS_DIR); 213 } 214 215 int dlm_recover_locks_wait(struct dlm_ls *ls) 216 { 217 return wait_status(ls, DLM_RS_LOCKS); 218 } 219 220 int dlm_recover_done_wait(struct dlm_ls *ls) 221 { 222 return wait_status(ls, DLM_RS_DONE); 223 } 224 225 /* 226 * The recover_list contains all the rsb's for which we've requested the new 227 * master nodeid. As replies are returned from the resource directories the 228 * rsb's are removed from the list. When the list is empty we're done. 229 * 230 * The recover_list is later similarly used for all rsb's for which we've sent 231 * new lkb's and need to receive new corresponding lkid's. 232 * 233 * We use the address of the rsb struct as a simple local identifier for the 234 * rsb so we can match an rcom reply with the rsb it was sent for. 235 */ 236 237 static int recover_list_empty(struct dlm_ls *ls) 238 { 239 int empty; 240 241 spin_lock(&ls->ls_recover_list_lock); 242 empty = list_empty(&ls->ls_recover_list); 243 spin_unlock(&ls->ls_recover_list_lock); 244 245 return empty; 246 } 247 248 static void recover_list_add(struct dlm_rsb *r) 249 { 250 struct dlm_ls *ls = r->res_ls; 251 252 spin_lock(&ls->ls_recover_list_lock); 253 if (list_empty(&r->res_recover_list)) { 254 list_add_tail(&r->res_recover_list, &ls->ls_recover_list); 255 ls->ls_recover_list_count++; 256 dlm_hold_rsb(r); 257 } 258 spin_unlock(&ls->ls_recover_list_lock); 259 } 260 261 static void recover_list_del(struct dlm_rsb *r) 262 { 263 struct dlm_ls *ls = r->res_ls; 264 265 spin_lock(&ls->ls_recover_list_lock); 266 list_del_init(&r->res_recover_list); 267 ls->ls_recover_list_count--; 268 spin_unlock(&ls->ls_recover_list_lock); 269 270 dlm_put_rsb(r); 271 } 272 273 static void recover_list_clear(struct dlm_ls *ls) 274 { 275 struct dlm_rsb *r, *s; 276 277 spin_lock(&ls->ls_recover_list_lock); 278 list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) { 279 list_del_init(&r->res_recover_list); 280 r->res_recover_locks_count = 0; 281 dlm_put_rsb(r); 282 ls->ls_recover_list_count--; 283 } 284 285 if (ls->ls_recover_list_count != 0) { 286 log_error(ls, "warning: recover_list_count %d", 287 ls->ls_recover_list_count); 288 ls->ls_recover_list_count = 0; 289 } 290 spin_unlock(&ls->ls_recover_list_lock); 291 } 292 293 static int recover_idr_empty(struct dlm_ls *ls) 294 { 295 int empty = 1; 296 297 spin_lock(&ls->ls_recover_idr_lock); 298 if (ls->ls_recover_list_count) 299 empty = 0; 300 spin_unlock(&ls->ls_recover_idr_lock); 301 302 return empty; 303 } 304 305 static int recover_idr_add(struct dlm_rsb *r) 306 { 307 struct dlm_ls *ls = r->res_ls; 308 int rv; 309 310 idr_preload(GFP_NOFS); 311 spin_lock(&ls->ls_recover_idr_lock); 312 if (r->res_id) { 313 rv = -1; 314 goto out_unlock; 315 } 316 rv = idr_alloc(&ls->ls_recover_idr, r, 1, 0, GFP_NOWAIT); 317 if (rv < 0) 318 goto out_unlock; 319 320 r->res_id = rv; 321 ls->ls_recover_list_count++; 322 dlm_hold_rsb(r); 323 rv = 0; 324 out_unlock: 325 spin_unlock(&ls->ls_recover_idr_lock); 326 idr_preload_end(); 327 return rv; 328 } 329 330 static void recover_idr_del(struct dlm_rsb *r) 331 { 332 struct dlm_ls *ls = r->res_ls; 333 334 spin_lock(&ls->ls_recover_idr_lock); 335 idr_remove(&ls->ls_recover_idr, r->res_id); 336 r->res_id = 0; 337 ls->ls_recover_list_count--; 338 spin_unlock(&ls->ls_recover_idr_lock); 339 340 dlm_put_rsb(r); 341 } 342 343 static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id) 344 { 345 struct dlm_rsb *r; 346 347 spin_lock(&ls->ls_recover_idr_lock); 348 r = idr_find(&ls->ls_recover_idr, (int)id); 349 spin_unlock(&ls->ls_recover_idr_lock); 350 return r; 351 } 352 353 static void recover_idr_clear(struct dlm_ls *ls) 354 { 355 struct dlm_rsb *r; 356 int id; 357 358 spin_lock(&ls->ls_recover_idr_lock); 359 360 idr_for_each_entry(&ls->ls_recover_idr, r, id) { 361 idr_remove(&ls->ls_recover_idr, id); 362 r->res_id = 0; 363 r->res_recover_locks_count = 0; 364 ls->ls_recover_list_count--; 365 366 dlm_put_rsb(r); 367 } 368 369 if (ls->ls_recover_list_count != 0) { 370 log_error(ls, "warning: recover_list_count %d", 371 ls->ls_recover_list_count); 372 ls->ls_recover_list_count = 0; 373 } 374 spin_unlock(&ls->ls_recover_idr_lock); 375 } 376 377 378 /* Master recovery: find new master node for rsb's that were 379 mastered on nodes that have been removed. 380 381 dlm_recover_masters 382 recover_master 383 dlm_send_rcom_lookup -> receive_rcom_lookup 384 dlm_dir_lookup 385 receive_rcom_lookup_reply <- 386 dlm_recover_master_reply 387 set_new_master 388 set_master_lkbs 389 set_lock_master 390 */ 391 392 /* 393 * Set the lock master for all LKBs in a lock queue 394 * If we are the new master of the rsb, we may have received new 395 * MSTCPY locks from other nodes already which we need to ignore 396 * when setting the new nodeid. 397 */ 398 399 static void set_lock_master(struct list_head *queue, int nodeid) 400 { 401 struct dlm_lkb *lkb; 402 403 list_for_each_entry(lkb, queue, lkb_statequeue) { 404 if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) { 405 lkb->lkb_nodeid = nodeid; 406 lkb->lkb_remid = 0; 407 } 408 } 409 } 410 411 static void set_master_lkbs(struct dlm_rsb *r) 412 { 413 set_lock_master(&r->res_grantqueue, r->res_nodeid); 414 set_lock_master(&r->res_convertqueue, r->res_nodeid); 415 set_lock_master(&r->res_waitqueue, r->res_nodeid); 416 } 417 418 /* 419 * Propagate the new master nodeid to locks 420 * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider. 421 * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which 422 * rsb's to consider. 423 */ 424 425 static void set_new_master(struct dlm_rsb *r) 426 { 427 set_master_lkbs(r); 428 rsb_set_flag(r, RSB_NEW_MASTER); 429 rsb_set_flag(r, RSB_NEW_MASTER2); 430 } 431 432 /* 433 * We do async lookups on rsb's that need new masters. The rsb's 434 * waiting for a lookup reply are kept on the recover_list. 435 * 436 * Another node recovering the master may have sent us a rcom lookup, 437 * and our dlm_master_lookup() set it as the new master, along with 438 * NEW_MASTER so that we'll recover it here (this implies dir_nodeid 439 * equals our_nodeid below). 440 */ 441 442 static int recover_master(struct dlm_rsb *r, unsigned int *count) 443 { 444 struct dlm_ls *ls = r->res_ls; 445 int our_nodeid, dir_nodeid; 446 int is_removed = 0; 447 int error; 448 449 if (is_master(r)) 450 return 0; 451 452 is_removed = dlm_is_removed(ls, r->res_nodeid); 453 454 if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER)) 455 return 0; 456 457 our_nodeid = dlm_our_nodeid(); 458 dir_nodeid = dlm_dir_nodeid(r); 459 460 if (dir_nodeid == our_nodeid) { 461 if (is_removed) { 462 r->res_master_nodeid = our_nodeid; 463 r->res_nodeid = 0; 464 } 465 466 /* set master of lkbs to ourself when is_removed, or to 467 another new master which we set along with NEW_MASTER 468 in dlm_master_lookup */ 469 set_new_master(r); 470 error = 0; 471 } else { 472 recover_idr_add(r); 473 error = dlm_send_rcom_lookup(r, dir_nodeid); 474 } 475 476 (*count)++; 477 return error; 478 } 479 480 /* 481 * All MSTCPY locks are purged and rebuilt, even if the master stayed the same. 482 * This is necessary because recovery can be started, aborted and restarted, 483 * causing the master nodeid to briefly change during the aborted recovery, and 484 * change back to the original value in the second recovery. The MSTCPY locks 485 * may or may not have been purged during the aborted recovery. Another node 486 * with an outstanding request in waiters list and a request reply saved in the 487 * requestqueue, cannot know whether it should ignore the reply and resend the 488 * request, or accept the reply and complete the request. It must do the 489 * former if the remote node purged MSTCPY locks, and it must do the later if 490 * the remote node did not. This is solved by always purging MSTCPY locks, in 491 * which case, the request reply would always be ignored and the request 492 * resent. 493 */ 494 495 static int recover_master_static(struct dlm_rsb *r, unsigned int *count) 496 { 497 int dir_nodeid = dlm_dir_nodeid(r); 498 int new_master = dir_nodeid; 499 500 if (dir_nodeid == dlm_our_nodeid()) 501 new_master = 0; 502 503 dlm_purge_mstcpy_locks(r); 504 r->res_master_nodeid = dir_nodeid; 505 r->res_nodeid = new_master; 506 set_new_master(r); 507 (*count)++; 508 return 0; 509 } 510 511 /* 512 * Go through local root resources and for each rsb which has a master which 513 * has departed, get the new master nodeid from the directory. The dir will 514 * assign mastery to the first node to look up the new master. That means 515 * we'll discover in this lookup if we're the new master of any rsb's. 516 * 517 * We fire off all the dir lookup requests individually and asynchronously to 518 * the correct dir node. 519 */ 520 521 int dlm_recover_masters(struct dlm_ls *ls) 522 { 523 struct dlm_rsb *r; 524 unsigned int total = 0; 525 unsigned int count = 0; 526 int nodir = dlm_no_directory(ls); 527 int error; 528 529 log_rinfo(ls, "dlm_recover_masters"); 530 531 down_read(&ls->ls_root_sem); 532 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 533 if (dlm_recovery_stopped(ls)) { 534 up_read(&ls->ls_root_sem); 535 error = -EINTR; 536 goto out; 537 } 538 539 lock_rsb(r); 540 if (nodir) 541 error = recover_master_static(r, &count); 542 else 543 error = recover_master(r, &count); 544 unlock_rsb(r); 545 cond_resched(); 546 total++; 547 548 if (error) { 549 up_read(&ls->ls_root_sem); 550 goto out; 551 } 552 } 553 up_read(&ls->ls_root_sem); 554 555 log_rinfo(ls, "dlm_recover_masters %u of %u", count, total); 556 557 error = dlm_wait_function(ls, &recover_idr_empty); 558 out: 559 if (error) 560 recover_idr_clear(ls); 561 return error; 562 } 563 564 int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) 565 { 566 struct dlm_rsb *r; 567 int ret_nodeid, new_master; 568 569 r = recover_idr_find(ls, rc->rc_id); 570 if (!r) { 571 log_error(ls, "dlm_recover_master_reply no id %llx", 572 (unsigned long long)rc->rc_id); 573 goto out; 574 } 575 576 ret_nodeid = rc->rc_result; 577 578 if (ret_nodeid == dlm_our_nodeid()) 579 new_master = 0; 580 else 581 new_master = ret_nodeid; 582 583 lock_rsb(r); 584 r->res_master_nodeid = ret_nodeid; 585 r->res_nodeid = new_master; 586 set_new_master(r); 587 unlock_rsb(r); 588 recover_idr_del(r); 589 590 if (recover_idr_empty(ls)) 591 wake_up(&ls->ls_wait_general); 592 out: 593 return 0; 594 } 595 596 597 /* Lock recovery: rebuild the process-copy locks we hold on a 598 remastered rsb on the new rsb master. 599 600 dlm_recover_locks 601 recover_locks 602 recover_locks_queue 603 dlm_send_rcom_lock -> receive_rcom_lock 604 dlm_recover_master_copy 605 receive_rcom_lock_reply <- 606 dlm_recover_process_copy 607 */ 608 609 610 /* 611 * keep a count of the number of lkb's we send to the new master; when we get 612 * an equal number of replies then recovery for the rsb is done 613 */ 614 615 static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head) 616 { 617 struct dlm_lkb *lkb; 618 int error = 0; 619 620 list_for_each_entry(lkb, head, lkb_statequeue) { 621 error = dlm_send_rcom_lock(r, lkb); 622 if (error) 623 break; 624 r->res_recover_locks_count++; 625 } 626 627 return error; 628 } 629 630 static int recover_locks(struct dlm_rsb *r) 631 { 632 int error = 0; 633 634 lock_rsb(r); 635 636 DLM_ASSERT(!r->res_recover_locks_count, dlm_dump_rsb(r);); 637 638 error = recover_locks_queue(r, &r->res_grantqueue); 639 if (error) 640 goto out; 641 error = recover_locks_queue(r, &r->res_convertqueue); 642 if (error) 643 goto out; 644 error = recover_locks_queue(r, &r->res_waitqueue); 645 if (error) 646 goto out; 647 648 if (r->res_recover_locks_count) 649 recover_list_add(r); 650 else 651 rsb_clear_flag(r, RSB_NEW_MASTER); 652 out: 653 unlock_rsb(r); 654 return error; 655 } 656 657 int dlm_recover_locks(struct dlm_ls *ls) 658 { 659 struct dlm_rsb *r; 660 int error, count = 0; 661 662 down_read(&ls->ls_root_sem); 663 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 664 if (is_master(r)) { 665 rsb_clear_flag(r, RSB_NEW_MASTER); 666 continue; 667 } 668 669 if (!rsb_flag(r, RSB_NEW_MASTER)) 670 continue; 671 672 if (dlm_recovery_stopped(ls)) { 673 error = -EINTR; 674 up_read(&ls->ls_root_sem); 675 goto out; 676 } 677 678 error = recover_locks(r); 679 if (error) { 680 up_read(&ls->ls_root_sem); 681 goto out; 682 } 683 684 count += r->res_recover_locks_count; 685 } 686 up_read(&ls->ls_root_sem); 687 688 log_rinfo(ls, "dlm_recover_locks %d out", count); 689 690 error = dlm_wait_function(ls, &recover_list_empty); 691 out: 692 if (error) 693 recover_list_clear(ls); 694 return error; 695 } 696 697 void dlm_recovered_lock(struct dlm_rsb *r) 698 { 699 DLM_ASSERT(rsb_flag(r, RSB_NEW_MASTER), dlm_dump_rsb(r);); 700 701 r->res_recover_locks_count--; 702 if (!r->res_recover_locks_count) { 703 rsb_clear_flag(r, RSB_NEW_MASTER); 704 recover_list_del(r); 705 } 706 707 if (recover_list_empty(r->res_ls)) 708 wake_up(&r->res_ls->ls_wait_general); 709 } 710 711 /* 712 * The lvb needs to be recovered on all master rsb's. This includes setting 713 * the VALNOTVALID flag if necessary, and determining the correct lvb contents 714 * based on the lvb's of the locks held on the rsb. 715 * 716 * RSB_VALNOTVALID is set in two cases: 717 * 718 * 1. we are master, but not new, and we purged an EX/PW lock held by a 719 * failed node (in dlm_recover_purge which set RSB_RECOVER_LVB_INVAL) 720 * 721 * 2. we are a new master, and there are only NL/CR locks left. 722 * (We could probably improve this by only invaliding in this way when 723 * the previous master left uncleanly. VMS docs mention that.) 724 * 725 * The LVB contents are only considered for changing when this is a new master 726 * of the rsb (NEW_MASTER2). Then, the rsb's lvb is taken from any lkb with 727 * mode > CR. If no lkb's exist with mode above CR, the lvb contents are taken 728 * from the lkb with the largest lvb sequence number. 729 */ 730 731 static void recover_lvb(struct dlm_rsb *r) 732 { 733 struct dlm_lkb *lkb, *high_lkb = NULL; 734 uint32_t high_seq = 0; 735 int lock_lvb_exists = 0; 736 int big_lock_exists = 0; 737 int lvblen = r->res_ls->ls_lvblen; 738 739 if (!rsb_flag(r, RSB_NEW_MASTER2) && 740 rsb_flag(r, RSB_RECOVER_LVB_INVAL)) { 741 /* case 1 above */ 742 rsb_set_flag(r, RSB_VALNOTVALID); 743 return; 744 } 745 746 if (!rsb_flag(r, RSB_NEW_MASTER2)) 747 return; 748 749 /* we are the new master, so figure out if VALNOTVALID should 750 be set, and set the rsb lvb from the best lkb available. */ 751 752 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) { 753 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 754 continue; 755 756 lock_lvb_exists = 1; 757 758 if (lkb->lkb_grmode > DLM_LOCK_CR) { 759 big_lock_exists = 1; 760 goto setflag; 761 } 762 763 if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) { 764 high_lkb = lkb; 765 high_seq = lkb->lkb_lvbseq; 766 } 767 } 768 769 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) { 770 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 771 continue; 772 773 lock_lvb_exists = 1; 774 775 if (lkb->lkb_grmode > DLM_LOCK_CR) { 776 big_lock_exists = 1; 777 goto setflag; 778 } 779 780 if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) { 781 high_lkb = lkb; 782 high_seq = lkb->lkb_lvbseq; 783 } 784 } 785 786 setflag: 787 if (!lock_lvb_exists) 788 goto out; 789 790 /* lvb is invalidated if only NL/CR locks remain */ 791 if (!big_lock_exists) 792 rsb_set_flag(r, RSB_VALNOTVALID); 793 794 if (!r->res_lvbptr) { 795 r->res_lvbptr = dlm_allocate_lvb(r->res_ls); 796 if (!r->res_lvbptr) 797 goto out; 798 } 799 800 if (big_lock_exists) { 801 r->res_lvbseq = lkb->lkb_lvbseq; 802 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, lvblen); 803 } else if (high_lkb) { 804 r->res_lvbseq = high_lkb->lkb_lvbseq; 805 memcpy(r->res_lvbptr, high_lkb->lkb_lvbptr, lvblen); 806 } else { 807 r->res_lvbseq = 0; 808 memset(r->res_lvbptr, 0, lvblen); 809 } 810 out: 811 return; 812 } 813 814 /* All master rsb's flagged RECOVER_CONVERT need to be looked at. The locks 815 converting PR->CW or CW->PR need to have their lkb_grmode set. */ 816 817 static void recover_conversion(struct dlm_rsb *r) 818 { 819 struct dlm_ls *ls = r->res_ls; 820 struct dlm_lkb *lkb; 821 int grmode = -1; 822 823 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) { 824 if (lkb->lkb_grmode == DLM_LOCK_PR || 825 lkb->lkb_grmode == DLM_LOCK_CW) { 826 grmode = lkb->lkb_grmode; 827 break; 828 } 829 } 830 831 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) { 832 if (lkb->lkb_grmode != DLM_LOCK_IV) 833 continue; 834 if (grmode == -1) { 835 log_debug(ls, "recover_conversion %x set gr to rq %d", 836 lkb->lkb_id, lkb->lkb_rqmode); 837 lkb->lkb_grmode = lkb->lkb_rqmode; 838 } else { 839 log_debug(ls, "recover_conversion %x set gr %d", 840 lkb->lkb_id, grmode); 841 lkb->lkb_grmode = grmode; 842 } 843 } 844 } 845 846 /* We've become the new master for this rsb and waiting/converting locks may 847 need to be granted in dlm_recover_grant() due to locks that may have 848 existed from a removed node. */ 849 850 static void recover_grant(struct dlm_rsb *r) 851 { 852 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue)) 853 rsb_set_flag(r, RSB_RECOVER_GRANT); 854 } 855 856 void dlm_recover_rsbs(struct dlm_ls *ls) 857 { 858 struct dlm_rsb *r; 859 unsigned int count = 0; 860 861 down_read(&ls->ls_root_sem); 862 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 863 lock_rsb(r); 864 if (is_master(r)) { 865 if (rsb_flag(r, RSB_RECOVER_CONVERT)) 866 recover_conversion(r); 867 868 /* recover lvb before granting locks so the updated 869 lvb/VALNOTVALID is presented in the completion */ 870 recover_lvb(r); 871 872 if (rsb_flag(r, RSB_NEW_MASTER2)) 873 recover_grant(r); 874 count++; 875 } else { 876 rsb_clear_flag(r, RSB_VALNOTVALID); 877 } 878 rsb_clear_flag(r, RSB_RECOVER_CONVERT); 879 rsb_clear_flag(r, RSB_RECOVER_LVB_INVAL); 880 rsb_clear_flag(r, RSB_NEW_MASTER2); 881 unlock_rsb(r); 882 } 883 up_read(&ls->ls_root_sem); 884 885 if (count) 886 log_rinfo(ls, "dlm_recover_rsbs %d done", count); 887 } 888 889 /* Create a single list of all root rsb's to be used during recovery */ 890 891 int dlm_create_root_list(struct dlm_ls *ls) 892 { 893 struct rb_node *n; 894 struct dlm_rsb *r; 895 int i, error = 0; 896 897 down_write(&ls->ls_root_sem); 898 if (!list_empty(&ls->ls_root_list)) { 899 log_error(ls, "root list not empty"); 900 error = -EINVAL; 901 goto out; 902 } 903 904 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 905 spin_lock(&ls->ls_rsbtbl[i].lock); 906 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) { 907 r = rb_entry(n, struct dlm_rsb, res_hashnode); 908 list_add(&r->res_root_list, &ls->ls_root_list); 909 dlm_hold_rsb(r); 910 } 911 912 if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss)) 913 log_error(ls, "dlm_create_root_list toss not empty"); 914 spin_unlock(&ls->ls_rsbtbl[i].lock); 915 } 916 out: 917 up_write(&ls->ls_root_sem); 918 return error; 919 } 920 921 void dlm_release_root_list(struct dlm_ls *ls) 922 { 923 struct dlm_rsb *r, *safe; 924 925 down_write(&ls->ls_root_sem); 926 list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) { 927 list_del_init(&r->res_root_list); 928 dlm_put_rsb(r); 929 } 930 up_write(&ls->ls_root_sem); 931 } 932 933 void dlm_clear_toss(struct dlm_ls *ls) 934 { 935 struct rb_node *n, *next; 936 struct dlm_rsb *r; 937 unsigned int count = 0; 938 int i; 939 940 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 941 spin_lock(&ls->ls_rsbtbl[i].lock); 942 for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) { 943 next = rb_next(n); 944 r = rb_entry(n, struct dlm_rsb, res_hashnode); 945 rb_erase(n, &ls->ls_rsbtbl[i].toss); 946 dlm_free_rsb(r); 947 count++; 948 } 949 spin_unlock(&ls->ls_rsbtbl[i].lock); 950 } 951 952 if (count) 953 log_rinfo(ls, "dlm_clear_toss %u done", count); 954 } 955 956