1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* -*- mode: c; c-basic-offset: 8; -*- 3 * vim: noexpandtab sw=8 ts=8 sts=0: 4 * 5 * dlmrecovery.c 6 * 7 * recovery stuff 8 * 9 * Copyright (C) 2004 Oracle. All rights reserved. 10 */ 11 12 13 #include <linux/module.h> 14 #include <linux/fs.h> 15 #include <linux/types.h> 16 #include <linux/slab.h> 17 #include <linux/highmem.h> 18 #include <linux/init.h> 19 #include <linux/sysctl.h> 20 #include <linux/random.h> 21 #include <linux/blkdev.h> 22 #include <linux/socket.h> 23 #include <linux/inet.h> 24 #include <linux/timer.h> 25 #include <linux/kthread.h> 26 #include <linux/delay.h> 27 28 29 #include "../cluster/heartbeat.h" 30 #include "../cluster/nodemanager.h" 31 #include "../cluster/tcp.h" 32 33 #include "dlmapi.h" 34 #include "dlmcommon.h" 35 #include "dlmdomain.h" 36 37 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_RECOVERY) 38 #include "../cluster/masklog.h" 39 40 static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node); 41 42 static int dlm_recovery_thread(void *data); 43 static int dlm_do_recovery(struct dlm_ctxt *dlm); 44 45 static int dlm_pick_recovery_master(struct dlm_ctxt *dlm); 46 static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node); 47 static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node); 48 static int dlm_request_all_locks(struct dlm_ctxt *dlm, 49 u8 request_from, u8 dead_node); 50 static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm); 51 52 static inline int dlm_num_locks_in_lockres(struct dlm_lock_resource *res); 53 static void dlm_init_migratable_lockres(struct dlm_migratable_lockres *mres, 54 const char *lockname, int namelen, 55 int total_locks, u64 cookie, 56 u8 flags, u8 master); 57 static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm, 58 struct dlm_migratable_lockres *mres, 59 u8 send_to, 60 struct dlm_lock_resource *res, 61 int total_locks); 62 static int dlm_process_recovery_data(struct dlm_ctxt *dlm, 63 struct dlm_lock_resource *res, 64 struct dlm_migratable_lockres *mres); 65 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm); 66 static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, 67 u8 dead_node, u8 send_to); 68 static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node); 69 static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm, 70 struct list_head *list, u8 dead_node); 71 static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, 72 u8 dead_node, u8 new_master); 73 static void dlm_reco_ast(void *astdata); 74 static void dlm_reco_bast(void *astdata, int blocked_type); 75 static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st); 76 static void dlm_request_all_locks_worker(struct dlm_work_item *item, 77 void *data); 78 static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data); 79 static int dlm_lockres_master_requery(struct dlm_ctxt *dlm, 80 struct dlm_lock_resource *res, 81 u8 *real_master); 82 83 static u64 dlm_get_next_mig_cookie(void); 84 85 static DEFINE_SPINLOCK(dlm_reco_state_lock); 86 static DEFINE_SPINLOCK(dlm_mig_cookie_lock); 87 static u64 dlm_mig_cookie = 1; 88 89 static u64 dlm_get_next_mig_cookie(void) 90 { 91 u64 c; 92 spin_lock(&dlm_mig_cookie_lock); 93 c = dlm_mig_cookie; 94 if (dlm_mig_cookie == (~0ULL)) 95 dlm_mig_cookie = 1; 96 else 97 dlm_mig_cookie++; 98 spin_unlock(&dlm_mig_cookie_lock); 99 return c; 100 } 101 102 static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm, 103 u8 dead_node) 104 { 105 assert_spin_locked(&dlm->spinlock); 106 if (dlm->reco.dead_node != dead_node) 107 mlog(0, "%s: changing dead_node from %u to %u\n", 108 dlm->name, dlm->reco.dead_node, dead_node); 109 dlm->reco.dead_node = dead_node; 110 } 111 112 static inline void dlm_set_reco_master(struct dlm_ctxt *dlm, 113 u8 master) 114 { 115 assert_spin_locked(&dlm->spinlock); 116 mlog(0, "%s: changing new_master from %u to %u\n", 117 dlm->name, dlm->reco.new_master, master); 118 dlm->reco.new_master = master; 119 } 120 121 static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm) 122 { 123 assert_spin_locked(&dlm->spinlock); 124 clear_bit(dlm->reco.dead_node, dlm->recovery_map); 125 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); 126 dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM); 127 } 128 129 static inline void dlm_reset_recovery(struct dlm_ctxt *dlm) 130 { 131 spin_lock(&dlm->spinlock); 132 __dlm_reset_recovery(dlm); 133 spin_unlock(&dlm->spinlock); 134 } 135 136 /* Worker function used during recovery. */ 137 void dlm_dispatch_work(struct work_struct *work) 138 { 139 struct dlm_ctxt *dlm = 140 container_of(work, struct dlm_ctxt, dispatched_work); 141 LIST_HEAD(tmp_list); 142 struct dlm_work_item *item, *next; 143 dlm_workfunc_t *workfunc; 144 int tot=0; 145 146 spin_lock(&dlm->work_lock); 147 list_splice_init(&dlm->work_list, &tmp_list); 148 spin_unlock(&dlm->work_lock); 149 150 list_for_each_entry(item, &tmp_list, list) { 151 tot++; 152 } 153 mlog(0, "%s: work thread has %d work items\n", dlm->name, tot); 154 155 list_for_each_entry_safe(item, next, &tmp_list, list) { 156 workfunc = item->func; 157 list_del_init(&item->list); 158 159 /* already have ref on dlm to avoid having 160 * it disappear. just double-check. */ 161 BUG_ON(item->dlm != dlm); 162 163 /* this is allowed to sleep and 164 * call network stuff */ 165 workfunc(item, item->data); 166 167 dlm_put(dlm); 168 kfree(item); 169 } 170 } 171 172 /* 173 * RECOVERY THREAD 174 */ 175 176 void dlm_kick_recovery_thread(struct dlm_ctxt *dlm) 177 { 178 /* wake the recovery thread 179 * this will wake the reco thread in one of three places 180 * 1) sleeping with no recovery happening 181 * 2) sleeping with recovery mastered elsewhere 182 * 3) recovery mastered here, waiting on reco data */ 183 184 wake_up(&dlm->dlm_reco_thread_wq); 185 } 186 187 /* Launch the recovery thread */ 188 int dlm_launch_recovery_thread(struct dlm_ctxt *dlm) 189 { 190 mlog(0, "starting dlm recovery thread...\n"); 191 192 dlm->dlm_reco_thread_task = kthread_run(dlm_recovery_thread, dlm, 193 "dlm_reco-%s", dlm->name); 194 if (IS_ERR(dlm->dlm_reco_thread_task)) { 195 mlog_errno(PTR_ERR(dlm->dlm_reco_thread_task)); 196 dlm->dlm_reco_thread_task = NULL; 197 return -EINVAL; 198 } 199 200 return 0; 201 } 202 203 void dlm_complete_recovery_thread(struct dlm_ctxt *dlm) 204 { 205 if (dlm->dlm_reco_thread_task) { 206 mlog(0, "waiting for dlm recovery thread to exit\n"); 207 kthread_stop(dlm->dlm_reco_thread_task); 208 dlm->dlm_reco_thread_task = NULL; 209 } 210 } 211 212 213 214 /* 215 * this is lame, but here's how recovery works... 216 * 1) all recovery threads cluster wide will work on recovering 217 * ONE node at a time 218 * 2) negotiate who will take over all the locks for the dead node. 219 * thats right... ALL the locks. 220 * 3) once a new master is chosen, everyone scans all locks 221 * and moves aside those mastered by the dead guy 222 * 4) each of these locks should be locked until recovery is done 223 * 5) the new master collects up all of secondary lock queue info 224 * one lock at a time, forcing each node to communicate back 225 * before continuing 226 * 6) each secondary lock queue responds with the full known lock info 227 * 7) once the new master has run all its locks, it sends a ALLDONE! 228 * message to everyone 229 * 8) upon receiving this message, the secondary queue node unlocks 230 * and responds to the ALLDONE 231 * 9) once the new master gets responses from everyone, he unlocks 232 * everything and recovery for this dead node is done 233 *10) go back to 2) while there are still dead nodes 234 * 235 */ 236 237 static void dlm_print_reco_node_status(struct dlm_ctxt *dlm) 238 { 239 struct dlm_reco_node_data *ndata; 240 struct dlm_lock_resource *res; 241 242 mlog(ML_NOTICE, "%s(%d): recovery info, state=%s, dead=%u, master=%u\n", 243 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), 244 dlm->reco.state & DLM_RECO_STATE_ACTIVE ? "ACTIVE" : "inactive", 245 dlm->reco.dead_node, dlm->reco.new_master); 246 247 list_for_each_entry(ndata, &dlm->reco.node_data, list) { 248 char *st = "unknown"; 249 switch (ndata->state) { 250 case DLM_RECO_NODE_DATA_INIT: 251 st = "init"; 252 break; 253 case DLM_RECO_NODE_DATA_REQUESTING: 254 st = "requesting"; 255 break; 256 case DLM_RECO_NODE_DATA_DEAD: 257 st = "dead"; 258 break; 259 case DLM_RECO_NODE_DATA_RECEIVING: 260 st = "receiving"; 261 break; 262 case DLM_RECO_NODE_DATA_REQUESTED: 263 st = "requested"; 264 break; 265 case DLM_RECO_NODE_DATA_DONE: 266 st = "done"; 267 break; 268 case DLM_RECO_NODE_DATA_FINALIZE_SENT: 269 st = "finalize-sent"; 270 break; 271 default: 272 st = "bad"; 273 break; 274 } 275 mlog(ML_NOTICE, "%s: reco state, node %u, state=%s\n", 276 dlm->name, ndata->node_num, st); 277 } 278 list_for_each_entry(res, &dlm->reco.resources, recovering) { 279 mlog(ML_NOTICE, "%s: lockres %.*s on recovering list\n", 280 dlm->name, res->lockname.len, res->lockname.name); 281 } 282 } 283 284 #define DLM_RECO_THREAD_TIMEOUT_MS (5 * 1000) 285 286 static int dlm_recovery_thread(void *data) 287 { 288 int status; 289 struct dlm_ctxt *dlm = data; 290 unsigned long timeout = msecs_to_jiffies(DLM_RECO_THREAD_TIMEOUT_MS); 291 292 mlog(0, "dlm thread running for %s...\n", dlm->name); 293 294 while (!kthread_should_stop()) { 295 if (dlm_domain_fully_joined(dlm)) { 296 status = dlm_do_recovery(dlm); 297 if (status == -EAGAIN) { 298 /* do not sleep, recheck immediately. */ 299 continue; 300 } 301 if (status < 0) 302 mlog_errno(status); 303 } 304 305 wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq, 306 kthread_should_stop(), 307 timeout); 308 } 309 310 mlog(0, "quitting DLM recovery thread\n"); 311 return 0; 312 } 313 314 /* returns true when the recovery master has contacted us */ 315 static int dlm_reco_master_ready(struct dlm_ctxt *dlm) 316 { 317 int ready; 318 spin_lock(&dlm->spinlock); 319 ready = (dlm->reco.new_master != O2NM_INVALID_NODE_NUM); 320 spin_unlock(&dlm->spinlock); 321 return ready; 322 } 323 324 /* returns true if node is no longer in the domain 325 * could be dead or just not joined */ 326 int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node) 327 { 328 int dead; 329 spin_lock(&dlm->spinlock); 330 dead = !test_bit(node, dlm->domain_map); 331 spin_unlock(&dlm->spinlock); 332 return dead; 333 } 334 335 /* returns true if node is no longer in the domain 336 * could be dead or just not joined */ 337 static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node) 338 { 339 int recovered; 340 spin_lock(&dlm->spinlock); 341 recovered = !test_bit(node, dlm->recovery_map); 342 spin_unlock(&dlm->spinlock); 343 return recovered; 344 } 345 346 347 void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) 348 { 349 if (dlm_is_node_dead(dlm, node)) 350 return; 351 352 printk(KERN_NOTICE "o2dlm: Waiting on the death of node %u in " 353 "domain %s\n", node, dlm->name); 354 355 if (timeout) 356 wait_event_timeout(dlm->dlm_reco_thread_wq, 357 dlm_is_node_dead(dlm, node), 358 msecs_to_jiffies(timeout)); 359 else 360 wait_event(dlm->dlm_reco_thread_wq, 361 dlm_is_node_dead(dlm, node)); 362 } 363 364 void dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout) 365 { 366 if (dlm_is_node_recovered(dlm, node)) 367 return; 368 369 printk(KERN_NOTICE "o2dlm: Waiting on the recovery of node %u in " 370 "domain %s\n", node, dlm->name); 371 372 if (timeout) 373 wait_event_timeout(dlm->dlm_reco_thread_wq, 374 dlm_is_node_recovered(dlm, node), 375 msecs_to_jiffies(timeout)); 376 else 377 wait_event(dlm->dlm_reco_thread_wq, 378 dlm_is_node_recovered(dlm, node)); 379 } 380 381 /* callers of the top-level api calls (dlmlock/dlmunlock) should 382 * block on the dlm->reco.event when recovery is in progress. 383 * the dlm recovery thread will set this state when it begins 384 * recovering a dead node (as the new master or not) and clear 385 * the state and wake as soon as all affected lock resources have 386 * been marked with the RECOVERY flag */ 387 static int dlm_in_recovery(struct dlm_ctxt *dlm) 388 { 389 int in_recovery; 390 spin_lock(&dlm->spinlock); 391 in_recovery = !!(dlm->reco.state & DLM_RECO_STATE_ACTIVE); 392 spin_unlock(&dlm->spinlock); 393 return in_recovery; 394 } 395 396 397 void dlm_wait_for_recovery(struct dlm_ctxt *dlm) 398 { 399 if (dlm_in_recovery(dlm)) { 400 mlog(0, "%s: reco thread %d in recovery: " 401 "state=%d, master=%u, dead=%u\n", 402 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), 403 dlm->reco.state, dlm->reco.new_master, 404 dlm->reco.dead_node); 405 } 406 wait_event(dlm->reco.event, !dlm_in_recovery(dlm)); 407 } 408 409 static void dlm_begin_recovery(struct dlm_ctxt *dlm) 410 { 411 assert_spin_locked(&dlm->spinlock); 412 BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE); 413 printk(KERN_NOTICE "o2dlm: Begin recovery on domain %s for node %u\n", 414 dlm->name, dlm->reco.dead_node); 415 dlm->reco.state |= DLM_RECO_STATE_ACTIVE; 416 } 417 418 static void dlm_end_recovery(struct dlm_ctxt *dlm) 419 { 420 spin_lock(&dlm->spinlock); 421 BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE)); 422 dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE; 423 spin_unlock(&dlm->spinlock); 424 printk(KERN_NOTICE "o2dlm: End recovery on domain %s\n", dlm->name); 425 wake_up(&dlm->reco.event); 426 } 427 428 static void dlm_print_recovery_master(struct dlm_ctxt *dlm) 429 { 430 printk(KERN_NOTICE "o2dlm: Node %u (%s) is the Recovery Master for the " 431 "dead node %u in domain %s\n", dlm->reco.new_master, 432 (dlm->node_num == dlm->reco.new_master ? "me" : "he"), 433 dlm->reco.dead_node, dlm->name); 434 } 435 436 static int dlm_do_recovery(struct dlm_ctxt *dlm) 437 { 438 int status = 0; 439 int ret; 440 441 spin_lock(&dlm->spinlock); 442 443 if (dlm->migrate_done) { 444 mlog(0, "%s: no need do recovery after migrating all " 445 "lock resources\n", dlm->name); 446 spin_unlock(&dlm->spinlock); 447 return 0; 448 } 449 450 /* check to see if the new master has died */ 451 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM && 452 test_bit(dlm->reco.new_master, dlm->recovery_map)) { 453 mlog(0, "new master %u died while recovering %u!\n", 454 dlm->reco.new_master, dlm->reco.dead_node); 455 /* unset the new_master, leave dead_node */ 456 dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM); 457 } 458 459 /* select a target to recover */ 460 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { 461 int bit; 462 463 bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES, 0); 464 if (bit >= O2NM_MAX_NODES || bit < 0) 465 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); 466 else 467 dlm_set_reco_dead_node(dlm, bit); 468 } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) { 469 /* BUG? */ 470 mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n", 471 dlm->reco.dead_node); 472 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); 473 } 474 475 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { 476 // mlog(0, "nothing to recover! sleeping now!\n"); 477 spin_unlock(&dlm->spinlock); 478 /* return to main thread loop and sleep. */ 479 return 0; 480 } 481 mlog(0, "%s(%d):recovery thread found node %u in the recovery map!\n", 482 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), 483 dlm->reco.dead_node); 484 485 /* take write barrier */ 486 /* (stops the list reshuffling thread, proxy ast handling) */ 487 dlm_begin_recovery(dlm); 488 489 spin_unlock(&dlm->spinlock); 490 491 if (dlm->reco.new_master == dlm->node_num) 492 goto master_here; 493 494 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) { 495 /* choose a new master, returns 0 if this node 496 * is the master, -EEXIST if it's another node. 497 * this does not return until a new master is chosen 498 * or recovery completes entirely. */ 499 ret = dlm_pick_recovery_master(dlm); 500 if (!ret) { 501 /* already notified everyone. go. */ 502 goto master_here; 503 } 504 mlog(0, "another node will master this recovery session.\n"); 505 } 506 507 dlm_print_recovery_master(dlm); 508 509 /* it is safe to start everything back up here 510 * because all of the dead node's lock resources 511 * have been marked as in-recovery */ 512 dlm_end_recovery(dlm); 513 514 /* sleep out in main dlm_recovery_thread loop. */ 515 return 0; 516 517 master_here: 518 dlm_print_recovery_master(dlm); 519 520 status = dlm_remaster_locks(dlm, dlm->reco.dead_node); 521 if (status < 0) { 522 /* we should never hit this anymore */ 523 mlog(ML_ERROR, "%s: Error %d remastering locks for node %u, " 524 "retrying.\n", dlm->name, status, dlm->reco.dead_node); 525 /* yield a bit to allow any final network messages 526 * to get handled on remaining nodes */ 527 msleep(100); 528 } else { 529 /* success! see if any other nodes need recovery */ 530 mlog(0, "DONE mastering recovery of %s:%u here(this=%u)!\n", 531 dlm->name, dlm->reco.dead_node, dlm->node_num); 532 spin_lock(&dlm->spinlock); 533 __dlm_reset_recovery(dlm); 534 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; 535 spin_unlock(&dlm->spinlock); 536 } 537 dlm_end_recovery(dlm); 538 539 /* continue and look for another dead node */ 540 return -EAGAIN; 541 } 542 543 static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) 544 { 545 int status = 0; 546 struct dlm_reco_node_data *ndata; 547 int all_nodes_done; 548 int destroy = 0; 549 int pass = 0; 550 551 do { 552 /* we have become recovery master. there is no escaping 553 * this, so just keep trying until we get it. */ 554 status = dlm_init_recovery_area(dlm, dead_node); 555 if (status < 0) { 556 mlog(ML_ERROR, "%s: failed to alloc recovery area, " 557 "retrying\n", dlm->name); 558 msleep(1000); 559 } 560 } while (status != 0); 561 562 /* safe to access the node data list without a lock, since this 563 * process is the only one to change the list */ 564 list_for_each_entry(ndata, &dlm->reco.node_data, list) { 565 BUG_ON(ndata->state != DLM_RECO_NODE_DATA_INIT); 566 ndata->state = DLM_RECO_NODE_DATA_REQUESTING; 567 568 mlog(0, "%s: Requesting lock info from node %u\n", dlm->name, 569 ndata->node_num); 570 571 if (ndata->node_num == dlm->node_num) { 572 ndata->state = DLM_RECO_NODE_DATA_DONE; 573 continue; 574 } 575 576 do { 577 status = dlm_request_all_locks(dlm, ndata->node_num, 578 dead_node); 579 if (status < 0) { 580 mlog_errno(status); 581 if (dlm_is_host_down(status)) { 582 /* node died, ignore it for recovery */ 583 status = 0; 584 ndata->state = DLM_RECO_NODE_DATA_DEAD; 585 /* wait for the domain map to catch up 586 * with the network state. */ 587 wait_event_timeout(dlm->dlm_reco_thread_wq, 588 dlm_is_node_dead(dlm, 589 ndata->node_num), 590 msecs_to_jiffies(1000)); 591 mlog(0, "waited 1 sec for %u, " 592 "dead? %s\n", ndata->node_num, 593 dlm_is_node_dead(dlm, ndata->node_num) ? 594 "yes" : "no"); 595 } else { 596 /* -ENOMEM on the other node */ 597 mlog(0, "%s: node %u returned " 598 "%d during recovery, retrying " 599 "after a short wait\n", 600 dlm->name, ndata->node_num, 601 status); 602 msleep(100); 603 } 604 } 605 } while (status != 0); 606 607 spin_lock(&dlm_reco_state_lock); 608 switch (ndata->state) { 609 case DLM_RECO_NODE_DATA_INIT: 610 case DLM_RECO_NODE_DATA_FINALIZE_SENT: 611 case DLM_RECO_NODE_DATA_REQUESTED: 612 BUG(); 613 break; 614 case DLM_RECO_NODE_DATA_DEAD: 615 mlog(0, "node %u died after requesting " 616 "recovery info for node %u\n", 617 ndata->node_num, dead_node); 618 /* fine. don't need this node's info. 619 * continue without it. */ 620 break; 621 case DLM_RECO_NODE_DATA_REQUESTING: 622 ndata->state = DLM_RECO_NODE_DATA_REQUESTED; 623 mlog(0, "now receiving recovery data from " 624 "node %u for dead node %u\n", 625 ndata->node_num, dead_node); 626 break; 627 case DLM_RECO_NODE_DATA_RECEIVING: 628 mlog(0, "already receiving recovery data from " 629 "node %u for dead node %u\n", 630 ndata->node_num, dead_node); 631 break; 632 case DLM_RECO_NODE_DATA_DONE: 633 mlog(0, "already DONE receiving recovery data " 634 "from node %u for dead node %u\n", 635 ndata->node_num, dead_node); 636 break; 637 } 638 spin_unlock(&dlm_reco_state_lock); 639 } 640 641 mlog(0, "%s: Done requesting all lock info\n", dlm->name); 642 643 /* nodes should be sending reco data now 644 * just need to wait */ 645 646 while (1) { 647 /* check all the nodes now to see if we are 648 * done, or if anyone died */ 649 all_nodes_done = 1; 650 spin_lock(&dlm_reco_state_lock); 651 list_for_each_entry(ndata, &dlm->reco.node_data, list) { 652 mlog(0, "checking recovery state of node %u\n", 653 ndata->node_num); 654 switch (ndata->state) { 655 case DLM_RECO_NODE_DATA_INIT: 656 case DLM_RECO_NODE_DATA_REQUESTING: 657 mlog(ML_ERROR, "bad ndata state for " 658 "node %u: state=%d\n", 659 ndata->node_num, ndata->state); 660 BUG(); 661 break; 662 case DLM_RECO_NODE_DATA_DEAD: 663 mlog(0, "node %u died after " 664 "requesting recovery info for " 665 "node %u\n", ndata->node_num, 666 dead_node); 667 break; 668 case DLM_RECO_NODE_DATA_RECEIVING: 669 case DLM_RECO_NODE_DATA_REQUESTED: 670 mlog(0, "%s: node %u still in state %s\n", 671 dlm->name, ndata->node_num, 672 ndata->state==DLM_RECO_NODE_DATA_RECEIVING ? 673 "receiving" : "requested"); 674 all_nodes_done = 0; 675 break; 676 case DLM_RECO_NODE_DATA_DONE: 677 mlog(0, "%s: node %u state is done\n", 678 dlm->name, ndata->node_num); 679 break; 680 case DLM_RECO_NODE_DATA_FINALIZE_SENT: 681 mlog(0, "%s: node %u state is finalize\n", 682 dlm->name, ndata->node_num); 683 break; 684 } 685 } 686 spin_unlock(&dlm_reco_state_lock); 687 688 mlog(0, "pass #%d, all_nodes_done?: %s\n", ++pass, 689 all_nodes_done?"yes":"no"); 690 if (all_nodes_done) { 691 int ret; 692 693 /* Set this flag on recovery master to avoid 694 * a new recovery for another dead node start 695 * before the recovery is not done. That may 696 * cause recovery hung.*/ 697 spin_lock(&dlm->spinlock); 698 dlm->reco.state |= DLM_RECO_STATE_FINALIZE; 699 spin_unlock(&dlm->spinlock); 700 701 /* all nodes are now in DLM_RECO_NODE_DATA_DONE state 702 * just send a finalize message to everyone and 703 * clean up */ 704 mlog(0, "all nodes are done! send finalize\n"); 705 ret = dlm_send_finalize_reco_message(dlm); 706 if (ret < 0) 707 mlog_errno(ret); 708 709 spin_lock(&dlm->spinlock); 710 dlm_finish_local_lockres_recovery(dlm, dead_node, 711 dlm->node_num); 712 spin_unlock(&dlm->spinlock); 713 mlog(0, "should be done with recovery!\n"); 714 715 mlog(0, "finishing recovery of %s at %lu, " 716 "dead=%u, this=%u, new=%u\n", dlm->name, 717 jiffies, dlm->reco.dead_node, 718 dlm->node_num, dlm->reco.new_master); 719 destroy = 1; 720 status = 0; 721 /* rescan everything marked dirty along the way */ 722 dlm_kick_thread(dlm, NULL); 723 break; 724 } 725 /* wait to be signalled, with periodic timeout 726 * to check for node death */ 727 wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq, 728 kthread_should_stop(), 729 msecs_to_jiffies(DLM_RECO_THREAD_TIMEOUT_MS)); 730 731 } 732 733 if (destroy) 734 dlm_destroy_recovery_area(dlm); 735 736 return status; 737 } 738 739 static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node) 740 { 741 int num=0; 742 struct dlm_reco_node_data *ndata; 743 744 spin_lock(&dlm->spinlock); 745 memcpy(dlm->reco.node_map, dlm->domain_map, sizeof(dlm->domain_map)); 746 /* nodes can only be removed (by dying) after dropping 747 * this lock, and death will be trapped later, so this should do */ 748 spin_unlock(&dlm->spinlock); 749 750 while (1) { 751 num = find_next_bit (dlm->reco.node_map, O2NM_MAX_NODES, num); 752 if (num >= O2NM_MAX_NODES) { 753 break; 754 } 755 BUG_ON(num == dead_node); 756 757 ndata = kzalloc(sizeof(*ndata), GFP_NOFS); 758 if (!ndata) { 759 dlm_destroy_recovery_area(dlm); 760 return -ENOMEM; 761 } 762 ndata->node_num = num; 763 ndata->state = DLM_RECO_NODE_DATA_INIT; 764 spin_lock(&dlm_reco_state_lock); 765 list_add_tail(&ndata->list, &dlm->reco.node_data); 766 spin_unlock(&dlm_reco_state_lock); 767 num++; 768 } 769 770 return 0; 771 } 772 773 static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm) 774 { 775 struct dlm_reco_node_data *ndata, *next; 776 LIST_HEAD(tmplist); 777 778 spin_lock(&dlm_reco_state_lock); 779 list_splice_init(&dlm->reco.node_data, &tmplist); 780 spin_unlock(&dlm_reco_state_lock); 781 782 list_for_each_entry_safe(ndata, next, &tmplist, list) { 783 list_del_init(&ndata->list); 784 kfree(ndata); 785 } 786 } 787 788 static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from, 789 u8 dead_node) 790 { 791 struct dlm_lock_request lr; 792 int ret; 793 int status; 794 795 mlog(0, "\n"); 796 797 798 mlog(0, "dlm_request_all_locks: dead node is %u, sending request " 799 "to %u\n", dead_node, request_from); 800 801 memset(&lr, 0, sizeof(lr)); 802 lr.node_idx = dlm->node_num; 803 lr.dead_node = dead_node; 804 805 // send message 806 ret = o2net_send_message(DLM_LOCK_REQUEST_MSG, dlm->key, 807 &lr, sizeof(lr), request_from, &status); 808 809 /* negative status is handled by caller */ 810 if (ret < 0) 811 mlog(ML_ERROR, "%s: Error %d send LOCK_REQUEST to node %u " 812 "to recover dead node %u\n", dlm->name, ret, 813 request_from, dead_node); 814 else 815 ret = status; 816 // return from here, then 817 // sleep until all received or error 818 return ret; 819 820 } 821 822 int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data, 823 void **ret_data) 824 { 825 struct dlm_ctxt *dlm = data; 826 struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf; 827 char *buf = NULL; 828 struct dlm_work_item *item = NULL; 829 830 if (!dlm_grab(dlm)) 831 return -EINVAL; 832 833 if (lr->dead_node != dlm->reco.dead_node) { 834 mlog(ML_ERROR, "%s: node %u sent dead_node=%u, but local " 835 "dead_node is %u\n", dlm->name, lr->node_idx, 836 lr->dead_node, dlm->reco.dead_node); 837 dlm_print_reco_node_status(dlm); 838 /* this is a hack */ 839 dlm_put(dlm); 840 return -ENOMEM; 841 } 842 BUG_ON(lr->dead_node != dlm->reco.dead_node); 843 844 item = kzalloc(sizeof(*item), GFP_NOFS); 845 if (!item) { 846 dlm_put(dlm); 847 return -ENOMEM; 848 } 849 850 /* this will get freed by dlm_request_all_locks_worker */ 851 buf = (char *) __get_free_page(GFP_NOFS); 852 if (!buf) { 853 kfree(item); 854 dlm_put(dlm); 855 return -ENOMEM; 856 } 857 858 /* queue up work for dlm_request_all_locks_worker */ 859 dlm_grab(dlm); /* get an extra ref for the work item */ 860 dlm_init_work_item(dlm, item, dlm_request_all_locks_worker, buf); 861 item->u.ral.reco_master = lr->node_idx; 862 item->u.ral.dead_node = lr->dead_node; 863 spin_lock(&dlm->work_lock); 864 list_add_tail(&item->list, &dlm->work_list); 865 spin_unlock(&dlm->work_lock); 866 queue_work(dlm->dlm_worker, &dlm->dispatched_work); 867 868 dlm_put(dlm); 869 return 0; 870 } 871 872 static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data) 873 { 874 struct dlm_migratable_lockres *mres; 875 struct dlm_lock_resource *res; 876 struct dlm_ctxt *dlm; 877 LIST_HEAD(resources); 878 int ret; 879 u8 dead_node, reco_master; 880 int skip_all_done = 0; 881 882 dlm = item->dlm; 883 dead_node = item->u.ral.dead_node; 884 reco_master = item->u.ral.reco_master; 885 mres = (struct dlm_migratable_lockres *)data; 886 887 mlog(0, "%s: recovery worker started, dead=%u, master=%u\n", 888 dlm->name, dead_node, reco_master); 889 890 if (dead_node != dlm->reco.dead_node || 891 reco_master != dlm->reco.new_master) { 892 /* worker could have been created before the recovery master 893 * died. if so, do not continue, but do not error. */ 894 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) { 895 mlog(ML_NOTICE, "%s: will not send recovery state, " 896 "recovery master %u died, thread=(dead=%u,mas=%u)" 897 " current=(dead=%u,mas=%u)\n", dlm->name, 898 reco_master, dead_node, reco_master, 899 dlm->reco.dead_node, dlm->reco.new_master); 900 } else { 901 mlog(ML_NOTICE, "%s: reco state invalid: reco(dead=%u, " 902 "master=%u), request(dead=%u, master=%u)\n", 903 dlm->name, dlm->reco.dead_node, 904 dlm->reco.new_master, dead_node, reco_master); 905 } 906 goto leave; 907 } 908 909 /* lock resources should have already been moved to the 910 * dlm->reco.resources list. now move items from that list 911 * to a temp list if the dead owner matches. note that the 912 * whole cluster recovers only one node at a time, so we 913 * can safely move UNKNOWN lock resources for each recovery 914 * session. */ 915 dlm_move_reco_locks_to_list(dlm, &resources, dead_node); 916 917 /* now we can begin blasting lockreses without the dlm lock */ 918 919 /* any errors returned will be due to the new_master dying, 920 * the dlm_reco_thread should detect this */ 921 list_for_each_entry(res, &resources, recovering) { 922 ret = dlm_send_one_lockres(dlm, res, mres, reco_master, 923 DLM_MRES_RECOVERY); 924 if (ret < 0) { 925 mlog(ML_ERROR, "%s: node %u went down while sending " 926 "recovery state for dead node %u, ret=%d\n", dlm->name, 927 reco_master, dead_node, ret); 928 skip_all_done = 1; 929 break; 930 } 931 } 932 933 /* move the resources back to the list */ 934 spin_lock(&dlm->spinlock); 935 list_splice_init(&resources, &dlm->reco.resources); 936 spin_unlock(&dlm->spinlock); 937 938 if (!skip_all_done) { 939 ret = dlm_send_all_done_msg(dlm, dead_node, reco_master); 940 if (ret < 0) { 941 mlog(ML_ERROR, "%s: node %u went down while sending " 942 "recovery all-done for dead node %u, ret=%d\n", 943 dlm->name, reco_master, dead_node, ret); 944 } 945 } 946 leave: 947 free_page((unsigned long)data); 948 } 949 950 951 static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to) 952 { 953 int ret, tmpret; 954 struct dlm_reco_data_done done_msg; 955 956 memset(&done_msg, 0, sizeof(done_msg)); 957 done_msg.node_idx = dlm->node_num; 958 done_msg.dead_node = dead_node; 959 mlog(0, "sending DATA DONE message to %u, " 960 "my node=%u, dead node=%u\n", send_to, done_msg.node_idx, 961 done_msg.dead_node); 962 963 ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg, 964 sizeof(done_msg), send_to, &tmpret); 965 if (ret < 0) { 966 mlog(ML_ERROR, "%s: Error %d send RECO_DATA_DONE to node %u " 967 "to recover dead node %u\n", dlm->name, ret, send_to, 968 dead_node); 969 if (!dlm_is_host_down(ret)) { 970 BUG(); 971 } 972 } else 973 ret = tmpret; 974 return ret; 975 } 976 977 978 int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data, 979 void **ret_data) 980 { 981 struct dlm_ctxt *dlm = data; 982 struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf; 983 struct dlm_reco_node_data *ndata = NULL; 984 int ret = -EINVAL; 985 986 if (!dlm_grab(dlm)) 987 return -EINVAL; 988 989 mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, " 990 "node_idx=%u, this node=%u\n", done->dead_node, 991 dlm->reco.dead_node, done->node_idx, dlm->node_num); 992 993 mlog_bug_on_msg((done->dead_node != dlm->reco.dead_node), 994 "Got DATA DONE: dead_node=%u, reco.dead_node=%u, " 995 "node_idx=%u, this node=%u\n", done->dead_node, 996 dlm->reco.dead_node, done->node_idx, dlm->node_num); 997 998 spin_lock(&dlm_reco_state_lock); 999 list_for_each_entry(ndata, &dlm->reco.node_data, list) { 1000 if (ndata->node_num != done->node_idx) 1001 continue; 1002 1003 switch (ndata->state) { 1004 /* should have moved beyond INIT but not to FINALIZE yet */ 1005 case DLM_RECO_NODE_DATA_INIT: 1006 case DLM_RECO_NODE_DATA_DEAD: 1007 case DLM_RECO_NODE_DATA_FINALIZE_SENT: 1008 mlog(ML_ERROR, "bad ndata state for node %u:" 1009 " state=%d\n", ndata->node_num, 1010 ndata->state); 1011 BUG(); 1012 break; 1013 /* these states are possible at this point, anywhere along 1014 * the line of recovery */ 1015 case DLM_RECO_NODE_DATA_DONE: 1016 case DLM_RECO_NODE_DATA_RECEIVING: 1017 case DLM_RECO_NODE_DATA_REQUESTED: 1018 case DLM_RECO_NODE_DATA_REQUESTING: 1019 mlog(0, "node %u is DONE sending " 1020 "recovery data!\n", 1021 ndata->node_num); 1022 1023 ndata->state = DLM_RECO_NODE_DATA_DONE; 1024 ret = 0; 1025 break; 1026 } 1027 } 1028 spin_unlock(&dlm_reco_state_lock); 1029 1030 /* wake the recovery thread, some node is done */ 1031 if (!ret) 1032 dlm_kick_recovery_thread(dlm); 1033 1034 if (ret < 0) 1035 mlog(ML_ERROR, "failed to find recovery node data for node " 1036 "%u\n", done->node_idx); 1037 dlm_put(dlm); 1038 1039 mlog(0, "leaving reco data done handler, ret=%d\n", ret); 1040 return ret; 1041 } 1042 1043 static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm, 1044 struct list_head *list, 1045 u8 dead_node) 1046 { 1047 struct dlm_lock_resource *res, *next; 1048 struct dlm_lock *lock; 1049 1050 spin_lock(&dlm->spinlock); 1051 list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) { 1052 /* always prune any $RECOVERY entries for dead nodes, 1053 * otherwise hangs can occur during later recovery */ 1054 if (dlm_is_recovery_lock(res->lockname.name, 1055 res->lockname.len)) { 1056 spin_lock(&res->spinlock); 1057 list_for_each_entry(lock, &res->granted, list) { 1058 if (lock->ml.node == dead_node) { 1059 mlog(0, "AHA! there was " 1060 "a $RECOVERY lock for dead " 1061 "node %u (%s)!\n", 1062 dead_node, dlm->name); 1063 list_del_init(&lock->list); 1064 dlm_lock_put(lock); 1065 /* Can't schedule DLM_UNLOCK_FREE_LOCK 1066 * - do manually */ 1067 dlm_lock_put(lock); 1068 break; 1069 } 1070 } 1071 spin_unlock(&res->spinlock); 1072 continue; 1073 } 1074 1075 if (res->owner == dead_node) { 1076 mlog(0, "found lockres owned by dead node while " 1077 "doing recovery for node %u. sending it.\n", 1078 dead_node); 1079 list_move_tail(&res->recovering, list); 1080 } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) { 1081 mlog(0, "found UNKNOWN owner while doing recovery " 1082 "for node %u. sending it.\n", dead_node); 1083 list_move_tail(&res->recovering, list); 1084 } 1085 } 1086 spin_unlock(&dlm->spinlock); 1087 } 1088 1089 static inline int dlm_num_locks_in_lockres(struct dlm_lock_resource *res) 1090 { 1091 int total_locks = 0; 1092 struct list_head *iter, *queue = &res->granted; 1093 int i; 1094 1095 for (i=0; i<3; i++) { 1096 list_for_each(iter, queue) 1097 total_locks++; 1098 queue++; 1099 } 1100 return total_locks; 1101 } 1102 1103 1104 static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm, 1105 struct dlm_migratable_lockres *mres, 1106 u8 send_to, 1107 struct dlm_lock_resource *res, 1108 int total_locks) 1109 { 1110 u64 mig_cookie = be64_to_cpu(mres->mig_cookie); 1111 int mres_total_locks = be32_to_cpu(mres->total_locks); 1112 int ret = 0, status = 0; 1113 u8 orig_flags = mres->flags, 1114 orig_master = mres->master; 1115 1116 BUG_ON(mres->num_locks > DLM_MAX_MIGRATABLE_LOCKS); 1117 if (!mres->num_locks) 1118 return 0; 1119 1120 /* add an all-done flag if we reached the last lock */ 1121 orig_flags = mres->flags; 1122 BUG_ON(total_locks > mres_total_locks); 1123 if (total_locks == mres_total_locks) 1124 mres->flags |= DLM_MRES_ALL_DONE; 1125 1126 mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n", 1127 dlm->name, res->lockname.len, res->lockname.name, 1128 orig_flags & DLM_MRES_MIGRATION ? "migration" : "recovery", 1129 send_to); 1130 1131 /* send it */ 1132 ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres, 1133 struct_size(mres, ml, mres->num_locks), 1134 send_to, &status); 1135 if (ret < 0) { 1136 /* XXX: negative status is not handled. 1137 * this will end up killing this node. */ 1138 mlog(ML_ERROR, "%s: res %.*s, Error %d send MIG_LOCKRES to " 1139 "node %u (%s)\n", dlm->name, mres->lockname_len, 1140 mres->lockname, ret, send_to, 1141 (orig_flags & DLM_MRES_MIGRATION ? 1142 "migration" : "recovery")); 1143 } else { 1144 /* might get an -ENOMEM back here */ 1145 ret = status; 1146 if (ret < 0) { 1147 mlog_errno(ret); 1148 1149 if (ret == -EFAULT) { 1150 mlog(ML_ERROR, "node %u told me to kill " 1151 "myself!\n", send_to); 1152 BUG(); 1153 } 1154 } 1155 } 1156 1157 /* zero and reinit the message buffer */ 1158 dlm_init_migratable_lockres(mres, res->lockname.name, 1159 res->lockname.len, mres_total_locks, 1160 mig_cookie, orig_flags, orig_master); 1161 return ret; 1162 } 1163 1164 static void dlm_init_migratable_lockres(struct dlm_migratable_lockres *mres, 1165 const char *lockname, int namelen, 1166 int total_locks, u64 cookie, 1167 u8 flags, u8 master) 1168 { 1169 /* mres here is one full page */ 1170 clear_page(mres); 1171 mres->lockname_len = namelen; 1172 memcpy(mres->lockname, lockname, namelen); 1173 mres->num_locks = 0; 1174 mres->total_locks = cpu_to_be32(total_locks); 1175 mres->mig_cookie = cpu_to_be64(cookie); 1176 mres->flags = flags; 1177 mres->master = master; 1178 } 1179 1180 static void dlm_prepare_lvb_for_migration(struct dlm_lock *lock, 1181 struct dlm_migratable_lockres *mres, 1182 int queue) 1183 { 1184 if (!lock->lksb) 1185 return; 1186 1187 /* Ignore lvb in all locks in the blocked list */ 1188 if (queue == DLM_BLOCKED_LIST) 1189 return; 1190 1191 /* Only consider lvbs in locks with granted EX or PR lock levels */ 1192 if (lock->ml.type != LKM_EXMODE && lock->ml.type != LKM_PRMODE) 1193 return; 1194 1195 if (dlm_lvb_is_empty(mres->lvb)) { 1196 memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN); 1197 return; 1198 } 1199 1200 /* Ensure the lvb copied for migration matches in other valid locks */ 1201 if (!memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN)) 1202 return; 1203 1204 mlog(ML_ERROR, "Mismatched lvb in lock cookie=%u:%llu, name=%.*s, " 1205 "node=%u\n", 1206 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), 1207 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), 1208 lock->lockres->lockname.len, lock->lockres->lockname.name, 1209 lock->ml.node); 1210 dlm_print_one_lock_resource(lock->lockres); 1211 BUG(); 1212 } 1213 1214 /* returns 1 if this lock fills the network structure, 1215 * 0 otherwise */ 1216 static int dlm_add_lock_to_array(struct dlm_lock *lock, 1217 struct dlm_migratable_lockres *mres, int queue) 1218 { 1219 struct dlm_migratable_lock *ml; 1220 int lock_num = mres->num_locks; 1221 1222 ml = &(mres->ml[lock_num]); 1223 ml->cookie = lock->ml.cookie; 1224 ml->type = lock->ml.type; 1225 ml->convert_type = lock->ml.convert_type; 1226 ml->highest_blocked = lock->ml.highest_blocked; 1227 ml->list = queue; 1228 if (lock->lksb) { 1229 ml->flags = lock->lksb->flags; 1230 dlm_prepare_lvb_for_migration(lock, mres, queue); 1231 } 1232 ml->node = lock->ml.node; 1233 mres->num_locks++; 1234 /* we reached the max, send this network message */ 1235 if (mres->num_locks == DLM_MAX_MIGRATABLE_LOCKS) 1236 return 1; 1237 return 0; 1238 } 1239 1240 static void dlm_add_dummy_lock(struct dlm_ctxt *dlm, 1241 struct dlm_migratable_lockres *mres) 1242 { 1243 struct dlm_lock dummy; 1244 memset(&dummy, 0, sizeof(dummy)); 1245 dummy.ml.cookie = 0; 1246 dummy.ml.type = LKM_IVMODE; 1247 dummy.ml.convert_type = LKM_IVMODE; 1248 dummy.ml.highest_blocked = LKM_IVMODE; 1249 dummy.lksb = NULL; 1250 dummy.ml.node = dlm->node_num; 1251 dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST); 1252 } 1253 1254 static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm, 1255 struct dlm_migratable_lock *ml, 1256 u8 *nodenum) 1257 { 1258 if (unlikely(ml->cookie == 0 && 1259 ml->type == LKM_IVMODE && 1260 ml->convert_type == LKM_IVMODE && 1261 ml->highest_blocked == LKM_IVMODE && 1262 ml->list == DLM_BLOCKED_LIST)) { 1263 *nodenum = ml->node; 1264 return 1; 1265 } 1266 return 0; 1267 } 1268 1269 int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, 1270 struct dlm_migratable_lockres *mres, 1271 u8 send_to, u8 flags) 1272 { 1273 struct list_head *queue; 1274 int total_locks, i; 1275 u64 mig_cookie = 0; 1276 struct dlm_lock *lock; 1277 int ret = 0; 1278 1279 BUG_ON(!(flags & (DLM_MRES_RECOVERY|DLM_MRES_MIGRATION))); 1280 1281 mlog(0, "sending to %u\n", send_to); 1282 1283 total_locks = dlm_num_locks_in_lockres(res); 1284 if (total_locks > DLM_MAX_MIGRATABLE_LOCKS) { 1285 /* rare, but possible */ 1286 mlog(0, "argh. lockres has %d locks. this will " 1287 "require more than one network packet to " 1288 "migrate\n", total_locks); 1289 mig_cookie = dlm_get_next_mig_cookie(); 1290 } 1291 1292 dlm_init_migratable_lockres(mres, res->lockname.name, 1293 res->lockname.len, total_locks, 1294 mig_cookie, flags, res->owner); 1295 1296 total_locks = 0; 1297 for (i=DLM_GRANTED_LIST; i<=DLM_BLOCKED_LIST; i++) { 1298 queue = dlm_list_idx_to_ptr(res, i); 1299 list_for_each_entry(lock, queue, list) { 1300 /* add another lock. */ 1301 total_locks++; 1302 if (!dlm_add_lock_to_array(lock, mres, i)) 1303 continue; 1304 1305 /* this filled the lock message, 1306 * we must send it immediately. */ 1307 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, 1308 res, total_locks); 1309 if (ret < 0) 1310 goto error; 1311 } 1312 } 1313 if (total_locks == 0) { 1314 /* send a dummy lock to indicate a mastery reference only */ 1315 mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n", 1316 dlm->name, res->lockname.len, res->lockname.name, 1317 send_to, flags & DLM_MRES_RECOVERY ? "recovery" : 1318 "migration"); 1319 dlm_add_dummy_lock(dlm, mres); 1320 } 1321 /* flush any remaining locks */ 1322 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks); 1323 if (ret < 0) 1324 goto error; 1325 return ret; 1326 1327 error: 1328 mlog(ML_ERROR, "%s: dlm_send_mig_lockres_msg returned %d\n", 1329 dlm->name, ret); 1330 if (!dlm_is_host_down(ret)) 1331 BUG(); 1332 mlog(0, "%s: node %u went down while sending %s " 1333 "lockres %.*s\n", dlm->name, send_to, 1334 flags & DLM_MRES_RECOVERY ? "recovery" : "migration", 1335 res->lockname.len, res->lockname.name); 1336 return ret; 1337 } 1338 1339 1340 1341 /* 1342 * this message will contain no more than one page worth of 1343 * recovery data, and it will work on only one lockres. 1344 * there may be many locks in this page, and we may need to wait 1345 * for additional packets to complete all the locks (rare, but 1346 * possible). 1347 */ 1348 /* 1349 * NOTE: the allocation error cases here are scary 1350 * we really cannot afford to fail an alloc in recovery 1351 * do we spin? returning an error only delays the problem really 1352 */ 1353 1354 int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data, 1355 void **ret_data) 1356 { 1357 struct dlm_ctxt *dlm = data; 1358 struct dlm_migratable_lockres *mres = 1359 (struct dlm_migratable_lockres *)msg->buf; 1360 int ret = 0; 1361 u8 real_master; 1362 u8 extra_refs = 0; 1363 char *buf = NULL; 1364 struct dlm_work_item *item = NULL; 1365 struct dlm_lock_resource *res = NULL; 1366 unsigned int hash; 1367 1368 if (!dlm_grab(dlm)) 1369 return -EINVAL; 1370 1371 if (!dlm_joined(dlm)) { 1372 mlog(ML_ERROR, "Domain %s not joined! " 1373 "lockres %.*s, master %u\n", 1374 dlm->name, mres->lockname_len, 1375 mres->lockname, mres->master); 1376 dlm_put(dlm); 1377 return -EINVAL; 1378 } 1379 1380 BUG_ON(!(mres->flags & (DLM_MRES_RECOVERY|DLM_MRES_MIGRATION))); 1381 1382 real_master = mres->master; 1383 if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) { 1384 /* cannot migrate a lockres with no master */ 1385 BUG_ON(!(mres->flags & DLM_MRES_RECOVERY)); 1386 } 1387 1388 mlog(0, "%s message received from node %u\n", 1389 (mres->flags & DLM_MRES_RECOVERY) ? 1390 "recovery" : "migration", mres->master); 1391 if (mres->flags & DLM_MRES_ALL_DONE) 1392 mlog(0, "all done flag. all lockres data received!\n"); 1393 1394 ret = -ENOMEM; 1395 buf = kmalloc(be16_to_cpu(msg->data_len), GFP_NOFS); 1396 item = kzalloc(sizeof(*item), GFP_NOFS); 1397 if (!buf || !item) 1398 goto leave; 1399 1400 /* lookup the lock to see if we have a secondary queue for this 1401 * already... just add the locks in and this will have its owner 1402 * and RECOVERY flag changed when it completes. */ 1403 hash = dlm_lockid_hash(mres->lockname, mres->lockname_len); 1404 spin_lock(&dlm->spinlock); 1405 res = __dlm_lookup_lockres_full(dlm, mres->lockname, mres->lockname_len, 1406 hash); 1407 if (res) { 1408 /* this will get a ref on res */ 1409 /* mark it as recovering/migrating and hash it */ 1410 spin_lock(&res->spinlock); 1411 if (res->state & DLM_LOCK_RES_DROPPING_REF) { 1412 mlog(0, "%s: node is attempting to migrate " 1413 "lockres %.*s, but marked as dropping " 1414 " ref!\n", dlm->name, 1415 mres->lockname_len, mres->lockname); 1416 ret = -EINVAL; 1417 spin_unlock(&res->spinlock); 1418 spin_unlock(&dlm->spinlock); 1419 dlm_lockres_put(res); 1420 goto leave; 1421 } 1422 1423 if (mres->flags & DLM_MRES_RECOVERY) { 1424 res->state |= DLM_LOCK_RES_RECOVERING; 1425 } else { 1426 if (res->state & DLM_LOCK_RES_MIGRATING) { 1427 /* this is at least the second 1428 * lockres message */ 1429 mlog(0, "lock %.*s is already migrating\n", 1430 mres->lockname_len, 1431 mres->lockname); 1432 } else if (res->state & DLM_LOCK_RES_RECOVERING) { 1433 /* caller should BUG */ 1434 mlog(ML_ERROR, "node is attempting to migrate " 1435 "lock %.*s, but marked as recovering!\n", 1436 mres->lockname_len, mres->lockname); 1437 ret = -EFAULT; 1438 spin_unlock(&res->spinlock); 1439 spin_unlock(&dlm->spinlock); 1440 dlm_lockres_put(res); 1441 goto leave; 1442 } 1443 res->state |= DLM_LOCK_RES_MIGRATING; 1444 } 1445 spin_unlock(&res->spinlock); 1446 spin_unlock(&dlm->spinlock); 1447 } else { 1448 spin_unlock(&dlm->spinlock); 1449 /* need to allocate, just like if it was 1450 * mastered here normally */ 1451 res = dlm_new_lockres(dlm, mres->lockname, mres->lockname_len); 1452 if (!res) 1453 goto leave; 1454 1455 /* to match the ref that we would have gotten if 1456 * dlm_lookup_lockres had succeeded */ 1457 dlm_lockres_get(res); 1458 1459 /* mark it as recovering/migrating and hash it */ 1460 if (mres->flags & DLM_MRES_RECOVERY) 1461 res->state |= DLM_LOCK_RES_RECOVERING; 1462 else 1463 res->state |= DLM_LOCK_RES_MIGRATING; 1464 1465 spin_lock(&dlm->spinlock); 1466 __dlm_insert_lockres(dlm, res); 1467 spin_unlock(&dlm->spinlock); 1468 1469 /* Add an extra ref for this lock-less lockres lest the 1470 * dlm_thread purges it before we get the chance to add 1471 * locks to it */ 1472 dlm_lockres_get(res); 1473 1474 /* There are three refs that need to be put. 1475 * 1. Taken above. 1476 * 2. kref_init in dlm_new_lockres()->dlm_init_lockres(). 1477 * 3. dlm_lookup_lockres() 1478 * The first one is handled at the end of this function. The 1479 * other two are handled in the worker thread after locks have 1480 * been attached. Yes, we don't wait for purge time to match 1481 * kref_init. The lockres will still have atleast one ref 1482 * added because it is in the hash __dlm_insert_lockres() */ 1483 extra_refs++; 1484 1485 /* now that the new lockres is inserted, 1486 * make it usable by other processes */ 1487 spin_lock(&res->spinlock); 1488 res->state &= ~DLM_LOCK_RES_IN_PROGRESS; 1489 spin_unlock(&res->spinlock); 1490 wake_up(&res->wq); 1491 } 1492 1493 /* at this point we have allocated everything we need, 1494 * and we have a hashed lockres with an extra ref and 1495 * the proper res->state flags. */ 1496 ret = 0; 1497 spin_lock(&res->spinlock); 1498 /* drop this either when master requery finds a different master 1499 * or when a lock is added by the recovery worker */ 1500 dlm_lockres_grab_inflight_ref(dlm, res); 1501 if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) { 1502 /* migration cannot have an unknown master */ 1503 BUG_ON(!(mres->flags & DLM_MRES_RECOVERY)); 1504 mlog(0, "recovery has passed me a lockres with an " 1505 "unknown owner.. will need to requery: " 1506 "%.*s\n", mres->lockname_len, mres->lockname); 1507 } else { 1508 /* take a reference now to pin the lockres, drop it 1509 * when locks are added in the worker */ 1510 dlm_change_lockres_owner(dlm, res, dlm->node_num); 1511 } 1512 spin_unlock(&res->spinlock); 1513 1514 /* queue up work for dlm_mig_lockres_worker */ 1515 dlm_grab(dlm); /* get an extra ref for the work item */ 1516 memcpy(buf, msg->buf, be16_to_cpu(msg->data_len)); /* copy the whole message */ 1517 dlm_init_work_item(dlm, item, dlm_mig_lockres_worker, buf); 1518 item->u.ml.lockres = res; /* already have a ref */ 1519 item->u.ml.real_master = real_master; 1520 item->u.ml.extra_ref = extra_refs; 1521 spin_lock(&dlm->work_lock); 1522 list_add_tail(&item->list, &dlm->work_list); 1523 spin_unlock(&dlm->work_lock); 1524 queue_work(dlm->dlm_worker, &dlm->dispatched_work); 1525 1526 leave: 1527 /* One extra ref taken needs to be put here */ 1528 if (extra_refs) 1529 dlm_lockres_put(res); 1530 1531 dlm_put(dlm); 1532 if (ret < 0) { 1533 kfree(buf); 1534 kfree(item); 1535 mlog_errno(ret); 1536 } 1537 1538 return ret; 1539 } 1540 1541 1542 static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data) 1543 { 1544 struct dlm_ctxt *dlm; 1545 struct dlm_migratable_lockres *mres; 1546 int ret = 0; 1547 struct dlm_lock_resource *res; 1548 u8 real_master; 1549 u8 extra_ref; 1550 1551 dlm = item->dlm; 1552 mres = (struct dlm_migratable_lockres *)data; 1553 1554 res = item->u.ml.lockres; 1555 real_master = item->u.ml.real_master; 1556 extra_ref = item->u.ml.extra_ref; 1557 1558 if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) { 1559 /* this case is super-rare. only occurs if 1560 * node death happens during migration. */ 1561 again: 1562 ret = dlm_lockres_master_requery(dlm, res, &real_master); 1563 if (ret < 0) { 1564 mlog(0, "dlm_lockres_master_requery ret=%d\n", 1565 ret); 1566 goto again; 1567 } 1568 if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) { 1569 mlog(0, "lockres %.*s not claimed. " 1570 "this node will take it.\n", 1571 res->lockname.len, res->lockname.name); 1572 } else { 1573 spin_lock(&res->spinlock); 1574 dlm_lockres_drop_inflight_ref(dlm, res); 1575 spin_unlock(&res->spinlock); 1576 mlog(0, "master needs to respond to sender " 1577 "that node %u still owns %.*s\n", 1578 real_master, res->lockname.len, 1579 res->lockname.name); 1580 /* cannot touch this lockres */ 1581 goto leave; 1582 } 1583 } 1584 1585 ret = dlm_process_recovery_data(dlm, res, mres); 1586 if (ret < 0) 1587 mlog(0, "dlm_process_recovery_data returned %d\n", ret); 1588 else 1589 mlog(0, "dlm_process_recovery_data succeeded\n"); 1590 1591 if ((mres->flags & (DLM_MRES_MIGRATION|DLM_MRES_ALL_DONE)) == 1592 (DLM_MRES_MIGRATION|DLM_MRES_ALL_DONE)) { 1593 ret = dlm_finish_migration(dlm, res, mres->master); 1594 if (ret < 0) 1595 mlog_errno(ret); 1596 } 1597 1598 leave: 1599 /* See comment in dlm_mig_lockres_handler() */ 1600 if (res) { 1601 if (extra_ref) 1602 dlm_lockres_put(res); 1603 dlm_lockres_put(res); 1604 } 1605 kfree(data); 1606 } 1607 1608 1609 1610 static int dlm_lockres_master_requery(struct dlm_ctxt *dlm, 1611 struct dlm_lock_resource *res, 1612 u8 *real_master) 1613 { 1614 struct dlm_node_iter iter; 1615 int nodenum; 1616 int ret = 0; 1617 1618 *real_master = DLM_LOCK_RES_OWNER_UNKNOWN; 1619 1620 /* we only reach here if one of the two nodes in a 1621 * migration died while the migration was in progress. 1622 * at this point we need to requery the master. we 1623 * know that the new_master got as far as creating 1624 * an mle on at least one node, but we do not know 1625 * if any nodes had actually cleared the mle and set 1626 * the master to the new_master. the old master 1627 * is supposed to set the owner to UNKNOWN in the 1628 * event of a new_master death, so the only possible 1629 * responses that we can get from nodes here are 1630 * that the master is new_master, or that the master 1631 * is UNKNOWN. 1632 * if all nodes come back with UNKNOWN then we know 1633 * the lock needs remastering here. 1634 * if any node comes back with a valid master, check 1635 * to see if that master is the one that we are 1636 * recovering. if so, then the new_master died and 1637 * we need to remaster this lock. if not, then the 1638 * new_master survived and that node will respond to 1639 * other nodes about the owner. 1640 * if there is an owner, this node needs to dump this 1641 * lockres and alert the sender that this lockres 1642 * was rejected. */ 1643 spin_lock(&dlm->spinlock); 1644 dlm_node_iter_init(dlm->domain_map, &iter); 1645 spin_unlock(&dlm->spinlock); 1646 1647 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { 1648 /* do not send to self */ 1649 if (nodenum == dlm->node_num) 1650 continue; 1651 ret = dlm_do_master_requery(dlm, res, nodenum, real_master); 1652 if (ret < 0) { 1653 mlog_errno(ret); 1654 if (!dlm_is_host_down(ret)) 1655 BUG(); 1656 /* host is down, so answer for that node would be 1657 * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */ 1658 } 1659 if (*real_master != DLM_LOCK_RES_OWNER_UNKNOWN) { 1660 mlog(0, "lock master is %u\n", *real_master); 1661 break; 1662 } 1663 } 1664 return ret; 1665 } 1666 1667 1668 int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, 1669 u8 nodenum, u8 *real_master) 1670 { 1671 int ret; 1672 struct dlm_master_requery req; 1673 int status = DLM_LOCK_RES_OWNER_UNKNOWN; 1674 1675 memset(&req, 0, sizeof(req)); 1676 req.node_idx = dlm->node_num; 1677 req.namelen = res->lockname.len; 1678 memcpy(req.name, res->lockname.name, res->lockname.len); 1679 1680 resend: 1681 ret = o2net_send_message(DLM_MASTER_REQUERY_MSG, dlm->key, 1682 &req, sizeof(req), nodenum, &status); 1683 if (ret < 0) 1684 mlog(ML_ERROR, "Error %d when sending message %u (key " 1685 "0x%x) to node %u\n", ret, DLM_MASTER_REQUERY_MSG, 1686 dlm->key, nodenum); 1687 else if (status == -ENOMEM) { 1688 mlog_errno(status); 1689 msleep(50); 1690 goto resend; 1691 } else { 1692 BUG_ON(status < 0); 1693 BUG_ON(status > DLM_LOCK_RES_OWNER_UNKNOWN); 1694 *real_master = (u8) (status & 0xff); 1695 mlog(0, "node %u responded to master requery with %u\n", 1696 nodenum, *real_master); 1697 ret = 0; 1698 } 1699 return ret; 1700 } 1701 1702 1703 /* this function cannot error, so unless the sending 1704 * or receiving of the message failed, the owner can 1705 * be trusted */ 1706 int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data, 1707 void **ret_data) 1708 { 1709 struct dlm_ctxt *dlm = data; 1710 struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf; 1711 struct dlm_lock_resource *res = NULL; 1712 unsigned int hash; 1713 int master = DLM_LOCK_RES_OWNER_UNKNOWN; 1714 u32 flags = DLM_ASSERT_MASTER_REQUERY; 1715 int dispatched = 0; 1716 1717 if (!dlm_grab(dlm)) { 1718 /* since the domain has gone away on this 1719 * node, the proper response is UNKNOWN */ 1720 return master; 1721 } 1722 1723 hash = dlm_lockid_hash(req->name, req->namelen); 1724 1725 spin_lock(&dlm->spinlock); 1726 res = __dlm_lookup_lockres(dlm, req->name, req->namelen, hash); 1727 if (res) { 1728 spin_lock(&res->spinlock); 1729 master = res->owner; 1730 if (master == dlm->node_num) { 1731 int ret = dlm_dispatch_assert_master(dlm, res, 1732 0, 0, flags); 1733 if (ret < 0) { 1734 mlog_errno(ret); 1735 spin_unlock(&res->spinlock); 1736 dlm_lockres_put(res); 1737 spin_unlock(&dlm->spinlock); 1738 dlm_put(dlm); 1739 /* sender will take care of this and retry */ 1740 return ret; 1741 } else { 1742 dispatched = 1; 1743 __dlm_lockres_grab_inflight_worker(dlm, res); 1744 spin_unlock(&res->spinlock); 1745 } 1746 } else { 1747 /* put.. incase we are not the master */ 1748 spin_unlock(&res->spinlock); 1749 dlm_lockres_put(res); 1750 } 1751 } 1752 spin_unlock(&dlm->spinlock); 1753 1754 if (!dispatched) 1755 dlm_put(dlm); 1756 return master; 1757 } 1758 1759 static inline struct list_head * 1760 dlm_list_num_to_pointer(struct dlm_lock_resource *res, int list_num) 1761 { 1762 struct list_head *ret; 1763 BUG_ON(list_num < 0); 1764 BUG_ON(list_num > 2); 1765 ret = &(res->granted); 1766 ret += list_num; 1767 return ret; 1768 } 1769 /* TODO: do ast flush business 1770 * TODO: do MIGRATING and RECOVERING spinning 1771 */ 1772 1773 /* 1774 * NOTE about in-flight requests during migration: 1775 * 1776 * Before attempting the migrate, the master has marked the lockres as 1777 * MIGRATING and then flushed all of its pending ASTS. So any in-flight 1778 * requests either got queued before the MIGRATING flag got set, in which 1779 * case the lock data will reflect the change and a return message is on 1780 * the way, or the request failed to get in before MIGRATING got set. In 1781 * this case, the caller will be told to spin and wait for the MIGRATING 1782 * flag to be dropped, then recheck the master. 1783 * This holds true for the convert, cancel and unlock cases, and since lvb 1784 * updates are tied to these same messages, it applies to lvb updates as 1785 * well. For the lock case, there is no way a lock can be on the master 1786 * queue and not be on the secondary queue since the lock is always added 1787 * locally first. This means that the new target node will never be sent 1788 * a lock that he doesn't already have on the list. 1789 * In total, this means that the local lock is correct and should not be 1790 * updated to match the one sent by the master. Any messages sent back 1791 * from the master before the MIGRATING flag will bring the lock properly 1792 * up-to-date, and the change will be ordered properly for the waiter. 1793 * We will *not* attempt to modify the lock underneath the waiter. 1794 */ 1795 1796 static int dlm_process_recovery_data(struct dlm_ctxt *dlm, 1797 struct dlm_lock_resource *res, 1798 struct dlm_migratable_lockres *mres) 1799 { 1800 struct dlm_migratable_lock *ml; 1801 struct list_head *queue, *iter; 1802 struct list_head *tmpq = NULL; 1803 struct dlm_lock *newlock = NULL; 1804 struct dlm_lockstatus *lksb = NULL; 1805 int ret = 0; 1806 int i, j, bad; 1807 struct dlm_lock *lock; 1808 u8 from = O2NM_MAX_NODES; 1809 __be64 c; 1810 1811 mlog(0, "running %d locks for this lockres\n", mres->num_locks); 1812 for (i=0; i<mres->num_locks; i++) { 1813 ml = &(mres->ml[i]); 1814 1815 if (dlm_is_dummy_lock(dlm, ml, &from)) { 1816 /* placeholder, just need to set the refmap bit */ 1817 BUG_ON(mres->num_locks != 1); 1818 mlog(0, "%s:%.*s: dummy lock for %u\n", 1819 dlm->name, mres->lockname_len, mres->lockname, 1820 from); 1821 spin_lock(&res->spinlock); 1822 dlm_lockres_set_refmap_bit(dlm, res, from); 1823 spin_unlock(&res->spinlock); 1824 break; 1825 } 1826 BUG_ON(ml->highest_blocked != LKM_IVMODE); 1827 newlock = NULL; 1828 lksb = NULL; 1829 1830 queue = dlm_list_num_to_pointer(res, ml->list); 1831 tmpq = NULL; 1832 1833 /* if the lock is for the local node it needs to 1834 * be moved to the proper location within the queue. 1835 * do not allocate a new lock structure. */ 1836 if (ml->node == dlm->node_num) { 1837 /* MIGRATION ONLY! */ 1838 BUG_ON(!(mres->flags & DLM_MRES_MIGRATION)); 1839 1840 lock = NULL; 1841 spin_lock(&res->spinlock); 1842 for (j = DLM_GRANTED_LIST; j <= DLM_BLOCKED_LIST; j++) { 1843 tmpq = dlm_list_idx_to_ptr(res, j); 1844 list_for_each(iter, tmpq) { 1845 lock = list_entry(iter, 1846 struct dlm_lock, list); 1847 if (lock->ml.cookie == ml->cookie) 1848 break; 1849 lock = NULL; 1850 } 1851 if (lock) 1852 break; 1853 } 1854 1855 /* lock is always created locally first, and 1856 * destroyed locally last. it must be on the list */ 1857 if (!lock) { 1858 c = ml->cookie; 1859 mlog(ML_ERROR, "Could not find local lock " 1860 "with cookie %u:%llu, node %u, " 1861 "list %u, flags 0x%x, type %d, " 1862 "conv %d, highest blocked %d\n", 1863 dlm_get_lock_cookie_node(be64_to_cpu(c)), 1864 dlm_get_lock_cookie_seq(be64_to_cpu(c)), 1865 ml->node, ml->list, ml->flags, ml->type, 1866 ml->convert_type, ml->highest_blocked); 1867 __dlm_print_one_lock_resource(res); 1868 BUG(); 1869 } 1870 1871 if (lock->ml.node != ml->node) { 1872 c = lock->ml.cookie; 1873 mlog(ML_ERROR, "Mismatched node# in lock " 1874 "cookie %u:%llu, name %.*s, node %u\n", 1875 dlm_get_lock_cookie_node(be64_to_cpu(c)), 1876 dlm_get_lock_cookie_seq(be64_to_cpu(c)), 1877 res->lockname.len, res->lockname.name, 1878 lock->ml.node); 1879 c = ml->cookie; 1880 mlog(ML_ERROR, "Migrate lock cookie %u:%llu, " 1881 "node %u, list %u, flags 0x%x, type %d, " 1882 "conv %d, highest blocked %d\n", 1883 dlm_get_lock_cookie_node(be64_to_cpu(c)), 1884 dlm_get_lock_cookie_seq(be64_to_cpu(c)), 1885 ml->node, ml->list, ml->flags, ml->type, 1886 ml->convert_type, ml->highest_blocked); 1887 __dlm_print_one_lock_resource(res); 1888 BUG(); 1889 } 1890 1891 if (tmpq != queue) { 1892 c = ml->cookie; 1893 mlog(0, "Lock cookie %u:%llu was on list %u " 1894 "instead of list %u for %.*s\n", 1895 dlm_get_lock_cookie_node(be64_to_cpu(c)), 1896 dlm_get_lock_cookie_seq(be64_to_cpu(c)), 1897 j, ml->list, res->lockname.len, 1898 res->lockname.name); 1899 __dlm_print_one_lock_resource(res); 1900 spin_unlock(&res->spinlock); 1901 continue; 1902 } 1903 1904 /* see NOTE above about why we do not update 1905 * to match the master here */ 1906 1907 /* move the lock to its proper place */ 1908 /* do not alter lock refcount. switching lists. */ 1909 list_move_tail(&lock->list, queue); 1910 spin_unlock(&res->spinlock); 1911 1912 mlog(0, "just reordered a local lock!\n"); 1913 continue; 1914 } 1915 1916 /* lock is for another node. */ 1917 newlock = dlm_new_lock(ml->type, ml->node, 1918 be64_to_cpu(ml->cookie), NULL); 1919 if (!newlock) { 1920 ret = -ENOMEM; 1921 goto leave; 1922 } 1923 lksb = newlock->lksb; 1924 dlm_lock_attach_lockres(newlock, res); 1925 1926 if (ml->convert_type != LKM_IVMODE) { 1927 BUG_ON(queue != &res->converting); 1928 newlock->ml.convert_type = ml->convert_type; 1929 } 1930 lksb->flags |= (ml->flags & 1931 (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB)); 1932 1933 if (ml->type == LKM_NLMODE) 1934 goto skip_lvb; 1935 1936 /* 1937 * If the lock is in the blocked list it can't have a valid lvb, 1938 * so skip it 1939 */ 1940 if (ml->list == DLM_BLOCKED_LIST) 1941 goto skip_lvb; 1942 1943 if (!dlm_lvb_is_empty(mres->lvb)) { 1944 if (lksb->flags & DLM_LKSB_PUT_LVB) { 1945 /* other node was trying to update 1946 * lvb when node died. recreate the 1947 * lksb with the updated lvb. */ 1948 memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN); 1949 /* the lock resource lvb update must happen 1950 * NOW, before the spinlock is dropped. 1951 * we no longer wait for the AST to update 1952 * the lvb. */ 1953 memcpy(res->lvb, mres->lvb, DLM_LVB_LEN); 1954 } else { 1955 /* otherwise, the node is sending its 1956 * most recent valid lvb info */ 1957 BUG_ON(ml->type != LKM_EXMODE && 1958 ml->type != LKM_PRMODE); 1959 if (!dlm_lvb_is_empty(res->lvb) && 1960 (ml->type == LKM_EXMODE || 1961 memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) { 1962 int i; 1963 mlog(ML_ERROR, "%s:%.*s: received bad " 1964 "lvb! type=%d\n", dlm->name, 1965 res->lockname.len, 1966 res->lockname.name, ml->type); 1967 printk("lockres lvb=["); 1968 for (i=0; i<DLM_LVB_LEN; i++) 1969 printk("%02x", res->lvb[i]); 1970 printk("]\nmigrated lvb=["); 1971 for (i=0; i<DLM_LVB_LEN; i++) 1972 printk("%02x", mres->lvb[i]); 1973 printk("]\n"); 1974 dlm_print_one_lock_resource(res); 1975 BUG(); 1976 } 1977 memcpy(res->lvb, mres->lvb, DLM_LVB_LEN); 1978 } 1979 } 1980 skip_lvb: 1981 1982 /* NOTE: 1983 * wrt lock queue ordering and recovery: 1984 * 1. order of locks on granted queue is 1985 * meaningless. 1986 * 2. order of locks on converting queue is 1987 * LOST with the node death. sorry charlie. 1988 * 3. order of locks on the blocked queue is 1989 * also LOST. 1990 * order of locks does not affect integrity, it 1991 * just means that a lock request may get pushed 1992 * back in line as a result of the node death. 1993 * also note that for a given node the lock order 1994 * for its secondary queue locks is preserved 1995 * relative to each other, but clearly *not* 1996 * preserved relative to locks from other nodes. 1997 */ 1998 bad = 0; 1999 spin_lock(&res->spinlock); 2000 list_for_each_entry(lock, queue, list) { 2001 if (lock->ml.cookie == ml->cookie) { 2002 c = lock->ml.cookie; 2003 mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already " 2004 "exists on this lockres!\n", dlm->name, 2005 res->lockname.len, res->lockname.name, 2006 dlm_get_lock_cookie_node(be64_to_cpu(c)), 2007 dlm_get_lock_cookie_seq(be64_to_cpu(c))); 2008 2009 mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, " 2010 "node=%u, cookie=%u:%llu, queue=%d\n", 2011 ml->type, ml->convert_type, ml->node, 2012 dlm_get_lock_cookie_node(be64_to_cpu(ml->cookie)), 2013 dlm_get_lock_cookie_seq(be64_to_cpu(ml->cookie)), 2014 ml->list); 2015 2016 __dlm_print_one_lock_resource(res); 2017 bad = 1; 2018 break; 2019 } 2020 } 2021 if (!bad) { 2022 dlm_lock_get(newlock); 2023 if (mres->flags & DLM_MRES_RECOVERY && 2024 ml->list == DLM_CONVERTING_LIST && 2025 newlock->ml.type > 2026 newlock->ml.convert_type) { 2027 /* newlock is doing downconvert, add it to the 2028 * head of converting list */ 2029 list_add(&newlock->list, queue); 2030 } else 2031 list_add_tail(&newlock->list, queue); 2032 mlog(0, "%s:%.*s: added lock for node %u, " 2033 "setting refmap bit\n", dlm->name, 2034 res->lockname.len, res->lockname.name, ml->node); 2035 dlm_lockres_set_refmap_bit(dlm, res, ml->node); 2036 } 2037 spin_unlock(&res->spinlock); 2038 } 2039 mlog(0, "done running all the locks\n"); 2040 2041 leave: 2042 /* balance the ref taken when the work was queued */ 2043 spin_lock(&res->spinlock); 2044 dlm_lockres_drop_inflight_ref(dlm, res); 2045 spin_unlock(&res->spinlock); 2046 2047 if (ret < 0) 2048 mlog_errno(ret); 2049 2050 return ret; 2051 } 2052 2053 void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm, 2054 struct dlm_lock_resource *res) 2055 { 2056 int i; 2057 struct list_head *queue; 2058 struct dlm_lock *lock, *next; 2059 2060 assert_spin_locked(&dlm->spinlock); 2061 assert_spin_locked(&res->spinlock); 2062 res->state |= DLM_LOCK_RES_RECOVERING; 2063 if (!list_empty(&res->recovering)) { 2064 mlog(0, 2065 "Recovering res %s:%.*s, is already on recovery list!\n", 2066 dlm->name, res->lockname.len, res->lockname.name); 2067 list_del_init(&res->recovering); 2068 dlm_lockres_put(res); 2069 } 2070 /* We need to hold a reference while on the recovery list */ 2071 dlm_lockres_get(res); 2072 list_add_tail(&res->recovering, &dlm->reco.resources); 2073 2074 /* find any pending locks and put them back on proper list */ 2075 for (i=DLM_BLOCKED_LIST; i>=DLM_GRANTED_LIST; i--) { 2076 queue = dlm_list_idx_to_ptr(res, i); 2077 list_for_each_entry_safe(lock, next, queue, list) { 2078 dlm_lock_get(lock); 2079 if (lock->convert_pending) { 2080 /* move converting lock back to granted */ 2081 mlog(0, "node died with convert pending " 2082 "on %.*s. move back to granted list.\n", 2083 res->lockname.len, res->lockname.name); 2084 dlm_revert_pending_convert(res, lock); 2085 lock->convert_pending = 0; 2086 } else if (lock->lock_pending) { 2087 /* remove pending lock requests completely */ 2088 BUG_ON(i != DLM_BLOCKED_LIST); 2089 mlog(0, "node died with lock pending " 2090 "on %.*s. remove from blocked list and skip.\n", 2091 res->lockname.len, res->lockname.name); 2092 /* lock will be floating until ref in 2093 * dlmlock_remote is freed after the network 2094 * call returns. ok for it to not be on any 2095 * list since no ast can be called 2096 * (the master is dead). */ 2097 dlm_revert_pending_lock(res, lock); 2098 lock->lock_pending = 0; 2099 } else if (lock->unlock_pending) { 2100 /* if an unlock was in progress, treat as 2101 * if this had completed successfully 2102 * before sending this lock state to the 2103 * new master. note that the dlm_unlock 2104 * call is still responsible for calling 2105 * the unlockast. that will happen after 2106 * the network call times out. for now, 2107 * just move lists to prepare the new 2108 * recovery master. */ 2109 BUG_ON(i != DLM_GRANTED_LIST); 2110 mlog(0, "node died with unlock pending " 2111 "on %.*s. remove from blocked list and skip.\n", 2112 res->lockname.len, res->lockname.name); 2113 dlm_commit_pending_unlock(res, lock); 2114 lock->unlock_pending = 0; 2115 } else if (lock->cancel_pending) { 2116 /* if a cancel was in progress, treat as 2117 * if this had completed successfully 2118 * before sending this lock state to the 2119 * new master */ 2120 BUG_ON(i != DLM_CONVERTING_LIST); 2121 mlog(0, "node died with cancel pending " 2122 "on %.*s. move back to granted list.\n", 2123 res->lockname.len, res->lockname.name); 2124 dlm_commit_pending_cancel(res, lock); 2125 lock->cancel_pending = 0; 2126 } 2127 dlm_lock_put(lock); 2128 } 2129 } 2130 } 2131 2132 2133 2134 /* removes all recovered locks from the recovery list. 2135 * sets the res->owner to the new master. 2136 * unsets the RECOVERY flag and wakes waiters. */ 2137 static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, 2138 u8 dead_node, u8 new_master) 2139 { 2140 int i; 2141 struct hlist_head *bucket; 2142 struct dlm_lock_resource *res, *next; 2143 2144 assert_spin_locked(&dlm->spinlock); 2145 2146 list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) { 2147 if (res->owner == dead_node) { 2148 mlog(0, "%s: res %.*s, Changing owner from %u to %u\n", 2149 dlm->name, res->lockname.len, res->lockname.name, 2150 res->owner, new_master); 2151 list_del_init(&res->recovering); 2152 spin_lock(&res->spinlock); 2153 /* new_master has our reference from 2154 * the lock state sent during recovery */ 2155 dlm_change_lockres_owner(dlm, res, new_master); 2156 res->state &= ~DLM_LOCK_RES_RECOVERING; 2157 if (__dlm_lockres_has_locks(res)) 2158 __dlm_dirty_lockres(dlm, res); 2159 spin_unlock(&res->spinlock); 2160 wake_up(&res->wq); 2161 dlm_lockres_put(res); 2162 } 2163 } 2164 2165 /* this will become unnecessary eventually, but 2166 * for now we need to run the whole hash, clear 2167 * the RECOVERING state and set the owner 2168 * if necessary */ 2169 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 2170 bucket = dlm_lockres_hash(dlm, i); 2171 hlist_for_each_entry(res, bucket, hash_node) { 2172 if (res->state & DLM_LOCK_RES_RECOVERY_WAITING) { 2173 spin_lock(&res->spinlock); 2174 res->state &= ~DLM_LOCK_RES_RECOVERY_WAITING; 2175 spin_unlock(&res->spinlock); 2176 wake_up(&res->wq); 2177 } 2178 2179 if (!(res->state & DLM_LOCK_RES_RECOVERING)) 2180 continue; 2181 2182 if (res->owner != dead_node && 2183 res->owner != dlm->node_num) 2184 continue; 2185 2186 if (!list_empty(&res->recovering)) { 2187 list_del_init(&res->recovering); 2188 dlm_lockres_put(res); 2189 } 2190 2191 /* new_master has our reference from 2192 * the lock state sent during recovery */ 2193 mlog(0, "%s: res %.*s, Changing owner from %u to %u\n", 2194 dlm->name, res->lockname.len, res->lockname.name, 2195 res->owner, new_master); 2196 spin_lock(&res->spinlock); 2197 dlm_change_lockres_owner(dlm, res, new_master); 2198 res->state &= ~DLM_LOCK_RES_RECOVERING; 2199 if (__dlm_lockres_has_locks(res)) 2200 __dlm_dirty_lockres(dlm, res); 2201 spin_unlock(&res->spinlock); 2202 wake_up(&res->wq); 2203 } 2204 } 2205 } 2206 2207 static inline int dlm_lvb_needs_invalidation(struct dlm_lock *lock, int local) 2208 { 2209 if (local) { 2210 if (lock->ml.type != LKM_EXMODE && 2211 lock->ml.type != LKM_PRMODE) 2212 return 1; 2213 } else if (lock->ml.type == LKM_EXMODE) 2214 return 1; 2215 return 0; 2216 } 2217 2218 static void dlm_revalidate_lvb(struct dlm_ctxt *dlm, 2219 struct dlm_lock_resource *res, u8 dead_node) 2220 { 2221 struct list_head *queue; 2222 struct dlm_lock *lock; 2223 int blank_lvb = 0, local = 0; 2224 int i; 2225 u8 search_node; 2226 2227 assert_spin_locked(&dlm->spinlock); 2228 assert_spin_locked(&res->spinlock); 2229 2230 if (res->owner == dlm->node_num) 2231 /* if this node owned the lockres, and if the dead node 2232 * had an EX when he died, blank out the lvb */ 2233 search_node = dead_node; 2234 else { 2235 /* if this is a secondary lockres, and we had no EX or PR 2236 * locks granted, we can no longer trust the lvb */ 2237 search_node = dlm->node_num; 2238 local = 1; /* check local state for valid lvb */ 2239 } 2240 2241 for (i=DLM_GRANTED_LIST; i<=DLM_CONVERTING_LIST; i++) { 2242 queue = dlm_list_idx_to_ptr(res, i); 2243 list_for_each_entry(lock, queue, list) { 2244 if (lock->ml.node == search_node) { 2245 if (dlm_lvb_needs_invalidation(lock, local)) { 2246 /* zero the lksb lvb and lockres lvb */ 2247 blank_lvb = 1; 2248 memset(lock->lksb->lvb, 0, DLM_LVB_LEN); 2249 } 2250 } 2251 } 2252 } 2253 2254 if (blank_lvb) { 2255 mlog(0, "clearing %.*s lvb, dead node %u had EX\n", 2256 res->lockname.len, res->lockname.name, dead_node); 2257 memset(res->lvb, 0, DLM_LVB_LEN); 2258 } 2259 } 2260 2261 static void dlm_free_dead_locks(struct dlm_ctxt *dlm, 2262 struct dlm_lock_resource *res, u8 dead_node) 2263 { 2264 struct dlm_lock *lock, *next; 2265 unsigned int freed = 0; 2266 2267 /* this node is the lockres master: 2268 * 1) remove any stale locks for the dead node 2269 * 2) if the dead node had an EX when he died, blank out the lvb 2270 */ 2271 assert_spin_locked(&dlm->spinlock); 2272 assert_spin_locked(&res->spinlock); 2273 2274 /* We do two dlm_lock_put(). One for removing from list and the other is 2275 * to force the DLM_UNLOCK_FREE_LOCK action so as to free the locks */ 2276 2277 /* TODO: check pending_asts, pending_basts here */ 2278 list_for_each_entry_safe(lock, next, &res->granted, list) { 2279 if (lock->ml.node == dead_node) { 2280 list_del_init(&lock->list); 2281 dlm_lock_put(lock); 2282 /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */ 2283 dlm_lock_put(lock); 2284 freed++; 2285 } 2286 } 2287 list_for_each_entry_safe(lock, next, &res->converting, list) { 2288 if (lock->ml.node == dead_node) { 2289 list_del_init(&lock->list); 2290 dlm_lock_put(lock); 2291 /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */ 2292 dlm_lock_put(lock); 2293 freed++; 2294 } 2295 } 2296 list_for_each_entry_safe(lock, next, &res->blocked, list) { 2297 if (lock->ml.node == dead_node) { 2298 list_del_init(&lock->list); 2299 dlm_lock_put(lock); 2300 /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */ 2301 dlm_lock_put(lock); 2302 freed++; 2303 } 2304 } 2305 2306 if (freed) { 2307 mlog(0, "%s:%.*s: freed %u locks for dead node %u, " 2308 "dropping ref from lockres\n", dlm->name, 2309 res->lockname.len, res->lockname.name, freed, dead_node); 2310 if(!test_bit(dead_node, res->refmap)) { 2311 mlog(ML_ERROR, "%s:%.*s: freed %u locks for dead node %u, " 2312 "but ref was not set\n", dlm->name, 2313 res->lockname.len, res->lockname.name, freed, dead_node); 2314 __dlm_print_one_lock_resource(res); 2315 } 2316 res->state |= DLM_LOCK_RES_RECOVERY_WAITING; 2317 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); 2318 } else if (test_bit(dead_node, res->refmap)) { 2319 mlog(0, "%s:%.*s: dead node %u had a ref, but had " 2320 "no locks and had not purged before dying\n", dlm->name, 2321 res->lockname.len, res->lockname.name, dead_node); 2322 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); 2323 } 2324 2325 /* do not kick thread yet */ 2326 __dlm_dirty_lockres(dlm, res); 2327 } 2328 2329 static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) 2330 { 2331 struct dlm_lock_resource *res; 2332 int i; 2333 struct hlist_head *bucket; 2334 struct hlist_node *tmp; 2335 struct dlm_lock *lock; 2336 2337 2338 /* purge any stale mles */ 2339 dlm_clean_master_list(dlm, dead_node); 2340 2341 /* 2342 * now clean up all lock resources. there are two rules: 2343 * 2344 * 1) if the dead node was the master, move the lockres 2345 * to the recovering list. set the RECOVERING flag. 2346 * this lockres needs to be cleaned up before it can 2347 * be used further. 2348 * 2349 * 2) if this node was the master, remove all locks from 2350 * each of the lockres queues that were owned by the 2351 * dead node. once recovery finishes, the dlm thread 2352 * can be kicked again to see if any ASTs or BASTs 2353 * need to be fired as a result. 2354 */ 2355 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 2356 bucket = dlm_lockres_hash(dlm, i); 2357 hlist_for_each_entry_safe(res, tmp, bucket, hash_node) { 2358 /* always prune any $RECOVERY entries for dead nodes, 2359 * otherwise hangs can occur during later recovery */ 2360 if (dlm_is_recovery_lock(res->lockname.name, 2361 res->lockname.len)) { 2362 spin_lock(&res->spinlock); 2363 list_for_each_entry(lock, &res->granted, list) { 2364 if (lock->ml.node == dead_node) { 2365 mlog(0, "AHA! there was " 2366 "a $RECOVERY lock for dead " 2367 "node %u (%s)!\n", 2368 dead_node, dlm->name); 2369 list_del_init(&lock->list); 2370 dlm_lock_put(lock); 2371 /* Can't schedule 2372 * DLM_UNLOCK_FREE_LOCK 2373 * - do manually */ 2374 dlm_lock_put(lock); 2375 break; 2376 } 2377 } 2378 2379 if ((res->owner == dead_node) && 2380 (res->state & DLM_LOCK_RES_DROPPING_REF)) { 2381 dlm_lockres_get(res); 2382 __dlm_do_purge_lockres(dlm, res); 2383 spin_unlock(&res->spinlock); 2384 wake_up(&res->wq); 2385 dlm_lockres_put(res); 2386 continue; 2387 } else if (res->owner == dlm->node_num) 2388 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); 2389 spin_unlock(&res->spinlock); 2390 continue; 2391 } 2392 spin_lock(&res->spinlock); 2393 /* zero the lvb if necessary */ 2394 dlm_revalidate_lvb(dlm, res, dead_node); 2395 if (res->owner == dead_node) { 2396 if (res->state & DLM_LOCK_RES_DROPPING_REF) { 2397 mlog(0, "%s:%.*s: owned by " 2398 "dead node %u, this node was " 2399 "dropping its ref when master died. " 2400 "continue, purging the lockres.\n", 2401 dlm->name, res->lockname.len, 2402 res->lockname.name, dead_node); 2403 dlm_lockres_get(res); 2404 __dlm_do_purge_lockres(dlm, res); 2405 spin_unlock(&res->spinlock); 2406 wake_up(&res->wq); 2407 dlm_lockres_put(res); 2408 continue; 2409 } 2410 dlm_move_lockres_to_recovery_list(dlm, res); 2411 } else if (res->owner == dlm->node_num) { 2412 dlm_free_dead_locks(dlm, res, dead_node); 2413 __dlm_lockres_calc_usage(dlm, res); 2414 } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) { 2415 if (test_bit(dead_node, res->refmap)) { 2416 mlog(0, "%s:%.*s: dead node %u had a ref, but had " 2417 "no locks and had not purged before dying\n", 2418 dlm->name, res->lockname.len, 2419 res->lockname.name, dead_node); 2420 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); 2421 } 2422 } 2423 spin_unlock(&res->spinlock); 2424 } 2425 } 2426 2427 } 2428 2429 static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx) 2430 { 2431 assert_spin_locked(&dlm->spinlock); 2432 2433 if (dlm->reco.new_master == idx) { 2434 mlog(0, "%s: recovery master %d just died\n", 2435 dlm->name, idx); 2436 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { 2437 /* finalize1 was reached, so it is safe to clear 2438 * the new_master and dead_node. that recovery 2439 * is complete. */ 2440 mlog(0, "%s: dead master %d had reached " 2441 "finalize1 state, clearing\n", dlm->name, idx); 2442 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; 2443 __dlm_reset_recovery(dlm); 2444 } 2445 } 2446 2447 /* Clean up join state on node death. */ 2448 if (dlm->joining_node == idx) { 2449 mlog(0, "Clearing join state for node %u\n", idx); 2450 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); 2451 } 2452 2453 /* check to see if the node is already considered dead */ 2454 if (!test_bit(idx, dlm->live_nodes_map)) { 2455 mlog(0, "for domain %s, node %d is already dead. " 2456 "another node likely did recovery already.\n", 2457 dlm->name, idx); 2458 return; 2459 } 2460 2461 /* check to see if we do not care about this node */ 2462 if (!test_bit(idx, dlm->domain_map)) { 2463 /* This also catches the case that we get a node down 2464 * but haven't joined the domain yet. */ 2465 mlog(0, "node %u already removed from domain!\n", idx); 2466 return; 2467 } 2468 2469 clear_bit(idx, dlm->live_nodes_map); 2470 2471 /* make sure local cleanup occurs before the heartbeat events */ 2472 if (!test_bit(idx, dlm->recovery_map)) 2473 dlm_do_local_recovery_cleanup(dlm, idx); 2474 2475 /* notify anything attached to the heartbeat events */ 2476 dlm_hb_event_notify_attached(dlm, idx, 0); 2477 2478 mlog(0, "node %u being removed from domain map!\n", idx); 2479 clear_bit(idx, dlm->domain_map); 2480 clear_bit(idx, dlm->exit_domain_map); 2481 /* wake up migration waiters if a node goes down. 2482 * perhaps later we can genericize this for other waiters. */ 2483 wake_up(&dlm->migration_wq); 2484 2485 set_bit(idx, dlm->recovery_map); 2486 } 2487 2488 void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data) 2489 { 2490 struct dlm_ctxt *dlm = data; 2491 2492 if (!dlm_grab(dlm)) 2493 return; 2494 2495 /* 2496 * This will notify any dlm users that a node in our domain 2497 * went away without notifying us first. 2498 */ 2499 if (test_bit(idx, dlm->domain_map)) 2500 dlm_fire_domain_eviction_callbacks(dlm, idx); 2501 2502 spin_lock(&dlm->spinlock); 2503 __dlm_hb_node_down(dlm, idx); 2504 spin_unlock(&dlm->spinlock); 2505 2506 dlm_put(dlm); 2507 } 2508 2509 void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data) 2510 { 2511 struct dlm_ctxt *dlm = data; 2512 2513 if (!dlm_grab(dlm)) 2514 return; 2515 2516 spin_lock(&dlm->spinlock); 2517 set_bit(idx, dlm->live_nodes_map); 2518 /* do NOT notify mle attached to the heartbeat events. 2519 * new nodes are not interesting in mastery until joined. */ 2520 spin_unlock(&dlm->spinlock); 2521 2522 dlm_put(dlm); 2523 } 2524 2525 static void dlm_reco_ast(void *astdata) 2526 { 2527 struct dlm_ctxt *dlm = astdata; 2528 mlog(0, "ast for recovery lock fired!, this=%u, dlm=%s\n", 2529 dlm->node_num, dlm->name); 2530 } 2531 static void dlm_reco_bast(void *astdata, int blocked_type) 2532 { 2533 struct dlm_ctxt *dlm = astdata; 2534 mlog(0, "bast for recovery lock fired!, this=%u, dlm=%s\n", 2535 dlm->node_num, dlm->name); 2536 } 2537 static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st) 2538 { 2539 mlog(0, "unlockast for recovery lock fired!\n"); 2540 } 2541 2542 /* 2543 * dlm_pick_recovery_master will continually attempt to use 2544 * dlmlock() on the special "$RECOVERY" lockres with the 2545 * LKM_NOQUEUE flag to get an EX. every thread that enters 2546 * this function on each node racing to become the recovery 2547 * master will not stop attempting this until either: 2548 * a) this node gets the EX (and becomes the recovery master), 2549 * or b) dlm->reco.new_master gets set to some nodenum 2550 * != O2NM_INVALID_NODE_NUM (another node will do the reco). 2551 * so each time a recovery master is needed, the entire cluster 2552 * will sync at this point. if the new master dies, that will 2553 * be detected in dlm_do_recovery */ 2554 static int dlm_pick_recovery_master(struct dlm_ctxt *dlm) 2555 { 2556 enum dlm_status ret; 2557 struct dlm_lockstatus lksb; 2558 int status = -EINVAL; 2559 2560 mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n", 2561 dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num); 2562 again: 2563 memset(&lksb, 0, sizeof(lksb)); 2564 2565 ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY, 2566 DLM_RECOVERY_LOCK_NAME, DLM_RECOVERY_LOCK_NAME_LEN, 2567 dlm_reco_ast, dlm, dlm_reco_bast); 2568 2569 mlog(0, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n", 2570 dlm->name, ret, lksb.status); 2571 2572 if (ret == DLM_NORMAL) { 2573 mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n", 2574 dlm->name, dlm->node_num); 2575 2576 /* got the EX lock. check to see if another node 2577 * just became the reco master */ 2578 if (dlm_reco_master_ready(dlm)) { 2579 mlog(0, "%s: got reco EX lock, but %u will " 2580 "do the recovery\n", dlm->name, 2581 dlm->reco.new_master); 2582 status = -EEXIST; 2583 } else { 2584 status = 0; 2585 2586 /* see if recovery was already finished elsewhere */ 2587 spin_lock(&dlm->spinlock); 2588 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { 2589 status = -EINVAL; 2590 mlog(0, "%s: got reco EX lock, but " 2591 "node got recovered already\n", dlm->name); 2592 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) { 2593 mlog(ML_ERROR, "%s: new master is %u " 2594 "but no dead node!\n", 2595 dlm->name, dlm->reco.new_master); 2596 BUG(); 2597 } 2598 } 2599 spin_unlock(&dlm->spinlock); 2600 } 2601 2602 /* if this node has actually become the recovery master, 2603 * set the master and send the messages to begin recovery */ 2604 if (!status) { 2605 mlog(0, "%s: dead=%u, this=%u, sending " 2606 "begin_reco now\n", dlm->name, 2607 dlm->reco.dead_node, dlm->node_num); 2608 status = dlm_send_begin_reco_message(dlm, 2609 dlm->reco.dead_node); 2610 /* this always succeeds */ 2611 BUG_ON(status); 2612 2613 /* set the new_master to this node */ 2614 spin_lock(&dlm->spinlock); 2615 dlm_set_reco_master(dlm, dlm->node_num); 2616 spin_unlock(&dlm->spinlock); 2617 } 2618 2619 /* recovery lock is a special case. ast will not get fired, 2620 * so just go ahead and unlock it. */ 2621 ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm); 2622 if (ret == DLM_DENIED) { 2623 mlog(0, "got DLM_DENIED, trying LKM_CANCEL\n"); 2624 ret = dlmunlock(dlm, &lksb, LKM_CANCEL, dlm_reco_unlock_ast, dlm); 2625 } 2626 if (ret != DLM_NORMAL) { 2627 /* this would really suck. this could only happen 2628 * if there was a network error during the unlock 2629 * because of node death. this means the unlock 2630 * is actually "done" and the lock structure is 2631 * even freed. we can continue, but only 2632 * because this specific lock name is special. */ 2633 mlog(ML_ERROR, "dlmunlock returned %d\n", ret); 2634 } 2635 } else if (ret == DLM_NOTQUEUED) { 2636 mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n", 2637 dlm->name, dlm->node_num); 2638 /* another node is master. wait on 2639 * reco.new_master != O2NM_INVALID_NODE_NUM 2640 * for at most one second */ 2641 wait_event_timeout(dlm->dlm_reco_thread_wq, 2642 dlm_reco_master_ready(dlm), 2643 msecs_to_jiffies(1000)); 2644 if (!dlm_reco_master_ready(dlm)) { 2645 mlog(0, "%s: reco master taking awhile\n", 2646 dlm->name); 2647 goto again; 2648 } 2649 /* another node has informed this one that it is reco master */ 2650 mlog(0, "%s: reco master %u is ready to recover %u\n", 2651 dlm->name, dlm->reco.new_master, dlm->reco.dead_node); 2652 status = -EEXIST; 2653 } else if (ret == DLM_RECOVERING) { 2654 mlog(0, "dlm=%s dlmlock says master node died (this=%u)\n", 2655 dlm->name, dlm->node_num); 2656 goto again; 2657 } else { 2658 struct dlm_lock_resource *res; 2659 2660 /* dlmlock returned something other than NOTQUEUED or NORMAL */ 2661 mlog(ML_ERROR, "%s: got %s from dlmlock($RECOVERY), " 2662 "lksb.status=%s\n", dlm->name, dlm_errname(ret), 2663 dlm_errname(lksb.status)); 2664 res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME, 2665 DLM_RECOVERY_LOCK_NAME_LEN); 2666 if (res) { 2667 dlm_print_one_lock_resource(res); 2668 dlm_lockres_put(res); 2669 } else { 2670 mlog(ML_ERROR, "recovery lock not found\n"); 2671 } 2672 BUG(); 2673 } 2674 2675 return status; 2676 } 2677 2678 static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node) 2679 { 2680 struct dlm_begin_reco br; 2681 int ret = 0; 2682 struct dlm_node_iter iter; 2683 int nodenum; 2684 int status; 2685 2686 mlog(0, "%s: dead node is %u\n", dlm->name, dead_node); 2687 2688 spin_lock(&dlm->spinlock); 2689 dlm_node_iter_init(dlm->domain_map, &iter); 2690 spin_unlock(&dlm->spinlock); 2691 2692 clear_bit(dead_node, iter.node_map); 2693 2694 memset(&br, 0, sizeof(br)); 2695 br.node_idx = dlm->node_num; 2696 br.dead_node = dead_node; 2697 2698 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { 2699 ret = 0; 2700 if (nodenum == dead_node) { 2701 mlog(0, "not sending begin reco to dead node " 2702 "%u\n", dead_node); 2703 continue; 2704 } 2705 if (nodenum == dlm->node_num) { 2706 mlog(0, "not sending begin reco to self\n"); 2707 continue; 2708 } 2709 retry: 2710 ret = -EINVAL; 2711 mlog(0, "attempting to send begin reco msg to %d\n", 2712 nodenum); 2713 ret = o2net_send_message(DLM_BEGIN_RECO_MSG, dlm->key, 2714 &br, sizeof(br), nodenum, &status); 2715 /* negative status is handled ok by caller here */ 2716 if (ret >= 0) 2717 ret = status; 2718 if (dlm_is_host_down(ret)) { 2719 /* node is down. not involved in recovery 2720 * so just keep going */ 2721 mlog(ML_NOTICE, "%s: node %u was down when sending " 2722 "begin reco msg (%d)\n", dlm->name, nodenum, ret); 2723 ret = 0; 2724 } 2725 2726 /* 2727 * Prior to commit aad1b15310b9bcd59fa81ab8f2b1513b59553ea8, 2728 * dlm_begin_reco_handler() returned EAGAIN and not -EAGAIN. 2729 * We are handling both for compatibility reasons. 2730 */ 2731 if (ret == -EAGAIN || ret == EAGAIN) { 2732 mlog(0, "%s: trying to start recovery of node " 2733 "%u, but node %u is waiting for last recovery " 2734 "to complete, backoff for a bit\n", dlm->name, 2735 dead_node, nodenum); 2736 msleep(100); 2737 goto retry; 2738 } 2739 if (ret < 0) { 2740 struct dlm_lock_resource *res; 2741 2742 /* this is now a serious problem, possibly ENOMEM 2743 * in the network stack. must retry */ 2744 mlog_errno(ret); 2745 mlog(ML_ERROR, "begin reco of dlm %s to node %u " 2746 "returned %d\n", dlm->name, nodenum, ret); 2747 res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME, 2748 DLM_RECOVERY_LOCK_NAME_LEN); 2749 if (res) { 2750 dlm_print_one_lock_resource(res); 2751 dlm_lockres_put(res); 2752 } else { 2753 mlog(ML_ERROR, "recovery lock not found\n"); 2754 } 2755 /* sleep for a bit in hopes that we can avoid 2756 * another ENOMEM */ 2757 msleep(100); 2758 goto retry; 2759 } 2760 } 2761 2762 return ret; 2763 } 2764 2765 int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data, 2766 void **ret_data) 2767 { 2768 struct dlm_ctxt *dlm = data; 2769 struct dlm_begin_reco *br = (struct dlm_begin_reco *)msg->buf; 2770 2771 /* ok to return 0, domain has gone away */ 2772 if (!dlm_grab(dlm)) 2773 return 0; 2774 2775 spin_lock(&dlm->spinlock); 2776 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { 2777 mlog(0, "%s: node %u wants to recover node %u (%u:%u) " 2778 "but this node is in finalize state, waiting on finalize2\n", 2779 dlm->name, br->node_idx, br->dead_node, 2780 dlm->reco.dead_node, dlm->reco.new_master); 2781 spin_unlock(&dlm->spinlock); 2782 dlm_put(dlm); 2783 return -EAGAIN; 2784 } 2785 spin_unlock(&dlm->spinlock); 2786 2787 mlog(0, "%s: node %u wants to recover node %u (%u:%u)\n", 2788 dlm->name, br->node_idx, br->dead_node, 2789 dlm->reco.dead_node, dlm->reco.new_master); 2790 2791 dlm_fire_domain_eviction_callbacks(dlm, br->dead_node); 2792 2793 spin_lock(&dlm->spinlock); 2794 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) { 2795 if (test_bit(dlm->reco.new_master, dlm->recovery_map)) { 2796 mlog(0, "%s: new_master %u died, changing " 2797 "to %u\n", dlm->name, dlm->reco.new_master, 2798 br->node_idx); 2799 } else { 2800 mlog(0, "%s: new_master %u NOT DEAD, changing " 2801 "to %u\n", dlm->name, dlm->reco.new_master, 2802 br->node_idx); 2803 /* may not have seen the new master as dead yet */ 2804 } 2805 } 2806 if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) { 2807 mlog(ML_NOTICE, "%s: dead_node previously set to %u, " 2808 "node %u changing it to %u\n", dlm->name, 2809 dlm->reco.dead_node, br->node_idx, br->dead_node); 2810 } 2811 dlm_set_reco_master(dlm, br->node_idx); 2812 dlm_set_reco_dead_node(dlm, br->dead_node); 2813 if (!test_bit(br->dead_node, dlm->recovery_map)) { 2814 mlog(0, "recovery master %u sees %u as dead, but this " 2815 "node has not yet. marking %u as dead\n", 2816 br->node_idx, br->dead_node, br->dead_node); 2817 if (!test_bit(br->dead_node, dlm->domain_map) || 2818 !test_bit(br->dead_node, dlm->live_nodes_map)) 2819 mlog(0, "%u not in domain/live_nodes map " 2820 "so setting it in reco map manually\n", 2821 br->dead_node); 2822 /* force the recovery cleanup in __dlm_hb_node_down 2823 * both of these will be cleared in a moment */ 2824 set_bit(br->dead_node, dlm->domain_map); 2825 set_bit(br->dead_node, dlm->live_nodes_map); 2826 __dlm_hb_node_down(dlm, br->dead_node); 2827 } 2828 spin_unlock(&dlm->spinlock); 2829 2830 dlm_kick_recovery_thread(dlm); 2831 2832 mlog(0, "%s: recovery started by node %u, for %u (%u:%u)\n", 2833 dlm->name, br->node_idx, br->dead_node, 2834 dlm->reco.dead_node, dlm->reco.new_master); 2835 2836 dlm_put(dlm); 2837 return 0; 2838 } 2839 2840 #define DLM_FINALIZE_STAGE2 0x01 2841 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm) 2842 { 2843 int ret = 0; 2844 struct dlm_finalize_reco fr; 2845 struct dlm_node_iter iter; 2846 int nodenum; 2847 int status; 2848 int stage = 1; 2849 2850 mlog(0, "finishing recovery for node %s:%u, " 2851 "stage %d\n", dlm->name, dlm->reco.dead_node, stage); 2852 2853 spin_lock(&dlm->spinlock); 2854 dlm_node_iter_init(dlm->domain_map, &iter); 2855 spin_unlock(&dlm->spinlock); 2856 2857 stage2: 2858 memset(&fr, 0, sizeof(fr)); 2859 fr.node_idx = dlm->node_num; 2860 fr.dead_node = dlm->reco.dead_node; 2861 if (stage == 2) 2862 fr.flags |= DLM_FINALIZE_STAGE2; 2863 2864 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { 2865 if (nodenum == dlm->node_num) 2866 continue; 2867 ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key, 2868 &fr, sizeof(fr), nodenum, &status); 2869 if (ret >= 0) 2870 ret = status; 2871 if (ret < 0) { 2872 mlog(ML_ERROR, "Error %d when sending message %u (key " 2873 "0x%x) to node %u\n", ret, DLM_FINALIZE_RECO_MSG, 2874 dlm->key, nodenum); 2875 if (dlm_is_host_down(ret)) { 2876 /* this has no effect on this recovery 2877 * session, so set the status to zero to 2878 * finish out the last recovery */ 2879 mlog(ML_ERROR, "node %u went down after this " 2880 "node finished recovery.\n", nodenum); 2881 ret = 0; 2882 continue; 2883 } 2884 break; 2885 } 2886 } 2887 if (stage == 1) { 2888 /* reset the node_iter back to the top and send finalize2 */ 2889 iter.curnode = -1; 2890 stage = 2; 2891 goto stage2; 2892 } 2893 2894 return ret; 2895 } 2896 2897 int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data, 2898 void **ret_data) 2899 { 2900 struct dlm_ctxt *dlm = data; 2901 struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf; 2902 int stage = 1; 2903 2904 /* ok to return 0, domain has gone away */ 2905 if (!dlm_grab(dlm)) 2906 return 0; 2907 2908 if (fr->flags & DLM_FINALIZE_STAGE2) 2909 stage = 2; 2910 2911 mlog(0, "%s: node %u finalizing recovery stage%d of " 2912 "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage, 2913 fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master); 2914 2915 spin_lock(&dlm->spinlock); 2916 2917 if (dlm->reco.new_master != fr->node_idx) { 2918 mlog(ML_ERROR, "node %u sent recovery finalize msg, but node " 2919 "%u is supposed to be the new master, dead=%u\n", 2920 fr->node_idx, dlm->reco.new_master, fr->dead_node); 2921 BUG(); 2922 } 2923 if (dlm->reco.dead_node != fr->dead_node) { 2924 mlog(ML_ERROR, "node %u sent recovery finalize msg for dead " 2925 "node %u, but node %u is supposed to be dead\n", 2926 fr->node_idx, fr->dead_node, dlm->reco.dead_node); 2927 BUG(); 2928 } 2929 2930 switch (stage) { 2931 case 1: 2932 dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx); 2933 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { 2934 mlog(ML_ERROR, "%s: received finalize1 from " 2935 "new master %u for dead node %u, but " 2936 "this node has already received it!\n", 2937 dlm->name, fr->node_idx, fr->dead_node); 2938 dlm_print_reco_node_status(dlm); 2939 BUG(); 2940 } 2941 dlm->reco.state |= DLM_RECO_STATE_FINALIZE; 2942 spin_unlock(&dlm->spinlock); 2943 break; 2944 case 2: 2945 if (!(dlm->reco.state & DLM_RECO_STATE_FINALIZE)) { 2946 mlog(ML_ERROR, "%s: received finalize2 from " 2947 "new master %u for dead node %u, but " 2948 "this node did not have finalize1!\n", 2949 dlm->name, fr->node_idx, fr->dead_node); 2950 dlm_print_reco_node_status(dlm); 2951 BUG(); 2952 } 2953 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; 2954 __dlm_reset_recovery(dlm); 2955 spin_unlock(&dlm->spinlock); 2956 dlm_kick_recovery_thread(dlm); 2957 break; 2958 } 2959 2960 mlog(0, "%s: recovery done, reco master was %u, dead now %u, master now %u\n", 2961 dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master); 2962 2963 dlm_put(dlm); 2964 return 0; 2965 } 2966