// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

       dlm_lock          = request_lock
       dlm_lock+CONVERT  = convert_lock
       dlm_unlock        = unlock_lock
       dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()           ->  R: receive_xxxx()
                                R: do_xxxx()
   L: receive_xxxx_reply()  <-  R: send_xxxx_reply()
*/
#include <trace/events/dlm.h>

#include <linux/types.h>
#include <linux/rbtree.h>
#include <linux/slab.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "midcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms, bool local);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void toss_rsb(struct kref *kref);

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD */
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
	       (unsigned long long)lkb->lkb_recover_seq);
}

static void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
	       "rlc %d name %s\n",
	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
	       r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}
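
/*
 * A quick, purely illustrative example of how the compatibility matrix
 * above is consulted (using the standard DLM_LOCK_* mode constants):
 *
 *   dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR)  returns 1  (both readers)
 *   dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_CW)  returns 0  (conflict)
 *   dlm_modes_compat(DLM_LOCK_NL, DLM_LOCK_EX)  returns 1  (NL blocks nothing)
 *
 * The modes_compat() macro does the same lookup with the granted mode of
 * one lkb and the requested mode of another, which is how queue_conflict()
 * below checks a queue for conflicting locks.
 */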
/* Threads cannot use the lockspace while it's being recovered */ 203 204 static inline void dlm_lock_recovery(struct dlm_ls *ls) 205 { 206 down_read(&ls->ls_in_recovery); 207 } 208 209 void dlm_unlock_recovery(struct dlm_ls *ls) 210 { 211 up_read(&ls->ls_in_recovery); 212 } 213 214 int dlm_lock_recovery_try(struct dlm_ls *ls) 215 { 216 return down_read_trylock(&ls->ls_in_recovery); 217 } 218 219 static inline int can_be_queued(struct dlm_lkb *lkb) 220 { 221 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE); 222 } 223 224 static inline int force_blocking_asts(struct dlm_lkb *lkb) 225 { 226 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST); 227 } 228 229 static inline int is_demoted(struct dlm_lkb *lkb) 230 { 231 return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags); 232 } 233 234 static inline int is_altmode(struct dlm_lkb *lkb) 235 { 236 return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags); 237 } 238 239 static inline int is_granted(struct dlm_lkb *lkb) 240 { 241 return (lkb->lkb_status == DLM_LKSTS_GRANTED); 242 } 243 244 static inline int is_remote(struct dlm_rsb *r) 245 { 246 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r);); 247 return !!r->res_nodeid; 248 } 249 250 static inline int is_process_copy(struct dlm_lkb *lkb) 251 { 252 return lkb->lkb_nodeid && 253 !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags); 254 } 255 256 static inline int is_master_copy(struct dlm_lkb *lkb) 257 { 258 return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags); 259 } 260 261 static inline int middle_conversion(struct dlm_lkb *lkb) 262 { 263 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) || 264 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW)) 265 return 1; 266 return 0; 267 } 268 269 static inline int down_conversion(struct dlm_lkb *lkb) 270 { 271 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode); 272 } 273 274 static inline int is_overlap_unlock(struct dlm_lkb *lkb) 275 { 276 return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); 277 } 278 279 static inline int is_overlap_cancel(struct dlm_lkb *lkb) 280 { 281 return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); 282 } 283 284 static inline int is_overlap(struct dlm_lkb *lkb) 285 { 286 return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) || 287 test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); 288 } 289 290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 291 { 292 if (is_master_copy(lkb)) 293 return; 294 295 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb);); 296 297 if (rv == -DLM_ECANCEL && 298 test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags)) 299 rv = -EDEADLK; 300 301 dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb)); 302 } 303 304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb) 305 { 306 queue_cast(r, lkb, 307 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL); 308 } 309 310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) 311 { 312 if (is_master_copy(lkb)) { 313 send_bast(r, lkb, rqmode); 314 } else { 315 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0); 316 } 317 } 318 319 /* 320 * Basic operations on rsb's and lkb's 321 */ 322 323 /* This is only called to add a reference when the code already holds 324 a valid reference to the rsb, so there's no need for locking. 
*/ 325 326 static inline void hold_rsb(struct dlm_rsb *r) 327 { 328 kref_get(&r->res_ref); 329 } 330 331 void dlm_hold_rsb(struct dlm_rsb *r) 332 { 333 hold_rsb(r); 334 } 335 336 /* When all references to the rsb are gone it's transferred to 337 the tossed list for later disposal. */ 338 339 static void put_rsb(struct dlm_rsb *r) 340 { 341 struct dlm_ls *ls = r->res_ls; 342 uint32_t bucket = r->res_bucket; 343 int rv; 344 345 rv = kref_put_lock(&r->res_ref, toss_rsb, 346 &ls->ls_rsbtbl[bucket].lock); 347 if (rv) 348 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 349 } 350 351 void dlm_put_rsb(struct dlm_rsb *r) 352 { 353 put_rsb(r); 354 } 355 356 static int pre_rsb_struct(struct dlm_ls *ls) 357 { 358 struct dlm_rsb *r1, *r2; 359 int count = 0; 360 361 spin_lock(&ls->ls_new_rsb_spin); 362 if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) { 363 spin_unlock(&ls->ls_new_rsb_spin); 364 return 0; 365 } 366 spin_unlock(&ls->ls_new_rsb_spin); 367 368 r1 = dlm_allocate_rsb(ls); 369 r2 = dlm_allocate_rsb(ls); 370 371 spin_lock(&ls->ls_new_rsb_spin); 372 if (r1) { 373 list_add(&r1->res_hashchain, &ls->ls_new_rsb); 374 ls->ls_new_rsb_count++; 375 } 376 if (r2) { 377 list_add(&r2->res_hashchain, &ls->ls_new_rsb); 378 ls->ls_new_rsb_count++; 379 } 380 count = ls->ls_new_rsb_count; 381 spin_unlock(&ls->ls_new_rsb_spin); 382 383 if (!count) 384 return -ENOMEM; 385 return 0; 386 } 387 388 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can 389 unlock any spinlocks, go back and call pre_rsb_struct again. 390 Otherwise, take an rsb off the list and return it. */ 391 392 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len, 393 struct dlm_rsb **r_ret) 394 { 395 struct dlm_rsb *r; 396 int count; 397 398 spin_lock(&ls->ls_new_rsb_spin); 399 if (list_empty(&ls->ls_new_rsb)) { 400 count = ls->ls_new_rsb_count; 401 spin_unlock(&ls->ls_new_rsb_spin); 402 log_debug(ls, "find_rsb retry %d %d %s", 403 count, dlm_config.ci_new_rsb_count, 404 (const char *)name); 405 return -EAGAIN; 406 } 407 408 r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain); 409 list_del(&r->res_hashchain); 410 /* Convert the empty list_head to a NULL rb_node for tree usage: */ 411 memset(&r->res_hashnode, 0, sizeof(struct rb_node)); 412 ls->ls_new_rsb_count--; 413 spin_unlock(&ls->ls_new_rsb_spin); 414 415 r->res_ls = ls; 416 r->res_length = len; 417 memcpy(r->res_name, name, len); 418 mutex_init(&r->res_mutex); 419 420 INIT_LIST_HEAD(&r->res_lookup); 421 INIT_LIST_HEAD(&r->res_grantqueue); 422 INIT_LIST_HEAD(&r->res_convertqueue); 423 INIT_LIST_HEAD(&r->res_waitqueue); 424 INIT_LIST_HEAD(&r->res_root_list); 425 INIT_LIST_HEAD(&r->res_recover_list); 426 427 *r_ret = r; 428 return 0; 429 } 430 431 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen) 432 { 433 char maxname[DLM_RESNAME_MAXLEN]; 434 435 memset(maxname, 0, DLM_RESNAME_MAXLEN); 436 memcpy(maxname, name, nlen); 437 return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN); 438 } 439 440 int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len, 441 struct dlm_rsb **r_ret) 442 { 443 struct rb_node *node = tree->rb_node; 444 struct dlm_rsb *r; 445 int rc; 446 447 while (node) { 448 r = rb_entry(node, struct dlm_rsb, res_hashnode); 449 rc = rsb_cmp(r, name, len); 450 if (rc < 0) 451 node = node->rb_left; 452 else if (rc > 0) 453 node = node->rb_right; 454 else 455 goto found; 456 } 457 *r_ret = NULL; 458 return -EBADR; 459 460 found: 461 *r_ret = r; 462 return 0; 463 } 464 465 static int rsb_insert(struct 
dlm_rsb *rsb, struct rb_root *tree)
{
	struct rb_node **newn = &tree->rb_node;
	struct rb_node *parent = NULL;
	int rc;

	while (*newn) {
		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
					       res_hashnode);

		parent = *newn;
		rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
		if (rc < 0)
			newn = &parent->rb_left;
		else if (rc > 0)
			newn = &parent->rb_right;
		else {
			log_print("rsb_insert match");
			dlm_dump_rsb(rsb);
			dlm_dump_rsb(cur);
			return -EEXIST;
		}
	}

	rb_link_node(&rsb->res_hashnode, parent, newn);
	rb_insert_color(&rsb->res_hashnode, tree);
	return 0;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused. Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list. When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 *
 * rsb's on the keep list are being used locally and refcounted.
 * rsb's on the toss list are not being used locally, and are not refcounted.
 *
 * The toss list rsb's were either
 * - previously used locally but not any more (were on keep list, then
 *   moved to toss list when last refcount dropped)
 * - created and put on toss list as a directory record for a lookup
 *   (we are the dir node for the res, but are not using the res right now,
 *   but some other node is)
 *
 * The purpose of find_rsb() is to return a refcounted rsb for local use.
 * So, if the given rsb is on the toss list, it is moved to the keep list
 * before being returned.
 *
 * toss_rsb() happens when all local usage of the rsb is done, i.e. no
 * more refcounts exist, so the rsb is moved from the keep list to the
 * toss list.
 *
 * rsb's on both keep and toss lists are used for doing name-to-master
 * lookups. rsb's that are in use locally (and being refcounted) are on
 * the keep list, rsb's that are not in use locally (not refcounted) and
 * only exist for name/master lookups are on the toss list.
 *
 * rsb's on the toss list whose dir_nodeid is not local can have stale
 * name/master mappings. So, remote requests on such rsb's can potentially
 * return with an error, which means the mapping is stale and needs to
 * be updated with a new lookup. (The idea behind MASTER UNCERTAIN and
 * first_lkid is to keep only a single outstanding request on an rsb
 * while that rsb has a potentially stale master.)
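 *
 * To summarize the lifecycle (a rough sketch of the points above, using
 * the helpers defined in this file):
 *
 *   get_rsb_struct()  - allocate a new rsb; callers insert it on the keep
 *                       or toss list
 *   find_rsb()        - return a refcounted rsb for local use, moving it
 *                       from the toss list to the keep list if needed
 *   toss_rsb()        - last local reference dropped, so the rsb moves
 *                       from the keep list to the toss list
 *   shrink_bucket()   - frees tossed rsb's after ci_toss_secs (dir-only
 *                       records instead wait for the master's remove)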
536 */ 537 538 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, 539 uint32_t hash, uint32_t b, 540 int dir_nodeid, int from_nodeid, 541 unsigned int flags, struct dlm_rsb **r_ret) 542 { 543 struct dlm_rsb *r = NULL; 544 int our_nodeid = dlm_our_nodeid(); 545 int from_local = 0; 546 int from_other = 0; 547 int from_dir = 0; 548 int create = 0; 549 int error; 550 551 if (flags & R_RECEIVE_REQUEST) { 552 if (from_nodeid == dir_nodeid) 553 from_dir = 1; 554 else 555 from_other = 1; 556 } else if (flags & R_REQUEST) { 557 from_local = 1; 558 } 559 560 /* 561 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so 562 * from_nodeid has sent us a lock in dlm_recover_locks, believing 563 * we're the new master. Our local recovery may not have set 564 * res_master_nodeid to our_nodeid yet, so allow either. Don't 565 * create the rsb; dlm_recover_process_copy() will handle EBADR 566 * by resending. 567 * 568 * If someone sends us a request, we are the dir node, and we do 569 * not find the rsb anywhere, then recreate it. This happens if 570 * someone sends us a request after we have removed/freed an rsb 571 * from our toss list. (They sent a request instead of lookup 572 * because they are using an rsb from their toss list.) 573 */ 574 575 if (from_local || from_dir || 576 (from_other && (dir_nodeid == our_nodeid))) { 577 create = 1; 578 } 579 580 retry: 581 if (create) { 582 error = pre_rsb_struct(ls); 583 if (error < 0) 584 goto out; 585 } 586 587 spin_lock(&ls->ls_rsbtbl[b].lock); 588 589 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 590 if (error) 591 goto do_toss; 592 593 /* 594 * rsb is active, so we can't check master_nodeid without lock_rsb. 595 */ 596 597 kref_get(&r->res_ref); 598 goto out_unlock; 599 600 601 do_toss: 602 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 603 if (error) 604 goto do_new; 605 606 /* 607 * rsb found inactive (master_nodeid may be out of date unless 608 * we are the dir_nodeid or were the master) No other thread 609 * is using this rsb because it's on the toss list, so we can 610 * look at or update res_master_nodeid without lock_rsb. 611 */ 612 613 if ((r->res_master_nodeid != our_nodeid) && from_other) { 614 /* our rsb was not master, and another node (not the dir node) 615 has sent us a request */ 616 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s", 617 from_nodeid, r->res_master_nodeid, dir_nodeid, 618 r->res_name); 619 error = -ENOTBLK; 620 goto out_unlock; 621 } 622 623 if ((r->res_master_nodeid != our_nodeid) && from_dir) { 624 /* don't think this should ever happen */ 625 log_error(ls, "find_rsb toss from_dir %d master %d", 626 from_nodeid, r->res_master_nodeid); 627 dlm_print_rsb(r); 628 /* fix it and go on */ 629 r->res_master_nodeid = our_nodeid; 630 r->res_nodeid = 0; 631 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); 632 r->res_first_lkid = 0; 633 } 634 635 if (from_local && (r->res_master_nodeid != our_nodeid)) { 636 /* Because we have held no locks on this rsb, 637 res_master_nodeid could have become stale. 
*/ 638 rsb_set_flag(r, RSB_MASTER_UNCERTAIN); 639 r->res_first_lkid = 0; 640 } 641 642 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 643 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 644 goto out_unlock; 645 646 647 do_new: 648 /* 649 * rsb not found 650 */ 651 652 if (error == -EBADR && !create) 653 goto out_unlock; 654 655 error = get_rsb_struct(ls, name, len, &r); 656 if (error == -EAGAIN) { 657 spin_unlock(&ls->ls_rsbtbl[b].lock); 658 goto retry; 659 } 660 if (error) 661 goto out_unlock; 662 663 r->res_hash = hash; 664 r->res_bucket = b; 665 r->res_dir_nodeid = dir_nodeid; 666 kref_init(&r->res_ref); 667 668 if (from_dir) { 669 /* want to see how often this happens */ 670 log_debug(ls, "find_rsb new from_dir %d recreate %s", 671 from_nodeid, r->res_name); 672 r->res_master_nodeid = our_nodeid; 673 r->res_nodeid = 0; 674 goto out_add; 675 } 676 677 if (from_other && (dir_nodeid != our_nodeid)) { 678 /* should never happen */ 679 log_error(ls, "find_rsb new from_other %d dir %d our %d %s", 680 from_nodeid, dir_nodeid, our_nodeid, r->res_name); 681 dlm_free_rsb(r); 682 r = NULL; 683 error = -ENOTBLK; 684 goto out_unlock; 685 } 686 687 if (from_other) { 688 log_debug(ls, "find_rsb new from_other %d dir %d %s", 689 from_nodeid, dir_nodeid, r->res_name); 690 } 691 692 if (dir_nodeid == our_nodeid) { 693 /* When we are the dir nodeid, we can set the master 694 node immediately */ 695 r->res_master_nodeid = our_nodeid; 696 r->res_nodeid = 0; 697 } else { 698 /* set_master will send_lookup to dir_nodeid */ 699 r->res_master_nodeid = 0; 700 r->res_nodeid = -1; 701 } 702 703 out_add: 704 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 705 out_unlock: 706 spin_unlock(&ls->ls_rsbtbl[b].lock); 707 out: 708 *r_ret = r; 709 return error; 710 } 711 712 /* During recovery, other nodes can send us new MSTCPY locks (from 713 dlm_recover_locks) before we've made ourself master (in 714 dlm_recover_masters). */ 715 716 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, 717 uint32_t hash, uint32_t b, 718 int dir_nodeid, int from_nodeid, 719 unsigned int flags, struct dlm_rsb **r_ret) 720 { 721 struct dlm_rsb *r = NULL; 722 int our_nodeid = dlm_our_nodeid(); 723 int recover = (flags & R_RECEIVE_RECOVER); 724 int error; 725 726 retry: 727 error = pre_rsb_struct(ls); 728 if (error < 0) 729 goto out; 730 731 spin_lock(&ls->ls_rsbtbl[b].lock); 732 733 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 734 if (error) 735 goto do_toss; 736 737 /* 738 * rsb is active, so we can't check master_nodeid without lock_rsb. 739 */ 740 741 kref_get(&r->res_ref); 742 goto out_unlock; 743 744 745 do_toss: 746 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 747 if (error) 748 goto do_new; 749 750 /* 751 * rsb found inactive. No other thread is using this rsb because 752 * it's on the toss list, so we can look at or update 753 * res_master_nodeid without lock_rsb. 
754 */ 755 756 if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) { 757 /* our rsb is not master, and another node has sent us a 758 request; this should never happen */ 759 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d", 760 from_nodeid, r->res_master_nodeid, dir_nodeid); 761 dlm_print_rsb(r); 762 error = -ENOTBLK; 763 goto out_unlock; 764 } 765 766 if (!recover && (r->res_master_nodeid != our_nodeid) && 767 (dir_nodeid == our_nodeid)) { 768 /* our rsb is not master, and we are dir; may as well fix it; 769 this should never happen */ 770 log_error(ls, "find_rsb toss our %d master %d dir %d", 771 our_nodeid, r->res_master_nodeid, dir_nodeid); 772 dlm_print_rsb(r); 773 r->res_master_nodeid = our_nodeid; 774 r->res_nodeid = 0; 775 } 776 777 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 778 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 779 goto out_unlock; 780 781 782 do_new: 783 /* 784 * rsb not found 785 */ 786 787 error = get_rsb_struct(ls, name, len, &r); 788 if (error == -EAGAIN) { 789 spin_unlock(&ls->ls_rsbtbl[b].lock); 790 goto retry; 791 } 792 if (error) 793 goto out_unlock; 794 795 r->res_hash = hash; 796 r->res_bucket = b; 797 r->res_dir_nodeid = dir_nodeid; 798 r->res_master_nodeid = dir_nodeid; 799 r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid; 800 kref_init(&r->res_ref); 801 802 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 803 out_unlock: 804 spin_unlock(&ls->ls_rsbtbl[b].lock); 805 out: 806 *r_ret = r; 807 return error; 808 } 809 810 static int find_rsb(struct dlm_ls *ls, const void *name, int len, 811 int from_nodeid, unsigned int flags, 812 struct dlm_rsb **r_ret) 813 { 814 uint32_t hash, b; 815 int dir_nodeid; 816 817 if (len > DLM_RESNAME_MAXLEN) 818 return -EINVAL; 819 820 hash = jhash(name, len, 0); 821 b = hash & (ls->ls_rsbtbl_size - 1); 822 823 dir_nodeid = dlm_hash2nodeid(ls, hash); 824 825 if (dlm_no_directory(ls)) 826 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid, 827 from_nodeid, flags, r_ret); 828 else 829 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid, 830 from_nodeid, flags, r_ret); 831 } 832 833 /* we have received a request and found that res_master_nodeid != our_nodeid, 834 so we need to return an error or make ourself the master */ 835 836 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r, 837 int from_nodeid) 838 { 839 if (dlm_no_directory(ls)) { 840 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d", 841 from_nodeid, r->res_master_nodeid, 842 r->res_dir_nodeid); 843 dlm_print_rsb(r); 844 return -ENOTBLK; 845 } 846 847 if (from_nodeid != r->res_dir_nodeid) { 848 /* our rsb is not master, and another node (not the dir node) 849 has sent us a request. this is much more common when our 850 master_nodeid is zero, so limit debug to non-zero. 
*/ 851 852 if (r->res_master_nodeid) { 853 log_debug(ls, "validate master from_other %d master %d " 854 "dir %d first %x %s", from_nodeid, 855 r->res_master_nodeid, r->res_dir_nodeid, 856 r->res_first_lkid, r->res_name); 857 } 858 return -ENOTBLK; 859 } else { 860 /* our rsb is not master, but the dir nodeid has sent us a 861 request; this could happen with master 0 / res_nodeid -1 */ 862 863 if (r->res_master_nodeid) { 864 log_error(ls, "validate master from_dir %d master %d " 865 "first %x %s", 866 from_nodeid, r->res_master_nodeid, 867 r->res_first_lkid, r->res_name); 868 } 869 870 r->res_master_nodeid = dlm_our_nodeid(); 871 r->res_nodeid = 0; 872 return 0; 873 } 874 } 875 876 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid, 877 int from_nodeid, bool toss_list, unsigned int flags, 878 int *r_nodeid, int *result) 879 { 880 int fix_master = (flags & DLM_LU_RECOVER_MASTER); 881 int from_master = (flags & DLM_LU_RECOVER_DIR); 882 883 if (r->res_dir_nodeid != our_nodeid) { 884 /* should not happen, but may as well fix it and carry on */ 885 log_error(ls, "%s res_dir %d our %d %s", __func__, 886 r->res_dir_nodeid, our_nodeid, r->res_name); 887 r->res_dir_nodeid = our_nodeid; 888 } 889 890 if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) { 891 /* Recovery uses this function to set a new master when 892 * the previous master failed. Setting NEW_MASTER will 893 * force dlm_recover_masters to call recover_master on this 894 * rsb even though the res_nodeid is no longer removed. 895 */ 896 897 r->res_master_nodeid = from_nodeid; 898 r->res_nodeid = from_nodeid; 899 rsb_set_flag(r, RSB_NEW_MASTER); 900 901 if (toss_list) { 902 /* I don't think we should ever find it on toss list. */ 903 log_error(ls, "%s fix_master on toss", __func__); 904 dlm_dump_rsb(r); 905 } 906 } 907 908 if (from_master && (r->res_master_nodeid != from_nodeid)) { 909 /* this will happen if from_nodeid became master during 910 * a previous recovery cycle, and we aborted the previous 911 * cycle before recovering this master value 912 */ 913 914 log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s", 915 __func__, from_nodeid, r->res_master_nodeid, 916 r->res_nodeid, r->res_first_lkid, r->res_name); 917 918 if (r->res_master_nodeid == our_nodeid) { 919 log_error(ls, "from_master %d our_master", from_nodeid); 920 dlm_dump_rsb(r); 921 goto ret_assign; 922 } 923 924 r->res_master_nodeid = from_nodeid; 925 r->res_nodeid = from_nodeid; 926 rsb_set_flag(r, RSB_NEW_MASTER); 927 } 928 929 if (!r->res_master_nodeid) { 930 /* this will happen if recovery happens while we're looking 931 * up the master for this rsb 932 */ 933 934 log_debug(ls, "%s master 0 to %d first %x %s", __func__, 935 from_nodeid, r->res_first_lkid, r->res_name); 936 r->res_master_nodeid = from_nodeid; 937 r->res_nodeid = from_nodeid; 938 } 939 940 if (!from_master && !fix_master && 941 (r->res_master_nodeid == from_nodeid)) { 942 /* this can happen when the master sends remove, the dir node 943 * finds the rsb on the keep list and ignores the remove, 944 * and the former master sends a lookup 945 */ 946 947 log_limit(ls, "%s from master %d flags %x first %x %s", 948 __func__, from_nodeid, flags, r->res_first_lkid, 949 r->res_name); 950 } 951 952 ret_assign: 953 *r_nodeid = r->res_master_nodeid; 954 if (result) 955 *result = DLM_LU_MATCH; 956 } 957 958 /* 959 * We're the dir node for this res and another node wants to know the 960 * master nodeid. 
During normal operation (non recovery) this is only 961 * called from receive_lookup(); master lookups when the local node is 962 * the dir node are done by find_rsb(). 963 * 964 * normal operation, we are the dir node for a resource 965 * . _request_lock 966 * . set_master 967 * . send_lookup 968 * . receive_lookup 969 * . dlm_master_lookup flags 0 970 * 971 * recover directory, we are rebuilding dir for all resources 972 * . dlm_recover_directory 973 * . dlm_rcom_names 974 * remote node sends back the rsb names it is master of and we are dir of 975 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1) 976 * we either create new rsb setting remote node as master, or find existing 977 * rsb and set master to be the remote node. 978 * 979 * recover masters, we are finding the new master for resources 980 * . dlm_recover_masters 981 * . recover_master 982 * . dlm_send_rcom_lookup 983 * . receive_rcom_lookup 984 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0) 985 */ 986 987 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len, 988 unsigned int flags, int *r_nodeid, int *result) 989 { 990 struct dlm_rsb *r = NULL; 991 uint32_t hash, b; 992 int our_nodeid = dlm_our_nodeid(); 993 int dir_nodeid, error; 994 995 if (len > DLM_RESNAME_MAXLEN) 996 return -EINVAL; 997 998 if (from_nodeid == our_nodeid) { 999 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x", 1000 our_nodeid, flags); 1001 return -EINVAL; 1002 } 1003 1004 hash = jhash(name, len, 0); 1005 b = hash & (ls->ls_rsbtbl_size - 1); 1006 1007 dir_nodeid = dlm_hash2nodeid(ls, hash); 1008 if (dir_nodeid != our_nodeid) { 1009 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d", 1010 from_nodeid, dir_nodeid, our_nodeid, hash, 1011 ls->ls_num_nodes); 1012 *r_nodeid = -1; 1013 return -EINVAL; 1014 } 1015 1016 retry: 1017 error = pre_rsb_struct(ls); 1018 if (error < 0) 1019 return error; 1020 1021 spin_lock(&ls->ls_rsbtbl[b].lock); 1022 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 1023 if (!error) { 1024 /* because the rsb is active, we need to lock_rsb before 1025 * checking/changing re_master_nodeid 1026 */ 1027 1028 hold_rsb(r); 1029 spin_unlock(&ls->ls_rsbtbl[b].lock); 1030 lock_rsb(r); 1031 1032 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false, 1033 flags, r_nodeid, result); 1034 1035 /* the rsb was active */ 1036 unlock_rsb(r); 1037 put_rsb(r); 1038 1039 return 0; 1040 } 1041 1042 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 1043 if (error) 1044 goto not_found; 1045 1046 /* because the rsb is inactive (on toss list), it's not refcounted 1047 * and lock_rsb is not used, but is protected by the rsbtbl lock 1048 */ 1049 1050 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags, 1051 r_nodeid, result); 1052 1053 r->res_toss_time = jiffies; 1054 /* the rsb was inactive (on toss list) */ 1055 spin_unlock(&ls->ls_rsbtbl[b].lock); 1056 1057 return 0; 1058 1059 not_found: 1060 error = get_rsb_struct(ls, name, len, &r); 1061 if (error == -EAGAIN) { 1062 spin_unlock(&ls->ls_rsbtbl[b].lock); 1063 goto retry; 1064 } 1065 if (error) 1066 goto out_unlock; 1067 1068 r->res_hash = hash; 1069 r->res_bucket = b; 1070 r->res_dir_nodeid = our_nodeid; 1071 r->res_master_nodeid = from_nodeid; 1072 r->res_nodeid = from_nodeid; 1073 kref_init(&r->res_ref); 1074 r->res_toss_time = jiffies; 1075 1076 error = rsb_insert(r, &ls->ls_rsbtbl[b].toss); 1077 if (error) { 1078 /* should never happen */ 1079 dlm_free_rsb(r); 1080 
spin_unlock(&ls->ls_rsbtbl[b].lock); 1081 goto retry; 1082 } 1083 1084 if (result) 1085 *result = DLM_LU_ADD; 1086 *r_nodeid = from_nodeid; 1087 out_unlock: 1088 spin_unlock(&ls->ls_rsbtbl[b].lock); 1089 return error; 1090 } 1091 1092 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) 1093 { 1094 struct rb_node *n; 1095 struct dlm_rsb *r; 1096 int i; 1097 1098 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 1099 spin_lock(&ls->ls_rsbtbl[i].lock); 1100 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) { 1101 r = rb_entry(n, struct dlm_rsb, res_hashnode); 1102 if (r->res_hash == hash) 1103 dlm_dump_rsb(r); 1104 } 1105 spin_unlock(&ls->ls_rsbtbl[i].lock); 1106 } 1107 } 1108 1109 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len) 1110 { 1111 struct dlm_rsb *r = NULL; 1112 uint32_t hash, b; 1113 int error; 1114 1115 hash = jhash(name, len, 0); 1116 b = hash & (ls->ls_rsbtbl_size - 1); 1117 1118 spin_lock(&ls->ls_rsbtbl[b].lock); 1119 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 1120 if (!error) 1121 goto out_dump; 1122 1123 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 1124 if (error) 1125 goto out; 1126 out_dump: 1127 dlm_dump_rsb(r); 1128 out: 1129 spin_unlock(&ls->ls_rsbtbl[b].lock); 1130 } 1131 1132 static void toss_rsb(struct kref *kref) 1133 { 1134 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); 1135 struct dlm_ls *ls = r->res_ls; 1136 1137 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r);); 1138 kref_init(&r->res_ref); 1139 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep); 1140 rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss); 1141 r->res_toss_time = jiffies; 1142 set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags); 1143 if (r->res_lvbptr) { 1144 dlm_free_lvb(r->res_lvbptr); 1145 r->res_lvbptr = NULL; 1146 } 1147 } 1148 1149 /* See comment for unhold_lkb */ 1150 1151 static void unhold_rsb(struct dlm_rsb *r) 1152 { 1153 int rv; 1154 rv = kref_put(&r->res_ref, toss_rsb); 1155 DLM_ASSERT(!rv, dlm_dump_rsb(r);); 1156 } 1157 1158 static void kill_rsb(struct kref *kref) 1159 { 1160 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); 1161 1162 /* All work is done after the return from kref_put() so we 1163 can release the write_lock before the remove and free. */ 1164 1165 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r);); 1166 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r);); 1167 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r);); 1168 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r);); 1169 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r);); 1170 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r);); 1171 } 1172 1173 /* Attaching/detaching lkb's from rsb's is for rsb reference counting. 1174 The rsb must exist as long as any lkb's for it do. 
*/ 1175 1176 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb) 1177 { 1178 hold_rsb(r); 1179 lkb->lkb_resource = r; 1180 } 1181 1182 static void detach_lkb(struct dlm_lkb *lkb) 1183 { 1184 if (lkb->lkb_resource) { 1185 put_rsb(lkb->lkb_resource); 1186 lkb->lkb_resource = NULL; 1187 } 1188 } 1189 1190 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, 1191 int start, int end) 1192 { 1193 struct dlm_lkb *lkb; 1194 int rv; 1195 1196 lkb = dlm_allocate_lkb(ls); 1197 if (!lkb) 1198 return -ENOMEM; 1199 1200 lkb->lkb_last_bast_mode = -1; 1201 lkb->lkb_nodeid = -1; 1202 lkb->lkb_grmode = DLM_LOCK_IV; 1203 kref_init(&lkb->lkb_ref); 1204 INIT_LIST_HEAD(&lkb->lkb_ownqueue); 1205 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); 1206 INIT_LIST_HEAD(&lkb->lkb_cb_list); 1207 INIT_LIST_HEAD(&lkb->lkb_callbacks); 1208 spin_lock_init(&lkb->lkb_cb_lock); 1209 INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work); 1210 1211 idr_preload(GFP_NOFS); 1212 spin_lock(&ls->ls_lkbidr_spin); 1213 rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT); 1214 if (rv >= 0) 1215 lkb->lkb_id = rv; 1216 spin_unlock(&ls->ls_lkbidr_spin); 1217 idr_preload_end(); 1218 1219 if (rv < 0) { 1220 log_error(ls, "create_lkb idr error %d", rv); 1221 dlm_free_lkb(lkb); 1222 return rv; 1223 } 1224 1225 *lkb_ret = lkb; 1226 return 0; 1227 } 1228 1229 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) 1230 { 1231 return _create_lkb(ls, lkb_ret, 1, 0); 1232 } 1233 1234 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) 1235 { 1236 struct dlm_lkb *lkb; 1237 1238 spin_lock(&ls->ls_lkbidr_spin); 1239 lkb = idr_find(&ls->ls_lkbidr, lkid); 1240 if (lkb) 1241 kref_get(&lkb->lkb_ref); 1242 spin_unlock(&ls->ls_lkbidr_spin); 1243 1244 *lkb_ret = lkb; 1245 return lkb ? 0 : -ENOENT; 1246 } 1247 1248 static void kill_lkb(struct kref *kref) 1249 { 1250 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref); 1251 1252 /* All work is done after the return from kref_put() so we 1253 can release the write_lock before the detach_lkb */ 1254 1255 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb);); 1256 } 1257 1258 /* __put_lkb() is used when an lkb may not have an rsb attached to 1259 it so we need to provide the lockspace explicitly */ 1260 1261 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) 1262 { 1263 uint32_t lkid = lkb->lkb_id; 1264 int rv; 1265 1266 rv = kref_put_lock(&lkb->lkb_ref, kill_lkb, 1267 &ls->ls_lkbidr_spin); 1268 if (rv) { 1269 idr_remove(&ls->ls_lkbidr, lkid); 1270 spin_unlock(&ls->ls_lkbidr_spin); 1271 1272 detach_lkb(lkb); 1273 1274 /* for local/process lkbs, lvbptr points to caller's lksb */ 1275 if (lkb->lkb_lvbptr && is_master_copy(lkb)) 1276 dlm_free_lvb(lkb->lkb_lvbptr); 1277 dlm_free_lkb(lkb); 1278 } 1279 1280 return rv; 1281 } 1282 1283 int dlm_put_lkb(struct dlm_lkb *lkb) 1284 { 1285 struct dlm_ls *ls; 1286 1287 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb);); 1288 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb);); 1289 1290 ls = lkb->lkb_resource->res_ls; 1291 return __put_lkb(ls, lkb); 1292 } 1293 1294 /* This is only called to add a reference when the code already holds 1295 a valid reference to the lkb, so there's no need for locking. 
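
   For example, move_lkb() below wraps del_lkb()/add_lkb() in a
   hold_lkb()/unhold_lkb() pair so the lkb cannot be freed while it is
   briefly off both queues.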
*/ 1296 1297 static inline void hold_lkb(struct dlm_lkb *lkb) 1298 { 1299 kref_get(&lkb->lkb_ref); 1300 } 1301 1302 static void unhold_lkb_assert(struct kref *kref) 1303 { 1304 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref); 1305 1306 DLM_ASSERT(false, dlm_print_lkb(lkb);); 1307 } 1308 1309 /* This is called when we need to remove a reference and are certain 1310 it's not the last ref. e.g. del_lkb is always called between a 1311 find_lkb/put_lkb and is always the inverse of a previous add_lkb. 1312 put_lkb would work fine, but would involve unnecessary locking */ 1313 1314 static inline void unhold_lkb(struct dlm_lkb *lkb) 1315 { 1316 kref_put(&lkb->lkb_ref, unhold_lkb_assert); 1317 } 1318 1319 static void lkb_add_ordered(struct list_head *new, struct list_head *head, 1320 int mode) 1321 { 1322 struct dlm_lkb *lkb = NULL, *iter; 1323 1324 list_for_each_entry(iter, head, lkb_statequeue) 1325 if (iter->lkb_rqmode < mode) { 1326 lkb = iter; 1327 list_add_tail(new, &iter->lkb_statequeue); 1328 break; 1329 } 1330 1331 if (!lkb) 1332 list_add_tail(new, head); 1333 } 1334 1335 /* add/remove lkb to rsb's grant/convert/wait queue */ 1336 1337 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status) 1338 { 1339 kref_get(&lkb->lkb_ref); 1340 1341 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb);); 1342 1343 lkb->lkb_timestamp = ktime_get(); 1344 1345 lkb->lkb_status = status; 1346 1347 switch (status) { 1348 case DLM_LKSTS_WAITING: 1349 if (lkb->lkb_exflags & DLM_LKF_HEADQUE) 1350 list_add(&lkb->lkb_statequeue, &r->res_waitqueue); 1351 else 1352 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue); 1353 break; 1354 case DLM_LKSTS_GRANTED: 1355 /* convention says granted locks kept in order of grmode */ 1356 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue, 1357 lkb->lkb_grmode); 1358 break; 1359 case DLM_LKSTS_CONVERT: 1360 if (lkb->lkb_exflags & DLM_LKF_HEADQUE) 1361 list_add(&lkb->lkb_statequeue, &r->res_convertqueue); 1362 else 1363 list_add_tail(&lkb->lkb_statequeue, 1364 &r->res_convertqueue); 1365 break; 1366 default: 1367 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status);); 1368 } 1369 } 1370 1371 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb) 1372 { 1373 lkb->lkb_status = 0; 1374 list_del(&lkb->lkb_statequeue); 1375 unhold_lkb(lkb); 1376 } 1377 1378 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts) 1379 { 1380 hold_lkb(lkb); 1381 del_lkb(r, lkb); 1382 add_lkb(r, lkb, sts); 1383 unhold_lkb(lkb); 1384 } 1385 1386 static int msg_reply_type(int mstype) 1387 { 1388 switch (mstype) { 1389 case DLM_MSG_REQUEST: 1390 return DLM_MSG_REQUEST_REPLY; 1391 case DLM_MSG_CONVERT: 1392 return DLM_MSG_CONVERT_REPLY; 1393 case DLM_MSG_UNLOCK: 1394 return DLM_MSG_UNLOCK_REPLY; 1395 case DLM_MSG_CANCEL: 1396 return DLM_MSG_CANCEL_REPLY; 1397 case DLM_MSG_LOOKUP: 1398 return DLM_MSG_LOOKUP_REPLY; 1399 } 1400 return -1; 1401 } 1402 1403 /* add/remove lkb from global waiters list of lkb's waiting for 1404 a reply from a remote node */ 1405 1406 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) 1407 { 1408 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1409 int error = 0; 1410 1411 mutex_lock(&ls->ls_waiters_mutex); 1412 1413 if (is_overlap_unlock(lkb) || 1414 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) { 1415 error = -EINVAL; 1416 goto out; 1417 } 1418 1419 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) { 1420 switch (mstype) { 1421 case DLM_MSG_UNLOCK: 1422 
set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); 1423 break; 1424 case DLM_MSG_CANCEL: 1425 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); 1426 break; 1427 default: 1428 error = -EBUSY; 1429 goto out; 1430 } 1431 lkb->lkb_wait_count++; 1432 hold_lkb(lkb); 1433 1434 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x", 1435 lkb->lkb_id, lkb->lkb_wait_type, mstype, 1436 lkb->lkb_wait_count, dlm_iflags_val(lkb)); 1437 goto out; 1438 } 1439 1440 DLM_ASSERT(!lkb->lkb_wait_count, 1441 dlm_print_lkb(lkb); 1442 printk("wait_count %d\n", lkb->lkb_wait_count);); 1443 1444 lkb->lkb_wait_count++; 1445 lkb->lkb_wait_type = mstype; 1446 lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */ 1447 hold_lkb(lkb); 1448 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters); 1449 out: 1450 if (error) 1451 log_error(ls, "addwait error %x %d flags %x %d %d %s", 1452 lkb->lkb_id, error, dlm_iflags_val(lkb), mstype, 1453 lkb->lkb_wait_type, lkb->lkb_resource->res_name); 1454 mutex_unlock(&ls->ls_waiters_mutex); 1455 return error; 1456 } 1457 1458 /* We clear the RESEND flag because we might be taking an lkb off the waiters 1459 list as part of process_requestqueue (e.g. a lookup that has an optimized 1460 request reply on the requestqueue) between dlm_recover_waiters_pre() which 1461 set RESEND and dlm_recover_waiters_post() */ 1462 1463 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, 1464 struct dlm_message *ms) 1465 { 1466 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1467 int overlap_done = 0; 1468 1469 if (mstype == DLM_MSG_UNLOCK_REPLY && 1470 test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) { 1471 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id); 1472 overlap_done = 1; 1473 goto out_del; 1474 } 1475 1476 if (mstype == DLM_MSG_CANCEL_REPLY && 1477 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) { 1478 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id); 1479 overlap_done = 1; 1480 goto out_del; 1481 } 1482 1483 /* Cancel state was preemptively cleared by a successful convert, 1484 see next comment, nothing to do. */ 1485 1486 if ((mstype == DLM_MSG_CANCEL_REPLY) && 1487 (lkb->lkb_wait_type != DLM_MSG_CANCEL)) { 1488 log_debug(ls, "remwait %x cancel_reply wait_type %d", 1489 lkb->lkb_id, lkb->lkb_wait_type); 1490 return -1; 1491 } 1492 1493 /* Remove for the convert reply, and premptively remove for the 1494 cancel reply. A convert has been granted while there's still 1495 an outstanding cancel on it (the cancel is moot and the result 1496 in the cancel reply should be 0). We preempt the cancel reply 1497 because the app gets the convert result and then can follow up 1498 with another op, like convert. This subsequent op would see the 1499 lingering state of the cancel and fail with -EBUSY. */ 1500 1501 if ((mstype == DLM_MSG_CONVERT_REPLY) && 1502 (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result && 1503 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) { 1504 log_debug(ls, "remwait %x convert_reply zap overlap_cancel", 1505 lkb->lkb_id); 1506 lkb->lkb_wait_type = 0; 1507 lkb->lkb_wait_count--; 1508 unhold_lkb(lkb); 1509 goto out_del; 1510 } 1511 1512 /* N.B. type of reply may not always correspond to type of original 1513 msg due to lookup->request optimization, verify others? */ 1514 1515 if (lkb->lkb_wait_type) { 1516 lkb->lkb_wait_type = 0; 1517 goto out_del; 1518 } 1519 1520 log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait", 1521 lkb->lkb_id, ms ? 
le32_to_cpu(ms->m_header.h_nodeid) : 0, 1522 lkb->lkb_remid, mstype, dlm_iflags_val(lkb)); 1523 return -1; 1524 1525 out_del: 1526 /* the force-unlock/cancel has completed and we haven't recvd a reply 1527 to the op that was in progress prior to the unlock/cancel; we 1528 give up on any reply to the earlier op. FIXME: not sure when/how 1529 this would happen */ 1530 1531 if (overlap_done && lkb->lkb_wait_type) { 1532 log_error(ls, "remwait error %x reply %d wait_type %d overlap", 1533 lkb->lkb_id, mstype, lkb->lkb_wait_type); 1534 lkb->lkb_wait_count--; 1535 unhold_lkb(lkb); 1536 lkb->lkb_wait_type = 0; 1537 } 1538 1539 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb);); 1540 1541 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); 1542 lkb->lkb_wait_count--; 1543 if (!lkb->lkb_wait_count) 1544 list_del_init(&lkb->lkb_wait_reply); 1545 unhold_lkb(lkb); 1546 return 0; 1547 } 1548 1549 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype) 1550 { 1551 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1552 int error; 1553 1554 mutex_lock(&ls->ls_waiters_mutex); 1555 error = _remove_from_waiters(lkb, mstype, NULL); 1556 mutex_unlock(&ls->ls_waiters_mutex); 1557 return error; 1558 } 1559 1560 /* Handles situations where we might be processing a "fake" or "local" reply in 1561 which we can't try to take waiters_mutex again. */ 1562 1563 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms, 1564 bool local) 1565 { 1566 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1567 int error; 1568 1569 if (!local) 1570 mutex_lock(&ls->ls_waiters_mutex); 1571 error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms); 1572 if (!local) 1573 mutex_unlock(&ls->ls_waiters_mutex); 1574 return error; 1575 } 1576 1577 static void shrink_bucket(struct dlm_ls *ls, int b) 1578 { 1579 struct rb_node *n, *next; 1580 struct dlm_rsb *r; 1581 char *name; 1582 int our_nodeid = dlm_our_nodeid(); 1583 int remote_count = 0; 1584 int need_shrink = 0; 1585 int i, len, rv; 1586 1587 memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX); 1588 1589 spin_lock(&ls->ls_rsbtbl[b].lock); 1590 1591 if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) { 1592 spin_unlock(&ls->ls_rsbtbl[b].lock); 1593 return; 1594 } 1595 1596 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) { 1597 next = rb_next(n); 1598 r = rb_entry(n, struct dlm_rsb, res_hashnode); 1599 1600 /* If we're the directory record for this rsb, and 1601 we're not the master of it, then we need to wait 1602 for the master node to send us a dir remove for 1603 before removing the dir record. */ 1604 1605 if (!dlm_no_directory(ls) && 1606 (r->res_master_nodeid != our_nodeid) && 1607 (dlm_dir_nodeid(r) == our_nodeid)) { 1608 continue; 1609 } 1610 1611 need_shrink = 1; 1612 1613 if (!time_after_eq(jiffies, r->res_toss_time + 1614 dlm_config.ci_toss_secs * HZ)) { 1615 continue; 1616 } 1617 1618 if (!dlm_no_directory(ls) && 1619 (r->res_master_nodeid == our_nodeid) && 1620 (dlm_dir_nodeid(r) != our_nodeid)) { 1621 1622 /* We're the master of this rsb but we're not 1623 the directory record, so we need to tell the 1624 dir node to remove the dir record. 
*/ 1625 1626 ls->ls_remove_lens[remote_count] = r->res_length; 1627 memcpy(ls->ls_remove_names[remote_count], r->res_name, 1628 DLM_RESNAME_MAXLEN); 1629 remote_count++; 1630 1631 if (remote_count >= DLM_REMOVE_NAMES_MAX) 1632 break; 1633 continue; 1634 } 1635 1636 if (!kref_put(&r->res_ref, kill_rsb)) { 1637 log_error(ls, "tossed rsb in use %s", r->res_name); 1638 continue; 1639 } 1640 1641 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 1642 dlm_free_rsb(r); 1643 } 1644 1645 if (need_shrink) 1646 set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags); 1647 else 1648 clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags); 1649 spin_unlock(&ls->ls_rsbtbl[b].lock); 1650 1651 /* 1652 * While searching for rsb's to free, we found some that require 1653 * remote removal. We leave them in place and find them again here 1654 * so there is a very small gap between removing them from the toss 1655 * list and sending the removal. Keeping this gap small is 1656 * important to keep us (the master node) from being out of sync 1657 * with the remote dir node for very long. 1658 */ 1659 1660 for (i = 0; i < remote_count; i++) { 1661 name = ls->ls_remove_names[i]; 1662 len = ls->ls_remove_lens[i]; 1663 1664 spin_lock(&ls->ls_rsbtbl[b].lock); 1665 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 1666 if (rv) { 1667 spin_unlock(&ls->ls_rsbtbl[b].lock); 1668 log_debug(ls, "remove_name not toss %s", name); 1669 continue; 1670 } 1671 1672 if (r->res_master_nodeid != our_nodeid) { 1673 spin_unlock(&ls->ls_rsbtbl[b].lock); 1674 log_debug(ls, "remove_name master %d dir %d our %d %s", 1675 r->res_master_nodeid, r->res_dir_nodeid, 1676 our_nodeid, name); 1677 continue; 1678 } 1679 1680 if (r->res_dir_nodeid == our_nodeid) { 1681 /* should never happen */ 1682 spin_unlock(&ls->ls_rsbtbl[b].lock); 1683 log_error(ls, "remove_name dir %d master %d our %d %s", 1684 r->res_dir_nodeid, r->res_master_nodeid, 1685 our_nodeid, name); 1686 continue; 1687 } 1688 1689 if (!time_after_eq(jiffies, r->res_toss_time + 1690 dlm_config.ci_toss_secs * HZ)) { 1691 spin_unlock(&ls->ls_rsbtbl[b].lock); 1692 log_debug(ls, "remove_name toss_time %lu now %lu %s", 1693 r->res_toss_time, jiffies, name); 1694 continue; 1695 } 1696 1697 if (!kref_put(&r->res_ref, kill_rsb)) { 1698 spin_unlock(&ls->ls_rsbtbl[b].lock); 1699 log_error(ls, "remove_name in use %s", name); 1700 continue; 1701 } 1702 1703 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 1704 send_remove(r); 1705 spin_unlock(&ls->ls_rsbtbl[b].lock); 1706 1707 dlm_free_rsb(r); 1708 } 1709 } 1710 1711 void dlm_scan_rsbs(struct dlm_ls *ls) 1712 { 1713 int i; 1714 1715 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 1716 shrink_bucket(ls, i); 1717 if (dlm_locking_stopped(ls)) 1718 break; 1719 cond_resched(); 1720 } 1721 } 1722 1723 /* lkb is master or local copy */ 1724 1725 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1726 { 1727 int b, len = r->res_ls->ls_lvblen; 1728 1729 /* b=1 lvb returned to caller 1730 b=0 lvb written to rsb or invalidated 1731 b=-1 do nothing */ 1732 1733 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; 1734 1735 if (b == 1) { 1736 if (!lkb->lkb_lvbptr) 1737 return; 1738 1739 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1740 return; 1741 1742 if (!r->res_lvbptr) 1743 return; 1744 1745 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len); 1746 lkb->lkb_lvbseq = r->res_lvbseq; 1747 1748 } else if (b == 0) { 1749 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) { 1750 rsb_set_flag(r, RSB_VALNOTVALID); 1751 return; 1752 } 1753 1754 if 
(!lkb->lkb_lvbptr) 1755 return; 1756 1757 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1758 return; 1759 1760 if (!r->res_lvbptr) 1761 r->res_lvbptr = dlm_allocate_lvb(r->res_ls); 1762 1763 if (!r->res_lvbptr) 1764 return; 1765 1766 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len); 1767 r->res_lvbseq++; 1768 lkb->lkb_lvbseq = r->res_lvbseq; 1769 rsb_clear_flag(r, RSB_VALNOTVALID); 1770 } 1771 1772 if (rsb_flag(r, RSB_VALNOTVALID)) 1773 set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags); 1774 } 1775 1776 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1777 { 1778 if (lkb->lkb_grmode < DLM_LOCK_PW) 1779 return; 1780 1781 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) { 1782 rsb_set_flag(r, RSB_VALNOTVALID); 1783 return; 1784 } 1785 1786 if (!lkb->lkb_lvbptr) 1787 return; 1788 1789 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1790 return; 1791 1792 if (!r->res_lvbptr) 1793 r->res_lvbptr = dlm_allocate_lvb(r->res_ls); 1794 1795 if (!r->res_lvbptr) 1796 return; 1797 1798 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); 1799 r->res_lvbseq++; 1800 rsb_clear_flag(r, RSB_VALNOTVALID); 1801 } 1802 1803 /* lkb is process copy (pc) */ 1804 1805 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, 1806 struct dlm_message *ms) 1807 { 1808 int b; 1809 1810 if (!lkb->lkb_lvbptr) 1811 return; 1812 1813 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1814 return; 1815 1816 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; 1817 if (b == 1) { 1818 int len = receive_extralen(ms); 1819 if (len > r->res_ls->ls_lvblen) 1820 len = r->res_ls->ls_lvblen; 1821 memcpy(lkb->lkb_lvbptr, ms->m_extra, len); 1822 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq); 1823 } 1824 } 1825 1826 /* Manipulate lkb's on rsb's convert/granted/waiting queues 1827 remove_lock -- used for unlock, removes lkb from granted 1828 revert_lock -- used for cancel, moves lkb from convert to granted 1829 grant_lock -- used for request and convert, adds lkb to granted or 1830 moves lkb from convert or waiting to granted 1831 1832 Each of these is used for master or local copy lkb's. There is 1833 also a _pc() variation used to make the corresponding change on 1834 a process copy (pc) lkb. 
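
   For example, grant_lock_pending() below grants a master or local copy
   with grant_lock(), while grant_lock_pc() makes the same change to a
   process copy, taking the reply message so set_lvb_lock_pc() can copy
   in any LVB data it carries.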
*/ 1835 1836 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1837 { 1838 del_lkb(r, lkb); 1839 lkb->lkb_grmode = DLM_LOCK_IV; 1840 /* this unhold undoes the original ref from create_lkb() 1841 so this leads to the lkb being freed */ 1842 unhold_lkb(lkb); 1843 } 1844 1845 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1846 { 1847 set_lvb_unlock(r, lkb); 1848 _remove_lock(r, lkb); 1849 } 1850 1851 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) 1852 { 1853 _remove_lock(r, lkb); 1854 } 1855 1856 /* returns: 0 did nothing 1857 1 moved lock to granted 1858 -1 removed lock */ 1859 1860 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1861 { 1862 int rv = 0; 1863 1864 lkb->lkb_rqmode = DLM_LOCK_IV; 1865 1866 switch (lkb->lkb_status) { 1867 case DLM_LKSTS_GRANTED: 1868 break; 1869 case DLM_LKSTS_CONVERT: 1870 move_lkb(r, lkb, DLM_LKSTS_GRANTED); 1871 rv = 1; 1872 break; 1873 case DLM_LKSTS_WAITING: 1874 del_lkb(r, lkb); 1875 lkb->lkb_grmode = DLM_LOCK_IV; 1876 /* this unhold undoes the original ref from create_lkb() 1877 so this leads to the lkb being freed */ 1878 unhold_lkb(lkb); 1879 rv = -1; 1880 break; 1881 default: 1882 log_print("invalid status for revert %d", lkb->lkb_status); 1883 } 1884 return rv; 1885 } 1886 1887 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) 1888 { 1889 return revert_lock(r, lkb); 1890 } 1891 1892 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1893 { 1894 if (lkb->lkb_grmode != lkb->lkb_rqmode) { 1895 lkb->lkb_grmode = lkb->lkb_rqmode; 1896 if (lkb->lkb_status) 1897 move_lkb(r, lkb, DLM_LKSTS_GRANTED); 1898 else 1899 add_lkb(r, lkb, DLM_LKSTS_GRANTED); 1900 } 1901 1902 lkb->lkb_rqmode = DLM_LOCK_IV; 1903 lkb->lkb_highbast = 0; 1904 } 1905 1906 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1907 { 1908 set_lvb_lock(r, lkb); 1909 _grant_lock(r, lkb); 1910 } 1911 1912 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, 1913 struct dlm_message *ms) 1914 { 1915 set_lvb_lock_pc(r, lkb, ms); 1916 _grant_lock(r, lkb); 1917 } 1918 1919 /* called by grant_pending_locks() which means an async grant message must 1920 be sent to the requesting node in addition to granting the lock if the 1921 lkb belongs to a remote node. */ 1922 1923 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb) 1924 { 1925 grant_lock(r, lkb); 1926 if (is_master_copy(lkb)) 1927 send_grant(r, lkb); 1928 else 1929 queue_cast(r, lkb, 0); 1930 } 1931 1932 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to 1933 change the granted/requested modes. We're munging things accordingly in 1934 the process copy. 
1935 CONVDEADLK: our grmode may have been forced down to NL to resolve a 1936 conversion deadlock 1937 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become 1938 compatible with other granted locks */ 1939 1940 static void munge_demoted(struct dlm_lkb *lkb) 1941 { 1942 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) { 1943 log_print("munge_demoted %x invalid modes gr %d rq %d", 1944 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode); 1945 return; 1946 } 1947 1948 lkb->lkb_grmode = DLM_LOCK_NL; 1949 } 1950 1951 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms) 1952 { 1953 if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) && 1954 ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) { 1955 log_print("munge_altmode %x invalid reply type %d", 1956 lkb->lkb_id, le32_to_cpu(ms->m_type)); 1957 return; 1958 } 1959 1960 if (lkb->lkb_exflags & DLM_LKF_ALTPR) 1961 lkb->lkb_rqmode = DLM_LOCK_PR; 1962 else if (lkb->lkb_exflags & DLM_LKF_ALTCW) 1963 lkb->lkb_rqmode = DLM_LOCK_CW; 1964 else { 1965 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags); 1966 dlm_print_lkb(lkb); 1967 } 1968 } 1969 1970 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head) 1971 { 1972 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb, 1973 lkb_statequeue); 1974 if (lkb->lkb_id == first->lkb_id) 1975 return 1; 1976 1977 return 0; 1978 } 1979 1980 /* Check if the given lkb conflicts with another lkb on the queue. */ 1981 1982 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb) 1983 { 1984 struct dlm_lkb *this; 1985 1986 list_for_each_entry(this, head, lkb_statequeue) { 1987 if (this == lkb) 1988 continue; 1989 if (!modes_compat(this, lkb)) 1990 return 1; 1991 } 1992 return 0; 1993 } 1994 1995 /* 1996 * "A conversion deadlock arises with a pair of lock requests in the converting 1997 * queue for one resource. The granted mode of each lock blocks the requested 1998 * mode of the other lock." 1999 * 2000 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the 2001 * convert queue from being granted, then deadlk/demote lkb. 2002 * 2003 * Example: 2004 * Granted Queue: empty 2005 * Convert Queue: NL->EX (first lock) 2006 * PR->EX (second lock) 2007 * 2008 * The first lock can't be granted because of the granted mode of the second 2009 * lock and the second lock can't be granted because it's not first in the 2010 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we 2011 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK 2012 * flag set and return DEMOTED in the lksb flags. 2013 * 2014 * Originally, this function detected conv-deadlk in a more limited scope: 2015 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or 2016 * - if lkb1 was the first entry in the queue (not just earlier), and was 2017 * blocked by the granted mode of lkb2, and there was nothing on the 2018 * granted queue preventing lkb1 from being granted immediately, i.e. 2019 * lkb2 was the only thing preventing lkb1 from being granted. 2020 * 2021 * That second condition meant we'd only say there was conv-deadlk if 2022 * resolving it (by demotion) would lead to the first lock on the convert 2023 * queue being granted right away. It allowed conversion deadlocks to exist 2024 * between locks on the convert queue while they couldn't be granted anyway. 
2025 * 2026 * Now, we detect and take action on conversion deadlocks immediately when 2027 * they're created, even if they may not be immediately consequential. If 2028 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted 2029 * mode that would prevent lkb1's conversion from being granted, we do a 2030 * deadlk/demote on lkb2 right away and don't let it onto the convert queue. 2031 * I think this means that the lkb_is_ahead condition below should always 2032 * be zero, i.e. there will never be conv-deadlk between two locks that are 2033 * both already on the convert queue. 2034 */ 2035 2036 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2) 2037 { 2038 struct dlm_lkb *lkb1; 2039 int lkb_is_ahead = 0; 2040 2041 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) { 2042 if (lkb1 == lkb2) { 2043 lkb_is_ahead = 1; 2044 continue; 2045 } 2046 2047 if (!lkb_is_ahead) { 2048 if (!modes_compat(lkb2, lkb1)) 2049 return 1; 2050 } else { 2051 if (!modes_compat(lkb2, lkb1) && 2052 !modes_compat(lkb1, lkb2)) 2053 return 1; 2054 } 2055 } 2056 return 0; 2057 } 2058 2059 /* 2060 * Return 1 if the lock can be granted, 0 otherwise. 2061 * Also detect and resolve conversion deadlocks. 2062 * 2063 * lkb is the lock to be granted 2064 * 2065 * now is 1 if the function is being called in the context of the 2066 * immediate request, it is 0 if called later, after the lock has been 2067 * queued. 2068 * 2069 * recover is 1 if dlm_recover_grant() is trying to grant conversions 2070 * after recovery. 2071 * 2072 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis 2073 */ 2074 2075 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, 2076 int recover) 2077 { 2078 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV); 2079 2080 /* 2081 * 6-10: Version 5.4 introduced an option to address the phenomenon of 2082 * a new request for a NL mode lock being blocked. 2083 * 2084 * 6-11: If the optional EXPEDITE flag is used with the new NL mode 2085 * request, then it would be granted. In essence, the use of this flag 2086 * tells the Lock Manager to expedite theis request by not considering 2087 * what may be in the CONVERTING or WAITING queues... As of this 2088 * writing, the EXPEDITE flag can be used only with new requests for NL 2089 * mode locks. This flag is not valid for conversion requests. 2090 * 2091 * A shortcut. Earlier checks return an error if EXPEDITE is used in a 2092 * conversion or used with a non-NL requested mode. We also know an 2093 * EXPEDITE request is always granted immediately, so now must always 2094 * be 1. The full condition to grant an expedite request: (now && 2095 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can 2096 * therefore be shortened to just checking the flag. 2097 */ 2098 2099 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE) 2100 return 1; 2101 2102 /* 2103 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be 2104 * added to the remaining conditions. 2105 */ 2106 2107 if (queue_conflict(&r->res_grantqueue, lkb)) 2108 return 0; 2109 2110 /* 2111 * 6-3: By default, a conversion request is immediately granted if the 2112 * requested mode is compatible with the modes of all other granted 2113 * locks 2114 */ 2115 2116 if (queue_conflict(&r->res_convertqueue, lkb)) 2117 return 0; 2118 2119 /* 2120 * The RECOVER_GRANT flag means dlm_recover_grant() is granting 2121 * locks for a recovered rsb, on which lkb's have been rebuilt. 
2122 * The lkb's may have been rebuilt on the queues in a different 2123 * order than they were in on the previous master. So, granting 2124 * queued conversions in order after recovery doesn't make sense 2125 * since the order hasn't been preserved anyway. The new order 2126 * could also have created a new "in place" conversion deadlock. 2127 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX. 2128 * After recovery, there would be no granted locks, and possibly 2129 * NL->EX, PR->EX, an in-place conversion deadlock.) So, after 2130 * recovery, grant conversions without considering order. 2131 */ 2132 2133 if (conv && recover) 2134 return 1; 2135 2136 /* 2137 * 6-5: But the default algorithm for deciding whether to grant or 2138 * queue conversion requests does not by itself guarantee that such 2139 * requests are serviced on a "first come first serve" basis. This, in 2140 * turn, can lead to a phenomenon known as "indefinite postponement". 2141 * 2142 * 6-7: This issue is dealt with by using the optional QUECVT flag with 2143 * the system service employed to request a lock conversion. This flag 2144 * forces certain conversion requests to be queued, even if they are 2145 * compatible with the granted modes of other locks on the same 2146 * resource. Thus, the use of this flag results in conversion requests 2147 * being ordered on a "first come first serve" basis. 2148 * 2149 * DCT: This condition is all about new conversions being able to occur 2150 * "in place" while the lock remains on the granted queue (assuming 2151 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion 2152 * doesn't _have_ to go onto the convert queue where it's processed in 2153 * order. The "now" variable is necessary to distinguish converts 2154 * being received and processed for the first time now, because once a 2155 * convert is moved to the conversion queue the condition below applies 2156 * requiring fifo granting. 2157 */ 2158 2159 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT)) 2160 return 1; 2161 2162 /* 2163 * Even if the convert is compat with all granted locks, 2164 * QUECVT forces it behind other locks on the convert queue. 2165 */ 2166 2167 if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) { 2168 if (list_empty(&r->res_convertqueue)) 2169 return 1; 2170 else 2171 return 0; 2172 } 2173 2174 /* 2175 * The NOORDER flag is set to avoid the standard vms rules on grant 2176 * order. 2177 */ 2178 2179 if (lkb->lkb_exflags & DLM_LKF_NOORDER) 2180 return 1; 2181 2182 /* 2183 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be 2184 * granted until all other conversion requests ahead of it are granted 2185 * and/or canceled. 2186 */ 2187 2188 if (!now && conv && first_in_list(lkb, &r->res_convertqueue)) 2189 return 1; 2190 2191 /* 2192 * 6-4: By default, a new request is immediately granted only if all 2193 * three of the following conditions are satisfied when the request is 2194 * issued: 2195 * - The queue of ungranted conversion requests for the resource is 2196 * empty. 2197 * - The queue of ungranted new requests for the resource is empty. 2198 * - The mode of the new request is compatible with the most 2199 * restrictive mode of all granted locks on the resource.
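 *
 * A worked example (sketch): with an EX lock granted and both queues
 * empty, a new CR request conflicts with EX and is queued (or fails
 * with EAGAIN if NOQUEUE was used); with only an NL lock granted and
 * both queues empty, a new EX request is granted immediately.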
*/ 2201 2202 if (now && !conv && list_empty(&r->res_convertqueue) && 2203 list_empty(&r->res_waitqueue)) 2204 return 1; 2205 2206 /* 2207 * 6-4: Once a lock request is in the queue of ungranted new requests, 2208 * it cannot be granted until the queue of ungranted conversion 2209 * requests is empty, all ungranted new requests ahead of it are 2210 * granted and/or canceled, and it is compatible with the granted mode 2211 * of the most restrictive lock granted on the resource. 2212 */ 2213 2214 if (!now && !conv && list_empty(&r->res_convertqueue) && 2215 first_in_list(lkb, &r->res_waitqueue)) 2216 return 1; 2217 2218 return 0; 2219 } 2220 2221 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, 2222 int recover, int *err) 2223 { 2224 int rv; 2225 int8_t alt = 0, rqmode = lkb->lkb_rqmode; 2226 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV); 2227 2228 if (err) 2229 *err = 0; 2230 2231 rv = _can_be_granted(r, lkb, now, recover); 2232 if (rv) 2233 goto out; 2234 2235 /* 2236 * The CONVDEADLK flag is non-standard and tells the dlm to resolve 2237 * conversion deadlocks by demoting grmode to NL, otherwise the dlm 2238 * cancels one of the locks. 2239 */ 2240 2241 if (is_convert && can_be_queued(lkb) && 2242 conversion_deadlock_detect(r, lkb)) { 2243 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) { 2244 lkb->lkb_grmode = DLM_LOCK_NL; 2245 set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags); 2246 } else if (err) { 2247 *err = -EDEADLK; 2248 } else { 2249 log_print("can_be_granted deadlock %x now %d", 2250 lkb->lkb_id, now); 2251 dlm_dump_rsb(r); 2252 } 2253 goto out; 2254 } 2255 2256 /* 2257 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try 2258 * to grant a request in a mode other than the normal rqmode. It's a 2259 * simple way to provide a big optimization to applications that can 2260 * use them. 2261 */ 2262 2263 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR)) 2264 alt = DLM_LOCK_PR; 2265 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW)) 2266 alt = DLM_LOCK_CW; 2267 2268 if (alt) { 2269 lkb->lkb_rqmode = alt; 2270 rv = _can_be_granted(r, lkb, now, 0); 2271 if (rv) 2272 set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags); 2273 else 2274 lkb->lkb_rqmode = rqmode; 2275 } 2276 out: 2277 return rv; 2278 } 2279 2280 /* Returns the highest requested mode of all blocked conversions; sets 2281 cw if there's a blocked conversion to DLM_LOCK_CW. */ 2282 2283 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw, 2284 unsigned int *count) 2285 { 2286 struct dlm_lkb *lkb, *s; 2287 int recover = rsb_flag(r, RSB_RECOVER_GRANT); 2288 int hi, demoted, quit, grant_restart, demote_restart; 2289 int deadlk; 2290 2291 quit = 0; 2292 restart: 2293 grant_restart = 0; 2294 demote_restart = 0; 2295 hi = DLM_LOCK_IV; 2296 2297 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) { 2298 demoted = is_demoted(lkb); 2299 deadlk = 0; 2300 2301 if (can_be_granted(r, lkb, 0, recover, &deadlk)) { 2302 grant_lock_pending(r, lkb); 2303 grant_restart = 1; 2304 if (count) 2305 (*count)++; 2306 continue; 2307 } 2308 2309 if (!demoted && is_demoted(lkb)) { 2310 log_print("WARN: pending demoted %x node %d %s", 2311 lkb->lkb_id, lkb->lkb_nodeid, r->res_name); 2312 demote_restart = 1; 2313 continue; 2314 } 2315 2316 if (deadlk) { 2317 /* 2318 * If the DLM_LKF_NODLCKWT flag is set and conversion 2319 * deadlock is detected, we request a blocking AST and 2320 * leave the down-convert (or cancel) to the caller.
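 * (Sketch: an application converting PR->EX with NODLCKWT set that
 * hits a conversion deadlock receives a blocking AST for EX against
 * its own lock, and is then expected to cancel the conversion or
 * down-convert to break the cycle.)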
2321 */ 2322 if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) { 2323 if (lkb->lkb_highbast < lkb->lkb_rqmode) { 2324 queue_bast(r, lkb, lkb->lkb_rqmode); 2325 lkb->lkb_highbast = lkb->lkb_rqmode; 2326 } 2327 } else { 2328 log_print("WARN: pending deadlock %x node %d %s", 2329 lkb->lkb_id, lkb->lkb_nodeid, 2330 r->res_name); 2331 dlm_dump_rsb(r); 2332 } 2333 continue; 2334 } 2335 2336 hi = max_t(int, lkb->lkb_rqmode, hi); 2337 2338 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW) 2339 *cw = 1; 2340 } 2341 2342 if (grant_restart) 2343 goto restart; 2344 if (demote_restart && !quit) { 2345 quit = 1; 2346 goto restart; 2347 } 2348 2349 return max_t(int, high, hi); 2350 } 2351 2352 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw, 2353 unsigned int *count) 2354 { 2355 struct dlm_lkb *lkb, *s; 2356 2357 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { 2358 if (can_be_granted(r, lkb, 0, 0, NULL)) { 2359 grant_lock_pending(r, lkb); 2360 if (count) 2361 (*count)++; 2362 } else { 2363 high = max_t(int, lkb->lkb_rqmode, high); 2364 if (lkb->lkb_rqmode == DLM_LOCK_CW) 2365 *cw = 1; 2366 } 2367 } 2368 2369 return high; 2370 } 2371 2372 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked 2373 on either the convert or waiting queue. 2374 high is the largest rqmode of all locks blocked on the convert or 2375 waiting queue. */ 2376 2377 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw) 2378 { 2379 if (gr->lkb_grmode == DLM_LOCK_PR && cw) { 2380 if (gr->lkb_highbast < DLM_LOCK_EX) 2381 return 1; 2382 return 0; 2383 } 2384 2385 if (gr->lkb_highbast < high && 2386 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1]) 2387 return 1; 2388 return 0; 2389 } 2390 2391 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count) 2392 { 2393 struct dlm_lkb *lkb, *s; 2394 int high = DLM_LOCK_IV; 2395 int cw = 0; 2396 2397 if (!is_master(r)) { 2398 log_print("grant_pending_locks r nodeid %d", r->res_nodeid); 2399 dlm_dump_rsb(r); 2400 return; 2401 } 2402 2403 high = grant_pending_convert(r, high, &cw, count); 2404 high = grant_pending_wait(r, high, &cw, count); 2405 2406 if (high == DLM_LOCK_IV) 2407 return; 2408 2409 /* 2410 * If there are locks left on the wait/convert queue then send blocking 2411 * ASTs to granted locks based on the largest requested mode (high) 2412 * found above. 
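 *
 * Sketch of the bast decision: if the highest blocked mode is PR but
 * some blocked lock wants CW, holders granted PR are sent a bast for
 * CW (PR and CW conflict even though CW is the numerically lower
 * mode); otherwise a holder whose granted mode conflicts with "high"
 * gets a bast for "high", unless it was already sent one at least
 * that strong (lkb_highbast).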
2413 */ 2414 2415 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) { 2416 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) { 2417 if (cw && high == DLM_LOCK_PR && 2418 lkb->lkb_grmode == DLM_LOCK_PR) 2419 queue_bast(r, lkb, DLM_LOCK_CW); 2420 else 2421 queue_bast(r, lkb, high); 2422 lkb->lkb_highbast = high; 2423 } 2424 } 2425 } 2426 2427 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq) 2428 { 2429 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) || 2430 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) { 2431 if (gr->lkb_highbast < DLM_LOCK_EX) 2432 return 1; 2433 return 0; 2434 } 2435 2436 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq)) 2437 return 1; 2438 return 0; 2439 } 2440 2441 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head, 2442 struct dlm_lkb *lkb) 2443 { 2444 struct dlm_lkb *gr; 2445 2446 list_for_each_entry(gr, head, lkb_statequeue) { 2447 /* skip self when sending basts to convertqueue */ 2448 if (gr == lkb) 2449 continue; 2450 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) { 2451 queue_bast(r, gr, lkb->lkb_rqmode); 2452 gr->lkb_highbast = lkb->lkb_rqmode; 2453 } 2454 } 2455 } 2456 2457 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb) 2458 { 2459 send_bast_queue(r, &r->res_grantqueue, lkb); 2460 } 2461 2462 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb) 2463 { 2464 send_bast_queue(r, &r->res_grantqueue, lkb); 2465 send_bast_queue(r, &r->res_convertqueue, lkb); 2466 } 2467 2468 /* set_master(r, lkb) -- set the master nodeid of a resource 2469 2470 The purpose of this function is to set the nodeid field in the given 2471 lkb using the nodeid field in the given rsb. If the rsb's nodeid is 2472 known, it can just be copied to the lkb and the function will return 2473 0. If the rsb's nodeid is _not_ known, it needs to be looked up 2474 before it can be copied to the lkb. 2475 2476 When the rsb nodeid is being looked up remotely, the initial lkb 2477 causing the lookup is kept on the ls_waiters list waiting for the 2478 lookup reply. Other lkb's waiting for the same rsb lookup are kept 2479 on the rsb's res_lookup list until the master is verified. 2480 2481 Return values: 2482 0: nodeid is set in rsb/lkb and the caller should go ahead and use it 2483 1: the rsb master is not available and the lkb has been placed on 2484 a wait queue 2485 */ 2486 2487 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) 2488 { 2489 int our_nodeid = dlm_our_nodeid(); 2490 2491 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) { 2492 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); 2493 r->res_first_lkid = lkb->lkb_id; 2494 lkb->lkb_nodeid = r->res_nodeid; 2495 return 0; 2496 } 2497 2498 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) { 2499 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup); 2500 return 1; 2501 } 2502 2503 if (r->res_master_nodeid == our_nodeid) { 2504 lkb->lkb_nodeid = 0; 2505 return 0; 2506 } 2507 2508 if (r->res_master_nodeid) { 2509 lkb->lkb_nodeid = r->res_master_nodeid; 2510 return 0; 2511 } 2512 2513 if (dlm_dir_nodeid(r) == our_nodeid) { 2514 /* This is a somewhat unusual case; find_rsb will usually 2515 have set res_master_nodeid when dir nodeid is local, but 2516 there are cases where we become the dir node after we've 2517 past find_rsb and go through _request_lock again. 2518 confirm_master() or process_lookup_list() needs to be 2519 called after this. 
*/ 2520 log_debug(r->res_ls, "set_master %x self master %d dir %d %s", 2521 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid, 2522 r->res_name); 2523 r->res_master_nodeid = our_nodeid; 2524 r->res_nodeid = 0; 2525 lkb->lkb_nodeid = 0; 2526 return 0; 2527 } 2528 2529 r->res_first_lkid = lkb->lkb_id; 2530 send_lookup(r, lkb); 2531 return 1; 2532 } 2533 2534 static void process_lookup_list(struct dlm_rsb *r) 2535 { 2536 struct dlm_lkb *lkb, *safe; 2537 2538 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) { 2539 list_del_init(&lkb->lkb_rsb_lookup); 2540 _request_lock(r, lkb); 2541 schedule(); 2542 } 2543 } 2544 2545 /* confirm_master -- confirm (or deny) an rsb's master nodeid */ 2546 2547 static void confirm_master(struct dlm_rsb *r, int error) 2548 { 2549 struct dlm_lkb *lkb; 2550 2551 if (!r->res_first_lkid) 2552 return; 2553 2554 switch (error) { 2555 case 0: 2556 case -EINPROGRESS: 2557 r->res_first_lkid = 0; 2558 process_lookup_list(r); 2559 break; 2560 2561 case -EAGAIN: 2562 case -EBADR: 2563 case -ENOTBLK: 2564 /* the remote request failed and won't be retried (it was 2565 a NOQUEUE, or has been canceled/unlocked); make a waiting 2566 lkb the first_lkid */ 2567 2568 r->res_first_lkid = 0; 2569 2570 if (!list_empty(&r->res_lookup)) { 2571 lkb = list_entry(r->res_lookup.next, struct dlm_lkb, 2572 lkb_rsb_lookup); 2573 list_del_init(&lkb->lkb_rsb_lookup); 2574 r->res_first_lkid = lkb->lkb_id; 2575 _request_lock(r, lkb); 2576 } 2577 break; 2578 2579 default: 2580 log_error(r->res_ls, "confirm_master unknown error %d", error); 2581 } 2582 } 2583 2584 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, 2585 int namelen, void (*ast)(void *astparam), 2586 void *astparam, 2587 void (*bast)(void *astparam, int mode), 2588 struct dlm_args *args) 2589 { 2590 int rv = -EINVAL; 2591 2592 /* check for invalid arg usage */ 2593 2594 if (mode < 0 || mode > DLM_LOCK_EX) 2595 goto out; 2596 2597 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN)) 2598 goto out; 2599 2600 if (flags & DLM_LKF_CANCEL) 2601 goto out; 2602 2603 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT)) 2604 goto out; 2605 2606 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT)) 2607 goto out; 2608 2609 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE) 2610 goto out; 2611 2612 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT) 2613 goto out; 2614 2615 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT) 2616 goto out; 2617 2618 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE) 2619 goto out; 2620 2621 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL) 2622 goto out; 2623 2624 if (!ast || !lksb) 2625 goto out; 2626 2627 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr) 2628 goto out; 2629 2630 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid) 2631 goto out; 2632 2633 /* these args will be copied to the lkb in validate_lock_args, 2634 it cannot be done now because when converting locks, fields in 2635 an active lkb cannot be modified before locking the rsb */ 2636 2637 args->flags = flags; 2638 args->astfn = ast; 2639 args->astparam = astparam; 2640 args->bastfn = bast; 2641 args->mode = mode; 2642 args->lksb = lksb; 2643 rv = 0; 2644 out: 2645 return rv; 2646 } 2647 2648 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args) 2649 { 2650 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK | 2651 DLM_LKF_FORCEUNLOCK)) 2652 return -EINVAL; 2653 2654 if (flags & DLM_LKF_CANCEL && flags & 
DLM_LKF_FORCEUNLOCK) 2655 return -EINVAL; 2656 2657 args->flags = flags; 2658 args->astparam = astarg; 2659 return 0; 2660 } 2661 2662 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 2663 struct dlm_args *args) 2664 { 2665 int rv = -EBUSY; 2666 2667 if (args->flags & DLM_LKF_CONVERT) { 2668 if (lkb->lkb_status != DLM_LKSTS_GRANTED) 2669 goto out; 2670 2671 /* lock not allowed if there's any op in progress */ 2672 if (lkb->lkb_wait_type || lkb->lkb_wait_count) 2673 goto out; 2674 2675 if (is_overlap(lkb)) 2676 goto out; 2677 2678 rv = -EINVAL; 2679 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) 2680 goto out; 2681 2682 if (args->flags & DLM_LKF_QUECVT && 2683 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1]) 2684 goto out; 2685 } 2686 2687 lkb->lkb_exflags = args->flags; 2688 dlm_set_sbflags_val(lkb, 0); 2689 lkb->lkb_astfn = args->astfn; 2690 lkb->lkb_astparam = args->astparam; 2691 lkb->lkb_bastfn = args->bastfn; 2692 lkb->lkb_rqmode = args->mode; 2693 lkb->lkb_lksb = args->lksb; 2694 lkb->lkb_lvbptr = args->lksb->sb_lvbptr; 2695 lkb->lkb_ownpid = (int) current->pid; 2696 rv = 0; 2697 out: 2698 switch (rv) { 2699 case 0: 2700 break; 2701 case -EINVAL: 2702 /* annoy the user because dlm usage is wrong */ 2703 WARN_ON(1); 2704 log_error(ls, "%s %d %x %x %x %d %d %s", __func__, 2705 rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags, 2706 lkb->lkb_status, lkb->lkb_wait_type, 2707 lkb->lkb_resource->res_name); 2708 break; 2709 default: 2710 log_debug(ls, "%s %d %x %x %x %d %d %s", __func__, 2711 rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags, 2712 lkb->lkb_status, lkb->lkb_wait_type, 2713 lkb->lkb_resource->res_name); 2714 break; 2715 } 2716 2717 return rv; 2718 } 2719 2720 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0 2721 for success */ 2722 2723 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here 2724 because there may be a lookup in progress and it's valid to do 2725 cancel/unlockf on it */ 2726 2727 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) 2728 { 2729 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 2730 int rv = -EBUSY; 2731 2732 /* normal unlock not allowed if there's any op in progress */ 2733 if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) && 2734 (lkb->lkb_wait_type || lkb->lkb_wait_count)) 2735 goto out; 2736 2737 /* an lkb may be waiting for an rsb lookup to complete where the 2738 lookup was initiated by another lock */ 2739 2740 if (!list_empty(&lkb->lkb_rsb_lookup)) { 2741 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) { 2742 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id); 2743 list_del_init(&lkb->lkb_rsb_lookup); 2744 queue_cast(lkb->lkb_resource, lkb, 2745 args->flags & DLM_LKF_CANCEL ? 
2746 -DLM_ECANCEL : -DLM_EUNLOCK); 2747 unhold_lkb(lkb); /* undoes create_lkb() */ 2748 } 2749 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */ 2750 goto out; 2751 } 2752 2753 rv = -EINVAL; 2754 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) { 2755 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id); 2756 dlm_print_lkb(lkb); 2757 goto out; 2758 } 2759 2760 /* an lkb may still exist even though the lock is EOL'ed due to a 2761 * cancel, unlock or failed noqueue request; an app can't use these 2762 * locks; return same error as if the lkid had not been found at all 2763 */ 2764 2765 if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) { 2766 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id); 2767 rv = -ENOENT; 2768 goto out; 2769 } 2770 2771 /* cancel not allowed with another cancel/unlock in progress */ 2772 2773 if (args->flags & DLM_LKF_CANCEL) { 2774 if (lkb->lkb_exflags & DLM_LKF_CANCEL) 2775 goto out; 2776 2777 if (is_overlap(lkb)) 2778 goto out; 2779 2780 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) { 2781 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); 2782 rv = -EBUSY; 2783 goto out; 2784 } 2785 2786 /* there's nothing to cancel */ 2787 if (lkb->lkb_status == DLM_LKSTS_GRANTED && 2788 !lkb->lkb_wait_type) { 2789 rv = -EBUSY; 2790 goto out; 2791 } 2792 2793 switch (lkb->lkb_wait_type) { 2794 case DLM_MSG_LOOKUP: 2795 case DLM_MSG_REQUEST: 2796 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); 2797 rv = -EBUSY; 2798 goto out; 2799 case DLM_MSG_UNLOCK: 2800 case DLM_MSG_CANCEL: 2801 goto out; 2802 } 2803 /* add_to_waiters() will set OVERLAP_CANCEL */ 2804 goto out_ok; 2805 } 2806 2807 /* do we need to allow a force-unlock if there's a normal unlock 2808 already in progress? in what conditions could the normal unlock 2809 fail such that we'd want to send a force-unlock to be sure? */ 2810 2811 if (args->flags & DLM_LKF_FORCEUNLOCK) { 2812 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK) 2813 goto out; 2814 2815 if (is_overlap_unlock(lkb)) 2816 goto out; 2817 2818 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) { 2819 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); 2820 rv = -EBUSY; 2821 goto out; 2822 } 2823 2824 switch (lkb->lkb_wait_type) { 2825 case DLM_MSG_LOOKUP: 2826 case DLM_MSG_REQUEST: 2827 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); 2828 rv = -EBUSY; 2829 goto out; 2830 case DLM_MSG_UNLOCK: 2831 goto out; 2832 } 2833 /* add_to_waiters() will set OVERLAP_UNLOCK */ 2834 } 2835 2836 out_ok: 2837 /* an overlapping op shouldn't blow away exflags from other op */ 2838 lkb->lkb_exflags |= args->flags; 2839 dlm_set_sbflags_val(lkb, 0); 2840 lkb->lkb_astparam = args->astparam; 2841 rv = 0; 2842 out: 2843 switch (rv) { 2844 case 0: 2845 break; 2846 case -EINVAL: 2847 /* annoy the user because dlm usage is wrong */ 2848 WARN_ON(1); 2849 log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv, 2850 lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags, 2851 args->flags, lkb->lkb_wait_type, 2852 lkb->lkb_resource->res_name); 2853 break; 2854 default: 2855 log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv, 2856 lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags, 2857 args->flags, lkb->lkb_wait_type, 2858 lkb->lkb_resource->res_name); 2859 break; 2860 } 2861 2862 return rv; 2863 } 2864 2865 /* 2866 * Four stage 4 varieties: 2867 * do_request(), do_convert(), do_unlock(), do_cancel() 2868 * These are called on the master node for the given lock and 2869 * from the central locking logic. 
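 *
 * A sketch of the calling pattern (as used by the code below):
 * do_request() returns 0, -EINPROGRESS or -EAGAIN; for a remote
 * caller the reply is sent between do_xxxx() and do_xxxx_effects(),
 * and do_xxxx_effects() then sends any blocking ASTs and grants
 * whatever a successful convert/unlock/cancel has unblocked.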
2870 */ 2871 2872 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb) 2873 { 2874 int error = 0; 2875 2876 if (can_be_granted(r, lkb, 1, 0, NULL)) { 2877 grant_lock(r, lkb); 2878 queue_cast(r, lkb, 0); 2879 goto out; 2880 } 2881 2882 if (can_be_queued(lkb)) { 2883 error = -EINPROGRESS; 2884 add_lkb(r, lkb, DLM_LKSTS_WAITING); 2885 goto out; 2886 } 2887 2888 error = -EAGAIN; 2889 queue_cast(r, lkb, -EAGAIN); 2890 out: 2891 return error; 2892 } 2893 2894 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 2895 int error) 2896 { 2897 switch (error) { 2898 case -EAGAIN: 2899 if (force_blocking_asts(lkb)) 2900 send_blocking_asts_all(r, lkb); 2901 break; 2902 case -EINPROGRESS: 2903 send_blocking_asts(r, lkb); 2904 break; 2905 } 2906 } 2907 2908 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) 2909 { 2910 int error = 0; 2911 int deadlk = 0; 2912 2913 /* changing an existing lock may allow others to be granted */ 2914 2915 if (can_be_granted(r, lkb, 1, 0, &deadlk)) { 2916 grant_lock(r, lkb); 2917 queue_cast(r, lkb, 0); 2918 goto out; 2919 } 2920 2921 /* can_be_granted() detected that this lock would block in a conversion 2922 deadlock, so we leave it on the granted queue and return EDEADLK in 2923 the ast for the convert. */ 2924 2925 if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) { 2926 /* it's left on the granted queue */ 2927 revert_lock(r, lkb); 2928 queue_cast(r, lkb, -EDEADLK); 2929 error = -EDEADLK; 2930 goto out; 2931 } 2932 2933 /* is_demoted() means the can_be_granted() above set the grmode 2934 to NL, and left us on the granted queue. This auto-demotion 2935 (due to CONVDEADLK) might mean other locks, and/or this lock, are 2936 now grantable. We have to try to grant other converting locks 2937 before we try again to grant this one. 
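   (A sketch: a PR->EX convert carrying CONVDEADLK that hits a
   conversion deadlock has its grmode demoted to NL by can_be_granted();
   the grant_pending_convert() call below then grants whatever the old
   PR grmode was blocking, after which this lock is granted, queued on
   the convert queue, or fails with EAGAIN if it cannot be queued.)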
*/ 2938 2939 if (is_demoted(lkb)) { 2940 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL); 2941 if (_can_be_granted(r, lkb, 1, 0)) { 2942 grant_lock(r, lkb); 2943 queue_cast(r, lkb, 0); 2944 goto out; 2945 } 2946 /* else fall through and move to convert queue */ 2947 } 2948 2949 if (can_be_queued(lkb)) { 2950 error = -EINPROGRESS; 2951 del_lkb(r, lkb); 2952 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 2953 goto out; 2954 } 2955 2956 error = -EAGAIN; 2957 queue_cast(r, lkb, -EAGAIN); 2958 out: 2959 return error; 2960 } 2961 2962 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 2963 int error) 2964 { 2965 switch (error) { 2966 case 0: 2967 grant_pending_locks(r, NULL); 2968 /* grant_pending_locks also sends basts */ 2969 break; 2970 case -EAGAIN: 2971 if (force_blocking_asts(lkb)) 2972 send_blocking_asts_all(r, lkb); 2973 break; 2974 case -EINPROGRESS: 2975 send_blocking_asts(r, lkb); 2976 break; 2977 } 2978 } 2979 2980 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2981 { 2982 remove_lock(r, lkb); 2983 queue_cast(r, lkb, -DLM_EUNLOCK); 2984 return -DLM_EUNLOCK; 2985 } 2986 2987 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 2988 int error) 2989 { 2990 grant_pending_locks(r, NULL); 2991 } 2992 2993 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ 2994 2995 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 2996 { 2997 int error; 2998 2999 error = revert_lock(r, lkb); 3000 if (error) { 3001 queue_cast(r, lkb, -DLM_ECANCEL); 3002 return -DLM_ECANCEL; 3003 } 3004 return 0; 3005 } 3006 3007 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3008 int error) 3009 { 3010 if (error) 3011 grant_pending_locks(r, NULL); 3012 } 3013 3014 /* 3015 * Four stage 3 varieties: 3016 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock() 3017 */ 3018 3019 /* add a new lkb to a possibly new rsb, called by requesting process */ 3020 3021 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3022 { 3023 int error; 3024 3025 /* set_master: sets lkb nodeid from r */ 3026 3027 error = set_master(r, lkb); 3028 if (error < 0) 3029 goto out; 3030 if (error) { 3031 error = 0; 3032 goto out; 3033 } 3034 3035 if (is_remote(r)) { 3036 /* receive_request() calls do_request() on remote node */ 3037 error = send_request(r, lkb); 3038 } else { 3039 error = do_request(r, lkb); 3040 /* for remote locks the request_reply is sent 3041 between do_request and do_request_effects */ 3042 do_request_effects(r, lkb, error); 3043 } 3044 out: 3045 return error; 3046 } 3047 3048 /* change some property of an existing lkb, e.g. 
mode */ 3049 3050 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3051 { 3052 int error; 3053 3054 if (is_remote(r)) { 3055 /* receive_convert() calls do_convert() on remote node */ 3056 error = send_convert(r, lkb); 3057 } else { 3058 error = do_convert(r, lkb); 3059 /* for remote locks the convert_reply is sent 3060 between do_convert and do_convert_effects */ 3061 do_convert_effects(r, lkb, error); 3062 } 3063 3064 return error; 3065 } 3066 3067 /* remove an existing lkb from the granted queue */ 3068 3069 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3070 { 3071 int error; 3072 3073 if (is_remote(r)) { 3074 /* receive_unlock() calls do_unlock() on remote node */ 3075 error = send_unlock(r, lkb); 3076 } else { 3077 error = do_unlock(r, lkb); 3078 /* for remote locks the unlock_reply is sent 3079 between do_unlock and do_unlock_effects */ 3080 do_unlock_effects(r, lkb, error); 3081 } 3082 3083 return error; 3084 } 3085 3086 /* remove an existing lkb from the convert or wait queue */ 3087 3088 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3089 { 3090 int error; 3091 3092 if (is_remote(r)) { 3093 /* receive_cancel() calls do_cancel() on remote node */ 3094 error = send_cancel(r, lkb); 3095 } else { 3096 error = do_cancel(r, lkb); 3097 /* for remote locks the cancel_reply is sent 3098 between do_cancel and do_cancel_effects */ 3099 do_cancel_effects(r, lkb, error); 3100 } 3101 3102 return error; 3103 } 3104 3105 /* 3106 * Four stage 2 varieties: 3107 * request_lock(), convert_lock(), unlock_lock(), cancel_lock() 3108 */ 3109 3110 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3111 const void *name, int len, 3112 struct dlm_args *args) 3113 { 3114 struct dlm_rsb *r; 3115 int error; 3116 3117 error = validate_lock_args(ls, lkb, args); 3118 if (error) 3119 return error; 3120 3121 error = find_rsb(ls, name, len, 0, R_REQUEST, &r); 3122 if (error) 3123 return error; 3124 3125 lock_rsb(r); 3126 3127 attach_lkb(r, lkb); 3128 lkb->lkb_lksb->sb_lkid = lkb->lkb_id; 3129 3130 error = _request_lock(r, lkb); 3131 3132 unlock_rsb(r); 3133 put_rsb(r); 3134 return error; 3135 } 3136 3137 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3138 struct dlm_args *args) 3139 { 3140 struct dlm_rsb *r; 3141 int error; 3142 3143 r = lkb->lkb_resource; 3144 3145 hold_rsb(r); 3146 lock_rsb(r); 3147 3148 error = validate_lock_args(ls, lkb, args); 3149 if (error) 3150 goto out; 3151 3152 error = _convert_lock(r, lkb); 3153 out: 3154 unlock_rsb(r); 3155 put_rsb(r); 3156 return error; 3157 } 3158 3159 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3160 struct dlm_args *args) 3161 { 3162 struct dlm_rsb *r; 3163 int error; 3164 3165 r = lkb->lkb_resource; 3166 3167 hold_rsb(r); 3168 lock_rsb(r); 3169 3170 error = validate_unlock_args(lkb, args); 3171 if (error) 3172 goto out; 3173 3174 error = _unlock_lock(r, lkb); 3175 out: 3176 unlock_rsb(r); 3177 put_rsb(r); 3178 return error; 3179 } 3180 3181 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3182 struct dlm_args *args) 3183 { 3184 struct dlm_rsb *r; 3185 int error; 3186 3187 r = lkb->lkb_resource; 3188 3189 hold_rsb(r); 3190 lock_rsb(r); 3191 3192 error = validate_unlock_args(lkb, args); 3193 if (error) 3194 goto out; 3195 3196 error = _cancel_lock(r, lkb); 3197 out: 3198 unlock_rsb(r); 3199 put_rsb(r); 3200 return error; 3201 } 3202 3203 /* 3204 * Two stage 1 varieties: dlm_lock() and dlm_unlock() 3205 */ 3206 3207 int dlm_lock(dlm_lockspace_t *lockspace, 
3208 int mode, 3209 struct dlm_lksb *lksb, 3210 uint32_t flags, 3211 const void *name, 3212 unsigned int namelen, 3213 uint32_t parent_lkid, 3214 void (*ast) (void *astarg), 3215 void *astarg, 3216 void (*bast) (void *astarg, int mode)) 3217 { 3218 struct dlm_ls *ls; 3219 struct dlm_lkb *lkb; 3220 struct dlm_args args; 3221 int error, convert = flags & DLM_LKF_CONVERT; 3222 3223 ls = dlm_find_lockspace_local(lockspace); 3224 if (!ls) 3225 return -EINVAL; 3226 3227 dlm_lock_recovery(ls); 3228 3229 if (convert) 3230 error = find_lkb(ls, lksb->sb_lkid, &lkb); 3231 else 3232 error = create_lkb(ls, &lkb); 3233 3234 if (error) 3235 goto out; 3236 3237 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags); 3238 3239 error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast, 3240 &args); 3241 if (error) 3242 goto out_put; 3243 3244 if (convert) 3245 error = convert_lock(ls, lkb, &args); 3246 else 3247 error = request_lock(ls, lkb, name, namelen, &args); 3248 3249 if (error == -EINPROGRESS) 3250 error = 0; 3251 out_put: 3252 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true); 3253 3254 if (convert || error) 3255 __put_lkb(ls, lkb); 3256 if (error == -EAGAIN || error == -EDEADLK) 3257 error = 0; 3258 out: 3259 dlm_unlock_recovery(ls); 3260 dlm_put_lockspace(ls); 3261 return error; 3262 } 3263 3264 int dlm_unlock(dlm_lockspace_t *lockspace, 3265 uint32_t lkid, 3266 uint32_t flags, 3267 struct dlm_lksb *lksb, 3268 void *astarg) 3269 { 3270 struct dlm_ls *ls; 3271 struct dlm_lkb *lkb; 3272 struct dlm_args args; 3273 int error; 3274 3275 ls = dlm_find_lockspace_local(lockspace); 3276 if (!ls) 3277 return -EINVAL; 3278 3279 dlm_lock_recovery(ls); 3280 3281 error = find_lkb(ls, lkid, &lkb); 3282 if (error) 3283 goto out; 3284 3285 trace_dlm_unlock_start(ls, lkb, flags); 3286 3287 error = set_unlock_args(flags, astarg, &args); 3288 if (error) 3289 goto out_put; 3290 3291 if (flags & DLM_LKF_CANCEL) 3292 error = cancel_lock(ls, lkb, &args); 3293 else 3294 error = unlock_lock(ls, lkb, &args); 3295 3296 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL) 3297 error = 0; 3298 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK))) 3299 error = 0; 3300 out_put: 3301 trace_dlm_unlock_end(ls, lkb, flags, error); 3302 3303 dlm_put_lkb(lkb); 3304 out: 3305 dlm_unlock_recovery(ls); 3306 dlm_put_lockspace(ls); 3307 return error; 3308 } 3309 3310 /* 3311 * send/receive routines for remote operations and replies 3312 * 3313 * send_args 3314 * send_common 3315 * send_request receive_request 3316 * send_convert receive_convert 3317 * send_unlock receive_unlock 3318 * send_cancel receive_cancel 3319 * send_grant receive_grant 3320 * send_bast receive_bast 3321 * send_lookup receive_lookup 3322 * send_remove receive_remove 3323 * 3324 * send_common_reply 3325 * receive_request_reply send_request_reply 3326 * receive_convert_reply send_convert_reply 3327 * receive_unlock_reply send_unlock_reply 3328 * receive_cancel_reply send_cancel_reply 3329 * receive_lookup_reply send_lookup_reply 3330 */ 3331 3332 static int _create_message(struct dlm_ls *ls, int mb_len, 3333 int to_nodeid, int mstype, 3334 struct dlm_message **ms_ret, 3335 struct dlm_mhandle **mh_ret, 3336 gfp_t allocation) 3337 { 3338 struct dlm_message *ms; 3339 struct dlm_mhandle *mh; 3340 char *mb; 3341 3342 /* get_buffer gives us a message handle (mh) that we need to 3343 pass into midcomms_commit and a message buffer (mb) that we 3344 write our data into */ 3345 3346 mh = dlm_midcomms_get_mhandle(to_nodeid, 
mb_len, allocation, &mb); 3347 if (!mh) 3348 return -ENOBUFS; 3349 3350 ms = (struct dlm_message *) mb; 3351 3352 ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 3353 ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id); 3354 ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid()); 3355 ms->m_header.h_length = cpu_to_le16(mb_len); 3356 ms->m_header.h_cmd = DLM_MSG; 3357 3358 ms->m_type = cpu_to_le32(mstype); 3359 3360 *mh_ret = mh; 3361 *ms_ret = ms; 3362 return 0; 3363 } 3364 3365 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, 3366 int to_nodeid, int mstype, 3367 struct dlm_message **ms_ret, 3368 struct dlm_mhandle **mh_ret, 3369 gfp_t allocation) 3370 { 3371 int mb_len = sizeof(struct dlm_message); 3372 3373 switch (mstype) { 3374 case DLM_MSG_REQUEST: 3375 case DLM_MSG_LOOKUP: 3376 case DLM_MSG_REMOVE: 3377 mb_len += r->res_length; 3378 break; 3379 case DLM_MSG_CONVERT: 3380 case DLM_MSG_UNLOCK: 3381 case DLM_MSG_REQUEST_REPLY: 3382 case DLM_MSG_CONVERT_REPLY: 3383 case DLM_MSG_GRANT: 3384 if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK)) 3385 mb_len += r->res_ls->ls_lvblen; 3386 break; 3387 } 3388 3389 return _create_message(r->res_ls, mb_len, to_nodeid, mstype, 3390 ms_ret, mh_ret, allocation); 3391 } 3392 3393 /* further lowcomms enhancements or alternate implementations may make 3394 the return value from this function useful at some point */ 3395 3396 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms, 3397 const void *name, int namelen) 3398 { 3399 dlm_midcomms_commit_mhandle(mh, name, namelen); 3400 return 0; 3401 } 3402 3403 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb, 3404 struct dlm_message *ms) 3405 { 3406 ms->m_nodeid = cpu_to_le32(lkb->lkb_nodeid); 3407 ms->m_pid = cpu_to_le32(lkb->lkb_ownpid); 3408 ms->m_lkid = cpu_to_le32(lkb->lkb_id); 3409 ms->m_remid = cpu_to_le32(lkb->lkb_remid); 3410 ms->m_exflags = cpu_to_le32(lkb->lkb_exflags); 3411 ms->m_sbflags = cpu_to_le32(dlm_sbflags_val(lkb)); 3412 ms->m_flags = cpu_to_le32(dlm_dflags_val(lkb)); 3413 ms->m_lvbseq = cpu_to_le32(lkb->lkb_lvbseq); 3414 ms->m_status = cpu_to_le32(lkb->lkb_status); 3415 ms->m_grmode = cpu_to_le32(lkb->lkb_grmode); 3416 ms->m_rqmode = cpu_to_le32(lkb->lkb_rqmode); 3417 ms->m_hash = cpu_to_le32(r->res_hash); 3418 3419 /* m_result and m_bastmode are set from function args, 3420 not from lkb fields */ 3421 3422 if (lkb->lkb_bastfn) 3423 ms->m_asts |= cpu_to_le32(DLM_CB_BAST); 3424 if (lkb->lkb_astfn) 3425 ms->m_asts |= cpu_to_le32(DLM_CB_CAST); 3426 3427 /* compare with switch in create_message; send_remove() doesn't 3428 use send_args() */ 3429 3430 switch (ms->m_type) { 3431 case cpu_to_le32(DLM_MSG_REQUEST): 3432 case cpu_to_le32(DLM_MSG_LOOKUP): 3433 memcpy(ms->m_extra, r->res_name, r->res_length); 3434 break; 3435 case cpu_to_le32(DLM_MSG_CONVERT): 3436 case cpu_to_le32(DLM_MSG_UNLOCK): 3437 case cpu_to_le32(DLM_MSG_REQUEST_REPLY): 3438 case cpu_to_le32(DLM_MSG_CONVERT_REPLY): 3439 case cpu_to_le32(DLM_MSG_GRANT): 3440 if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK)) 3441 break; 3442 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); 3443 break; 3444 } 3445 } 3446 3447 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) 3448 { 3449 struct dlm_message *ms; 3450 struct dlm_mhandle *mh; 3451 int to_nodeid, error; 3452 3453 to_nodeid = r->res_nodeid; 3454 3455 error = add_to_waiters(lkb, mstype, to_nodeid); 3456 if (error) 3457 return error; 3458 3459 
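	/* the lkb is now on the lockspace waiters list until the reply
	   arrives; if building or sending the message fails, the fail:
	   path below takes it back off again */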
error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS); 3460 if (error) 3461 goto fail; 3462 3463 send_args(r, lkb, ms); 3464 3465 error = send_message(mh, ms, r->res_name, r->res_length); 3466 if (error) 3467 goto fail; 3468 return 0; 3469 3470 fail: 3471 remove_from_waiters(lkb, msg_reply_type(mstype)); 3472 return error; 3473 } 3474 3475 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb) 3476 { 3477 return send_common(r, lkb, DLM_MSG_REQUEST); 3478 } 3479 3480 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) 3481 { 3482 int error; 3483 3484 error = send_common(r, lkb, DLM_MSG_CONVERT); 3485 3486 /* down conversions go without a reply from the master */ 3487 if (!error && down_conversion(lkb)) { 3488 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY); 3489 r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY); 3490 r->res_ls->ls_local_ms.m_result = 0; 3491 __receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true); 3492 } 3493 3494 return error; 3495 } 3496 3497 /* FIXME: if this lkb is the only lock we hold on the rsb, then set 3498 MASTER_UNCERTAIN to force the next request on the rsb to confirm 3499 that the master is still correct. */ 3500 3501 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3502 { 3503 return send_common(r, lkb, DLM_MSG_UNLOCK); 3504 } 3505 3506 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 3507 { 3508 return send_common(r, lkb, DLM_MSG_CANCEL); 3509 } 3510 3511 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb) 3512 { 3513 struct dlm_message *ms; 3514 struct dlm_mhandle *mh; 3515 int to_nodeid, error; 3516 3517 to_nodeid = lkb->lkb_nodeid; 3518 3519 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh, 3520 GFP_NOFS); 3521 if (error) 3522 goto out; 3523 3524 send_args(r, lkb, ms); 3525 3526 ms->m_result = 0; 3527 3528 error = send_message(mh, ms, r->res_name, r->res_length); 3529 out: 3530 return error; 3531 } 3532 3533 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode) 3534 { 3535 struct dlm_message *ms; 3536 struct dlm_mhandle *mh; 3537 int to_nodeid, error; 3538 3539 to_nodeid = lkb->lkb_nodeid; 3540 3541 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh, 3542 GFP_NOFS); 3543 if (error) 3544 goto out; 3545 3546 send_args(r, lkb, ms); 3547 3548 ms->m_bastmode = cpu_to_le32(mode); 3549 3550 error = send_message(mh, ms, r->res_name, r->res_length); 3551 out: 3552 return error; 3553 } 3554 3555 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb) 3556 { 3557 struct dlm_message *ms; 3558 struct dlm_mhandle *mh; 3559 int to_nodeid, error; 3560 3561 to_nodeid = dlm_dir_nodeid(r); 3562 3563 error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid); 3564 if (error) 3565 return error; 3566 3567 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh, 3568 GFP_NOFS); 3569 if (error) 3570 goto fail; 3571 3572 send_args(r, lkb, ms); 3573 3574 error = send_message(mh, ms, r->res_name, r->res_length); 3575 if (error) 3576 goto fail; 3577 return 0; 3578 3579 fail: 3580 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); 3581 return error; 3582 } 3583 3584 static int send_remove(struct dlm_rsb *r) 3585 { 3586 struct dlm_message *ms; 3587 struct dlm_mhandle *mh; 3588 int to_nodeid, error; 3589 3590 to_nodeid = dlm_dir_nodeid(r); 3591 3592 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh, 3593 GFP_ATOMIC); 3594 if (error) 3595 goto out; 3596 3597 memcpy(ms->m_extra, r->res_name, r->res_length); 3598 
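	/* receive_remove() on the dir node uses m_hash to verify that it
	   really is the directory node for this resource name */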
ms->m_hash = cpu_to_le32(r->res_hash); 3599 3600 error = send_message(mh, ms, r->res_name, r->res_length); 3601 out: 3602 return error; 3603 } 3604 3605 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 3606 int mstype, int rv) 3607 { 3608 struct dlm_message *ms; 3609 struct dlm_mhandle *mh; 3610 int to_nodeid, error; 3611 3612 to_nodeid = lkb->lkb_nodeid; 3613 3614 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS); 3615 if (error) 3616 goto out; 3617 3618 send_args(r, lkb, ms); 3619 3620 ms->m_result = cpu_to_le32(to_dlm_errno(rv)); 3621 3622 error = send_message(mh, ms, r->res_name, r->res_length); 3623 out: 3624 return error; 3625 } 3626 3627 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3628 { 3629 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv); 3630 } 3631 3632 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3633 { 3634 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv); 3635 } 3636 3637 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3638 { 3639 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv); 3640 } 3641 3642 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3643 { 3644 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv); 3645 } 3646 3647 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in, 3648 int ret_nodeid, int rv) 3649 { 3650 struct dlm_rsb *r = &ls->ls_local_rsb; 3651 struct dlm_message *ms; 3652 struct dlm_mhandle *mh; 3653 int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid); 3654 3655 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh, 3656 GFP_NOFS); 3657 if (error) 3658 goto out; 3659 3660 ms->m_lkid = ms_in->m_lkid; 3661 ms->m_result = cpu_to_le32(to_dlm_errno(rv)); 3662 ms->m_nodeid = cpu_to_le32(ret_nodeid); 3663 3664 error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in)); 3665 out: 3666 return error; 3667 } 3668 3669 /* which args we save from a received message depends heavily on the type 3670 of message, unlike the send side where we can safely send everything about 3671 the lkb for any type of message */ 3672 3673 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms) 3674 { 3675 lkb->lkb_exflags = le32_to_cpu(ms->m_exflags); 3676 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags)); 3677 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags)); 3678 } 3679 3680 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms, 3681 bool local) 3682 { 3683 if (local) 3684 return; 3685 3686 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags)); 3687 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags)); 3688 } 3689 3690 static int receive_extralen(struct dlm_message *ms) 3691 { 3692 return (le16_to_cpu(ms->m_header.h_length) - 3693 sizeof(struct dlm_message)); 3694 } 3695 3696 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, 3697 struct dlm_message *ms) 3698 { 3699 int len; 3700 3701 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 3702 if (!lkb->lkb_lvbptr) 3703 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 3704 if (!lkb->lkb_lvbptr) 3705 return -ENOMEM; 3706 len = receive_extralen(ms); 3707 if (len > ls->ls_lvblen) 3708 len = ls->ls_lvblen; 3709 memcpy(lkb->lkb_lvbptr, ms->m_extra, len); 3710 } 3711 return 0; 3712 } 3713 3714 static void fake_bastfn(void *astparam, int mode) 3715 { 3716 log_print("fake_bastfn should not be called"); 3717 } 3718 3719 static void fake_astfn(void *astparam) 3720 { 
3721 log_print("fake_astfn should not be called"); 3722 } 3723 3724 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3725 struct dlm_message *ms) 3726 { 3727 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 3728 lkb->lkb_ownpid = le32_to_cpu(ms->m_pid); 3729 lkb->lkb_remid = le32_to_cpu(ms->m_lkid); 3730 lkb->lkb_grmode = DLM_LOCK_IV; 3731 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode); 3732 3733 lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL; 3734 lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL; 3735 3736 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 3737 /* lkb was just created so there won't be an lvb yet */ 3738 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 3739 if (!lkb->lkb_lvbptr) 3740 return -ENOMEM; 3741 } 3742 3743 return 0; 3744 } 3745 3746 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3747 struct dlm_message *ms) 3748 { 3749 if (lkb->lkb_status != DLM_LKSTS_GRANTED) 3750 return -EBUSY; 3751 3752 if (receive_lvb(ls, lkb, ms)) 3753 return -ENOMEM; 3754 3755 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode); 3756 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq); 3757 3758 return 0; 3759 } 3760 3761 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3762 struct dlm_message *ms) 3763 { 3764 if (receive_lvb(ls, lkb, ms)) 3765 return -ENOMEM; 3766 return 0; 3767 } 3768 3769 /* We fill in the local-lkb fields with the info that send_xxxx_reply() 3770 uses to send a reply and that the remote end uses to process the reply. */ 3771 3772 static void setup_local_lkb(struct dlm_ls *ls, struct dlm_message *ms) 3773 { 3774 struct dlm_lkb *lkb = &ls->ls_local_lkb; 3775 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 3776 lkb->lkb_remid = le32_to_cpu(ms->m_lkid); 3777 } 3778 3779 /* This is called after the rsb is locked so that we can safely inspect 3780 fields in the lkb. 
*/ 3781 3782 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) 3783 { 3784 int from = le32_to_cpu(ms->m_header.h_nodeid); 3785 int error = 0; 3786 3787 /* currently mixing of user/kernel locks are not supported */ 3788 if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) && 3789 !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) { 3790 log_error(lkb->lkb_resource->res_ls, 3791 "got user dlm message for a kernel lock"); 3792 error = -EINVAL; 3793 goto out; 3794 } 3795 3796 switch (ms->m_type) { 3797 case cpu_to_le32(DLM_MSG_CONVERT): 3798 case cpu_to_le32(DLM_MSG_UNLOCK): 3799 case cpu_to_le32(DLM_MSG_CANCEL): 3800 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from) 3801 error = -EINVAL; 3802 break; 3803 3804 case cpu_to_le32(DLM_MSG_CONVERT_REPLY): 3805 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY): 3806 case cpu_to_le32(DLM_MSG_CANCEL_REPLY): 3807 case cpu_to_le32(DLM_MSG_GRANT): 3808 case cpu_to_le32(DLM_MSG_BAST): 3809 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from) 3810 error = -EINVAL; 3811 break; 3812 3813 case cpu_to_le32(DLM_MSG_REQUEST_REPLY): 3814 if (!is_process_copy(lkb)) 3815 error = -EINVAL; 3816 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from) 3817 error = -EINVAL; 3818 break; 3819 3820 default: 3821 error = -EINVAL; 3822 } 3823 3824 out: 3825 if (error) 3826 log_error(lkb->lkb_resource->res_ls, 3827 "ignore invalid message %d from %d %x %x %x %d", 3828 le32_to_cpu(ms->m_type), from, lkb->lkb_id, 3829 lkb->lkb_remid, dlm_iflags_val(lkb), 3830 lkb->lkb_nodeid); 3831 return error; 3832 } 3833 3834 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) 3835 { 3836 struct dlm_lkb *lkb; 3837 struct dlm_rsb *r; 3838 int from_nodeid; 3839 int error, namelen = 0; 3840 3841 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 3842 3843 error = create_lkb(ls, &lkb); 3844 if (error) 3845 goto fail; 3846 3847 receive_flags(lkb, ms); 3848 set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags); 3849 error = receive_request_args(ls, lkb, ms); 3850 if (error) { 3851 __put_lkb(ls, lkb); 3852 goto fail; 3853 } 3854 3855 /* The dir node is the authority on whether we are the master 3856 for this rsb or not, so if the master sends us a request, we should 3857 recreate the rsb if we've destroyed it. This race happens when we 3858 send a remove message to the dir node at the same time that the dir 3859 node sends us a request for the rsb. */ 3860 3861 namelen = receive_extralen(ms); 3862 3863 error = find_rsb(ls, ms->m_extra, namelen, from_nodeid, 3864 R_RECEIVE_REQUEST, &r); 3865 if (error) { 3866 __put_lkb(ls, lkb); 3867 goto fail; 3868 } 3869 3870 lock_rsb(r); 3871 3872 if (r->res_master_nodeid != dlm_our_nodeid()) { 3873 error = validate_master_nodeid(ls, r, from_nodeid); 3874 if (error) { 3875 unlock_rsb(r); 3876 put_rsb(r); 3877 __put_lkb(ls, lkb); 3878 goto fail; 3879 } 3880 } 3881 3882 attach_lkb(r, lkb); 3883 error = do_request(r, lkb); 3884 send_request_reply(r, lkb, error); 3885 do_request_effects(r, lkb, error); 3886 3887 unlock_rsb(r); 3888 put_rsb(r); 3889 3890 if (error == -EINPROGRESS) 3891 error = 0; 3892 if (error) 3893 dlm_put_lkb(lkb); 3894 return 0; 3895 3896 fail: 3897 /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup 3898 and do this receive_request again from process_lookup_list once 3899 we get the lookup reply. This would avoid a many repeated 3900 ENOTBLK request failures when the lookup reply designating us 3901 as master is delayed. 
*/ 3902 3903 if (error != -ENOTBLK) { 3904 log_limit(ls, "receive_request %x from %d %d", 3905 le32_to_cpu(ms->m_lkid), from_nodeid, error); 3906 } 3907 3908 setup_local_lkb(ls, ms); 3909 send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error); 3910 return error; 3911 } 3912 3913 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms) 3914 { 3915 struct dlm_lkb *lkb; 3916 struct dlm_rsb *r; 3917 int error, reply = 1; 3918 3919 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 3920 if (error) 3921 goto fail; 3922 3923 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) { 3924 log_error(ls, "receive_convert %x remid %x recover_seq %llu " 3925 "remote %d %x", lkb->lkb_id, lkb->lkb_remid, 3926 (unsigned long long)lkb->lkb_recover_seq, 3927 le32_to_cpu(ms->m_header.h_nodeid), 3928 le32_to_cpu(ms->m_lkid)); 3929 error = -ENOENT; 3930 dlm_put_lkb(lkb); 3931 goto fail; 3932 } 3933 3934 r = lkb->lkb_resource; 3935 3936 hold_rsb(r); 3937 lock_rsb(r); 3938 3939 error = validate_message(lkb, ms); 3940 if (error) 3941 goto out; 3942 3943 receive_flags(lkb, ms); 3944 3945 error = receive_convert_args(ls, lkb, ms); 3946 if (error) { 3947 send_convert_reply(r, lkb, error); 3948 goto out; 3949 } 3950 3951 reply = !down_conversion(lkb); 3952 3953 error = do_convert(r, lkb); 3954 if (reply) 3955 send_convert_reply(r, lkb, error); 3956 do_convert_effects(r, lkb, error); 3957 out: 3958 unlock_rsb(r); 3959 put_rsb(r); 3960 dlm_put_lkb(lkb); 3961 return 0; 3962 3963 fail: 3964 setup_local_lkb(ls, ms); 3965 send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error); 3966 return error; 3967 } 3968 3969 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) 3970 { 3971 struct dlm_lkb *lkb; 3972 struct dlm_rsb *r; 3973 int error; 3974 3975 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 3976 if (error) 3977 goto fail; 3978 3979 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) { 3980 log_error(ls, "receive_unlock %x remid %x remote %d %x", 3981 lkb->lkb_id, lkb->lkb_remid, 3982 le32_to_cpu(ms->m_header.h_nodeid), 3983 le32_to_cpu(ms->m_lkid)); 3984 error = -ENOENT; 3985 dlm_put_lkb(lkb); 3986 goto fail; 3987 } 3988 3989 r = lkb->lkb_resource; 3990 3991 hold_rsb(r); 3992 lock_rsb(r); 3993 3994 error = validate_message(lkb, ms); 3995 if (error) 3996 goto out; 3997 3998 receive_flags(lkb, ms); 3999 4000 error = receive_unlock_args(ls, lkb, ms); 4001 if (error) { 4002 send_unlock_reply(r, lkb, error); 4003 goto out; 4004 } 4005 4006 error = do_unlock(r, lkb); 4007 send_unlock_reply(r, lkb, error); 4008 do_unlock_effects(r, lkb, error); 4009 out: 4010 unlock_rsb(r); 4011 put_rsb(r); 4012 dlm_put_lkb(lkb); 4013 return 0; 4014 4015 fail: 4016 setup_local_lkb(ls, ms); 4017 send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error); 4018 return error; 4019 } 4020 4021 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) 4022 { 4023 struct dlm_lkb *lkb; 4024 struct dlm_rsb *r; 4025 int error; 4026 4027 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4028 if (error) 4029 goto fail; 4030 4031 receive_flags(lkb, ms); 4032 4033 r = lkb->lkb_resource; 4034 4035 hold_rsb(r); 4036 lock_rsb(r); 4037 4038 error = validate_message(lkb, ms); 4039 if (error) 4040 goto out; 4041 4042 error = do_cancel(r, lkb); 4043 send_cancel_reply(r, lkb, error); 4044 do_cancel_effects(r, lkb, error); 4045 out: 4046 unlock_rsb(r); 4047 put_rsb(r); 4048 dlm_put_lkb(lkb); 4049 return 0; 4050 4051 fail: 4052 setup_local_lkb(ls, ms); 4053 send_cancel_reply(&ls->ls_local_rsb, 
&ls->ls_local_lkb, error); 4054 return error; 4055 } 4056 4057 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms) 4058 { 4059 struct dlm_lkb *lkb; 4060 struct dlm_rsb *r; 4061 int error; 4062 4063 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4064 if (error) 4065 return error; 4066 4067 r = lkb->lkb_resource; 4068 4069 hold_rsb(r); 4070 lock_rsb(r); 4071 4072 error = validate_message(lkb, ms); 4073 if (error) 4074 goto out; 4075 4076 receive_flags_reply(lkb, ms, false); 4077 if (is_altmode(lkb)) 4078 munge_altmode(lkb, ms); 4079 grant_lock_pc(r, lkb, ms); 4080 queue_cast(r, lkb, 0); 4081 out: 4082 unlock_rsb(r); 4083 put_rsb(r); 4084 dlm_put_lkb(lkb); 4085 return 0; 4086 } 4087 4088 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) 4089 { 4090 struct dlm_lkb *lkb; 4091 struct dlm_rsb *r; 4092 int error; 4093 4094 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4095 if (error) 4096 return error; 4097 4098 r = lkb->lkb_resource; 4099 4100 hold_rsb(r); 4101 lock_rsb(r); 4102 4103 error = validate_message(lkb, ms); 4104 if (error) 4105 goto out; 4106 4107 queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode)); 4108 lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode); 4109 out: 4110 unlock_rsb(r); 4111 put_rsb(r); 4112 dlm_put_lkb(lkb); 4113 return 0; 4114 } 4115 4116 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) 4117 { 4118 int len, error, ret_nodeid, from_nodeid, our_nodeid; 4119 4120 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 4121 our_nodeid = dlm_our_nodeid(); 4122 4123 len = receive_extralen(ms); 4124 4125 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0, 4126 &ret_nodeid, NULL); 4127 4128 /* Optimization: we're master so treat lookup as a request */ 4129 if (!error && ret_nodeid == our_nodeid) { 4130 receive_request(ls, ms); 4131 return; 4132 } 4133 send_lookup_reply(ls, ms, ret_nodeid, error); 4134 } 4135 4136 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) 4137 { 4138 char name[DLM_RESNAME_MAXLEN+1]; 4139 struct dlm_rsb *r; 4140 uint32_t hash, b; 4141 int rv, len, dir_nodeid, from_nodeid; 4142 4143 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 4144 4145 len = receive_extralen(ms); 4146 4147 if (len > DLM_RESNAME_MAXLEN) { 4148 log_error(ls, "receive_remove from %d bad len %d", 4149 from_nodeid, len); 4150 return; 4151 } 4152 4153 dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash)); 4154 if (dir_nodeid != dlm_our_nodeid()) { 4155 log_error(ls, "receive_remove from %d bad nodeid %d", 4156 from_nodeid, dir_nodeid); 4157 return; 4158 } 4159 4160 /* Look for name on rsbtbl.toss, if it's there, kill it. 4161 If it's on rsbtbl.keep, it's being used, and we should ignore this 4162 message. This is an expected race between the dir node sending a 4163 request to the master node at the same time as the master node sends 4164 a remove to the dir node. The resolution to that race is for the 4165 dir node to ignore the remove message, and the master node to 4166 recreate the master rsb when it gets a request from the dir node for 4167 an rsb it doesn't have. 
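   Sketched as a timeline (a rough illustration of the interleaving, not an
   exact trace):

     master node                        dir node
     -----------                        --------
     send_remove(r)        ---->
                           <----        send_request(r)  (dir node wants a lock)
     receive_request():                 receive_remove():
       r was removed here, so             r is found on the keep list (still
       recreate it, still as master       in use), so the remove is ignored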
*/ 4168 4169 memset(name, 0, sizeof(name)); 4170 memcpy(name, ms->m_extra, len); 4171 4172 hash = jhash(name, len, 0); 4173 b = hash & (ls->ls_rsbtbl_size - 1); 4174 4175 spin_lock(&ls->ls_rsbtbl[b].lock); 4176 4177 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 4178 if (rv) { 4179 /* verify the rsb is on keep list per comment above */ 4180 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 4181 if (rv) { 4182 /* should not happen */ 4183 log_error(ls, "receive_remove from %d not found %s", 4184 from_nodeid, name); 4185 spin_unlock(&ls->ls_rsbtbl[b].lock); 4186 return; 4187 } 4188 if (r->res_master_nodeid != from_nodeid) { 4189 /* should not happen */ 4190 log_error(ls, "receive_remove keep from %d master %d", 4191 from_nodeid, r->res_master_nodeid); 4192 dlm_print_rsb(r); 4193 spin_unlock(&ls->ls_rsbtbl[b].lock); 4194 return; 4195 } 4196 4197 log_debug(ls, "receive_remove from %d master %d first %x %s", 4198 from_nodeid, r->res_master_nodeid, r->res_first_lkid, 4199 name); 4200 spin_unlock(&ls->ls_rsbtbl[b].lock); 4201 return; 4202 } 4203 4204 if (r->res_master_nodeid != from_nodeid) { 4205 log_error(ls, "receive_remove toss from %d master %d", 4206 from_nodeid, r->res_master_nodeid); 4207 dlm_print_rsb(r); 4208 spin_unlock(&ls->ls_rsbtbl[b].lock); 4209 return; 4210 } 4211 4212 if (kref_put(&r->res_ref, kill_rsb)) { 4213 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 4214 spin_unlock(&ls->ls_rsbtbl[b].lock); 4215 dlm_free_rsb(r); 4216 } else { 4217 log_error(ls, "receive_remove from %d rsb ref error", 4218 from_nodeid); 4219 dlm_print_rsb(r); 4220 spin_unlock(&ls->ls_rsbtbl[b].lock); 4221 } 4222 } 4223 4224 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) 4225 { 4226 do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid)); 4227 } 4228 4229 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) 4230 { 4231 struct dlm_lkb *lkb; 4232 struct dlm_rsb *r; 4233 int error, mstype, result; 4234 int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 4235 4236 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4237 if (error) 4238 return error; 4239 4240 r = lkb->lkb_resource; 4241 hold_rsb(r); 4242 lock_rsb(r); 4243 4244 error = validate_message(lkb, ms); 4245 if (error) 4246 goto out; 4247 4248 mstype = lkb->lkb_wait_type; 4249 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); 4250 if (error) { 4251 log_error(ls, "receive_request_reply %x remote %d %x result %d", 4252 lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid), 4253 from_dlm_errno(le32_to_cpu(ms->m_result))); 4254 dlm_dump_rsb(r); 4255 goto out; 4256 } 4257 4258 /* Optimization: the dir node was also the master, so it took our 4259 lookup as a request and sent request reply instead of lookup reply */ 4260 if (mstype == DLM_MSG_LOOKUP) { 4261 r->res_master_nodeid = from_nodeid; 4262 r->res_nodeid = from_nodeid; 4263 lkb->lkb_nodeid = from_nodeid; 4264 } 4265 4266 /* this is the value returned from do_request() on the master */ 4267 result = from_dlm_errno(le32_to_cpu(ms->m_result)); 4268 4269 switch (result) { 4270 case -EAGAIN: 4271 /* request would block (be queued) on remote master */ 4272 queue_cast(r, lkb, -EAGAIN); 4273 confirm_master(r, -EAGAIN); 4274 unhold_lkb(lkb); /* undoes create_lkb() */ 4275 break; 4276 4277 case -EINPROGRESS: 4278 case 0: 4279 /* request was queued or granted on remote master */ 4280 receive_flags_reply(lkb, ms, false); 4281 lkb->lkb_remid = le32_to_cpu(ms->m_lkid); 4282 if (is_altmode(lkb)) 4283 munge_altmode(lkb, 
ms); 4284 if (result) { 4285 add_lkb(r, lkb, DLM_LKSTS_WAITING); 4286 } else { 4287 grant_lock_pc(r, lkb, ms); 4288 queue_cast(r, lkb, 0); 4289 } 4290 confirm_master(r, result); 4291 break; 4292 4293 case -EBADR: 4294 case -ENOTBLK: 4295 /* find_rsb failed to find rsb or rsb wasn't master */ 4296 log_limit(ls, "receive_request_reply %x from %d %d " 4297 "master %d dir %d first %x %s", lkb->lkb_id, 4298 from_nodeid, result, r->res_master_nodeid, 4299 r->res_dir_nodeid, r->res_first_lkid, r->res_name); 4300 4301 if (r->res_dir_nodeid != dlm_our_nodeid() && 4302 r->res_master_nodeid != dlm_our_nodeid()) { 4303 /* cause _request_lock->set_master->send_lookup */ 4304 r->res_master_nodeid = 0; 4305 r->res_nodeid = -1; 4306 lkb->lkb_nodeid = -1; 4307 } 4308 4309 if (is_overlap(lkb)) { 4310 /* we'll ignore error in cancel/unlock reply */ 4311 queue_cast_overlap(r, lkb); 4312 confirm_master(r, result); 4313 unhold_lkb(lkb); /* undoes create_lkb() */ 4314 } else { 4315 _request_lock(r, lkb); 4316 4317 if (r->res_master_nodeid == dlm_our_nodeid()) 4318 confirm_master(r, 0); 4319 } 4320 break; 4321 4322 default: 4323 log_error(ls, "receive_request_reply %x error %d", 4324 lkb->lkb_id, result); 4325 } 4326 4327 if ((result == 0 || result == -EINPROGRESS) && 4328 test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) { 4329 log_debug(ls, "receive_request_reply %x result %d unlock", 4330 lkb->lkb_id, result); 4331 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); 4332 send_unlock(r, lkb); 4333 } else if ((result == -EINPROGRESS) && 4334 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, 4335 &lkb->lkb_iflags)) { 4336 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id); 4337 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); 4338 send_cancel(r, lkb); 4339 } else { 4340 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); 4341 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); 4342 } 4343 out: 4344 unlock_rsb(r); 4345 put_rsb(r); 4346 dlm_put_lkb(lkb); 4347 return 0; 4348 } 4349 4350 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 4351 struct dlm_message *ms, bool local) 4352 { 4353 /* this is the value returned from do_convert() on the master */ 4354 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { 4355 case -EAGAIN: 4356 /* convert would block (be queued) on remote master */ 4357 queue_cast(r, lkb, -EAGAIN); 4358 break; 4359 4360 case -EDEADLK: 4361 receive_flags_reply(lkb, ms, local); 4362 revert_lock_pc(r, lkb); 4363 queue_cast(r, lkb, -EDEADLK); 4364 break; 4365 4366 case -EINPROGRESS: 4367 /* convert was queued on remote master */ 4368 receive_flags_reply(lkb, ms, local); 4369 if (is_demoted(lkb)) 4370 munge_demoted(lkb); 4371 del_lkb(r, lkb); 4372 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 4373 break; 4374 4375 case 0: 4376 /* convert was granted on remote master */ 4377 receive_flags_reply(lkb, ms, local); 4378 if (is_demoted(lkb)) 4379 munge_demoted(lkb); 4380 grant_lock_pc(r, lkb, ms); 4381 queue_cast(r, lkb, 0); 4382 break; 4383 4384 default: 4385 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d", 4386 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid), 4387 le32_to_cpu(ms->m_lkid), 4388 from_dlm_errno(le32_to_cpu(ms->m_result))); 4389 dlm_print_rsb(r); 4390 dlm_print_lkb(lkb); 4391 } 4392 } 4393 4394 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms, 4395 bool local) 4396 { 4397 struct dlm_rsb *r = lkb->lkb_resource; 4398 int error; 4399 4400 hold_rsb(r); 4401 lock_rsb(r); 4402 4403 error = 
validate_message(lkb, ms); 4404 if (error) 4405 goto out; 4406 4407 /* local reply can happen with waiters_mutex held */ 4408 error = remove_from_waiters_ms(lkb, ms, local); 4409 if (error) 4410 goto out; 4411 4412 __receive_convert_reply(r, lkb, ms, local); 4413 out: 4414 unlock_rsb(r); 4415 put_rsb(r); 4416 } 4417 4418 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) 4419 { 4420 struct dlm_lkb *lkb; 4421 int error; 4422 4423 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4424 if (error) 4425 return error; 4426 4427 _receive_convert_reply(lkb, ms, false); 4428 dlm_put_lkb(lkb); 4429 return 0; 4430 } 4431 4432 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms, 4433 bool local) 4434 { 4435 struct dlm_rsb *r = lkb->lkb_resource; 4436 int error; 4437 4438 hold_rsb(r); 4439 lock_rsb(r); 4440 4441 error = validate_message(lkb, ms); 4442 if (error) 4443 goto out; 4444 4445 /* local reply can happen with waiters_mutex held */ 4446 error = remove_from_waiters_ms(lkb, ms, local); 4447 if (error) 4448 goto out; 4449 4450 /* this is the value returned from do_unlock() on the master */ 4451 4452 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { 4453 case -DLM_EUNLOCK: 4454 receive_flags_reply(lkb, ms, local); 4455 remove_lock_pc(r, lkb); 4456 queue_cast(r, lkb, -DLM_EUNLOCK); 4457 break; 4458 case -ENOENT: 4459 break; 4460 default: 4461 log_error(r->res_ls, "receive_unlock_reply %x error %d", 4462 lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result))); 4463 } 4464 out: 4465 unlock_rsb(r); 4466 put_rsb(r); 4467 } 4468 4469 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) 4470 { 4471 struct dlm_lkb *lkb; 4472 int error; 4473 4474 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4475 if (error) 4476 return error; 4477 4478 _receive_unlock_reply(lkb, ms, false); 4479 dlm_put_lkb(lkb); 4480 return 0; 4481 } 4482 4483 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms, 4484 bool local) 4485 { 4486 struct dlm_rsb *r = lkb->lkb_resource; 4487 int error; 4488 4489 hold_rsb(r); 4490 lock_rsb(r); 4491 4492 error = validate_message(lkb, ms); 4493 if (error) 4494 goto out; 4495 4496 /* local reply can happen with waiters_mutex held */ 4497 error = remove_from_waiters_ms(lkb, ms, local); 4498 if (error) 4499 goto out; 4500 4501 /* this is the value returned from do_cancel() on the master */ 4502 4503 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { 4504 case -DLM_ECANCEL: 4505 receive_flags_reply(lkb, ms, local); 4506 revert_lock_pc(r, lkb); 4507 queue_cast(r, lkb, -DLM_ECANCEL); 4508 break; 4509 case 0: 4510 break; 4511 default: 4512 log_error(r->res_ls, "receive_cancel_reply %x error %d", 4513 lkb->lkb_id, 4514 from_dlm_errno(le32_to_cpu(ms->m_result))); 4515 } 4516 out: 4517 unlock_rsb(r); 4518 put_rsb(r); 4519 } 4520 4521 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) 4522 { 4523 struct dlm_lkb *lkb; 4524 int error; 4525 4526 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4527 if (error) 4528 return error; 4529 4530 _receive_cancel_reply(lkb, ms, false); 4531 dlm_put_lkb(lkb); 4532 return 0; 4533 } 4534 4535 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) 4536 { 4537 struct dlm_lkb *lkb; 4538 struct dlm_rsb *r; 4539 int error, ret_nodeid; 4540 int do_lookup_list = 0; 4541 4542 error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb); 4543 if (error) { 4544 log_error(ls, "%s no lkid %x", __func__, 4545 le32_to_cpu(ms->m_lkid)); 
4546 return; 4547 } 4548 4549 /* ms->m_result is the value returned by dlm_master_lookup on dir node 4550 FIXME: will a non-zero error ever be returned? */ 4551 4552 r = lkb->lkb_resource; 4553 hold_rsb(r); 4554 lock_rsb(r); 4555 4556 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); 4557 if (error) 4558 goto out; 4559 4560 ret_nodeid = le32_to_cpu(ms->m_nodeid); 4561 4562 /* We sometimes receive a request from the dir node for this 4563 rsb before we've received the dir node's loookup_reply for it. 4564 The request from the dir node implies we're the master, so we set 4565 ourself as master in receive_request_reply, and verify here that 4566 we are indeed the master. */ 4567 4568 if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) { 4569 /* This should never happen */ 4570 log_error(ls, "receive_lookup_reply %x from %d ret %d " 4571 "master %d dir %d our %d first %x %s", 4572 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid), 4573 ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid, 4574 dlm_our_nodeid(), r->res_first_lkid, r->res_name); 4575 } 4576 4577 if (ret_nodeid == dlm_our_nodeid()) { 4578 r->res_master_nodeid = ret_nodeid; 4579 r->res_nodeid = 0; 4580 do_lookup_list = 1; 4581 r->res_first_lkid = 0; 4582 } else if (ret_nodeid == -1) { 4583 /* the remote node doesn't believe it's the dir node */ 4584 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid", 4585 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid)); 4586 r->res_master_nodeid = 0; 4587 r->res_nodeid = -1; 4588 lkb->lkb_nodeid = -1; 4589 } else { 4590 /* set_master() will set lkb_nodeid from r */ 4591 r->res_master_nodeid = ret_nodeid; 4592 r->res_nodeid = ret_nodeid; 4593 } 4594 4595 if (is_overlap(lkb)) { 4596 log_debug(ls, "receive_lookup_reply %x unlock %x", 4597 lkb->lkb_id, dlm_iflags_val(lkb)); 4598 queue_cast_overlap(r, lkb); 4599 unhold_lkb(lkb); /* undoes create_lkb() */ 4600 goto out_list; 4601 } 4602 4603 _request_lock(r, lkb); 4604 4605 out_list: 4606 if (do_lookup_list) 4607 process_lookup_list(r); 4608 out: 4609 unlock_rsb(r); 4610 put_rsb(r); 4611 dlm_put_lkb(lkb); 4612 } 4613 4614 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, 4615 uint32_t saved_seq) 4616 { 4617 int error = 0, noent = 0; 4618 4619 if (!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid))) { 4620 log_limit(ls, "receive %d from non-member %d %x %x %d", 4621 le32_to_cpu(ms->m_type), 4622 le32_to_cpu(ms->m_header.h_nodeid), 4623 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid), 4624 from_dlm_errno(le32_to_cpu(ms->m_result))); 4625 return; 4626 } 4627 4628 switch (ms->m_type) { 4629 4630 /* messages sent to a master node */ 4631 4632 case cpu_to_le32(DLM_MSG_REQUEST): 4633 error = receive_request(ls, ms); 4634 break; 4635 4636 case cpu_to_le32(DLM_MSG_CONVERT): 4637 error = receive_convert(ls, ms); 4638 break; 4639 4640 case cpu_to_le32(DLM_MSG_UNLOCK): 4641 error = receive_unlock(ls, ms); 4642 break; 4643 4644 case cpu_to_le32(DLM_MSG_CANCEL): 4645 noent = 1; 4646 error = receive_cancel(ls, ms); 4647 break; 4648 4649 /* messages sent from a master node (replies to above) */ 4650 4651 case cpu_to_le32(DLM_MSG_REQUEST_REPLY): 4652 error = receive_request_reply(ls, ms); 4653 break; 4654 4655 case cpu_to_le32(DLM_MSG_CONVERT_REPLY): 4656 error = receive_convert_reply(ls, ms); 4657 break; 4658 4659 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY): 4660 error = receive_unlock_reply(ls, ms); 4661 break; 4662 4663 case cpu_to_le32(DLM_MSG_CANCEL_REPLY): 4664 error = receive_cancel_reply(ls, ms); 4665 break; 
4666 4667 /* messages sent from a master node (only two types of async msg) */ 4668 4669 case cpu_to_le32(DLM_MSG_GRANT): 4670 noent = 1; 4671 error = receive_grant(ls, ms); 4672 break; 4673 4674 case cpu_to_le32(DLM_MSG_BAST): 4675 noent = 1; 4676 error = receive_bast(ls, ms); 4677 break; 4678 4679 /* messages sent to a dir node */ 4680 4681 case cpu_to_le32(DLM_MSG_LOOKUP): 4682 receive_lookup(ls, ms); 4683 break; 4684 4685 case cpu_to_le32(DLM_MSG_REMOVE): 4686 receive_remove(ls, ms); 4687 break; 4688 4689 /* messages sent from a dir node (remove has no reply) */ 4690 4691 case cpu_to_le32(DLM_MSG_LOOKUP_REPLY): 4692 receive_lookup_reply(ls, ms); 4693 break; 4694 4695 /* other messages */ 4696 4697 case cpu_to_le32(DLM_MSG_PURGE): 4698 receive_purge(ls, ms); 4699 break; 4700 4701 default: 4702 log_error(ls, "unknown message type %d", 4703 le32_to_cpu(ms->m_type)); 4704 } 4705 4706 /* 4707 * When checking for ENOENT, we're checking the result of 4708 * find_lkb(m_remid): 4709 * 4710 * The lock id referenced in the message wasn't found. This may 4711 * happen in normal usage for the async messages and cancel, so 4712 * only use log_debug for them. 4713 * 4714 * Some errors are expected and normal. 4715 */ 4716 4717 if (error == -ENOENT && noent) { 4718 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u", 4719 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid), 4720 le32_to_cpu(ms->m_header.h_nodeid), 4721 le32_to_cpu(ms->m_lkid), saved_seq); 4722 } else if (error == -ENOENT) { 4723 log_error(ls, "receive %d no %x remote %d %x saved_seq %u", 4724 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid), 4725 le32_to_cpu(ms->m_header.h_nodeid), 4726 le32_to_cpu(ms->m_lkid), saved_seq); 4727 4728 if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT)) 4729 dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash)); 4730 } 4731 4732 if (error == -EINVAL) { 4733 log_error(ls, "receive %d inval from %d lkid %x remid %x " 4734 "saved_seq %u", 4735 le32_to_cpu(ms->m_type), 4736 le32_to_cpu(ms->m_header.h_nodeid), 4737 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid), 4738 saved_seq); 4739 } 4740 } 4741 4742 /* If the lockspace is in recovery mode (locking stopped), then normal 4743 messages are saved on the requestqueue for processing after recovery is 4744 done. When not in recovery mode, we wait for dlm_recoverd to drain saved 4745 messages off the requestqueue before we process new ones. This occurs right 4746 after recovery completes when we transition from saving all messages on 4747 requestqueue, to processing all the saved messages, to processing new 4748 messages as they arrive. */ 4749 4750 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, 4751 int nodeid) 4752 { 4753 if (dlm_locking_stopped(ls)) { 4754 /* If we were a member of this lockspace, left, and rejoined, 4755 other nodes may still be sending us messages from the 4756 lockspace generation before we left. */ 4757 if (!ls->ls_generation) { 4758 log_limit(ls, "receive %d from %d ignore old gen", 4759 le32_to_cpu(ms->m_type), nodeid); 4760 return; 4761 } 4762 4763 dlm_add_requestqueue(ls, nodeid, ms); 4764 } else { 4765 dlm_wait_requestqueue(ls); 4766 _receive_message(ls, ms, 0); 4767 } 4768 } 4769 4770 /* This is called by dlm_recoverd to process messages that were saved on 4771 the requestqueue. 
*/ 4772 4773 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, 4774 uint32_t saved_seq) 4775 { 4776 _receive_message(ls, ms, saved_seq); 4777 } 4778 4779 /* This is called by the midcomms layer when something is received for 4780 the lockspace. It could be either a MSG (normal message sent as part of 4781 standard locking activity) or an RCOM (recovery message sent as part of 4782 lockspace recovery). */ 4783 4784 void dlm_receive_buffer(union dlm_packet *p, int nodeid) 4785 { 4786 struct dlm_header *hd = &p->header; 4787 struct dlm_ls *ls; 4788 int type = 0; 4789 4790 switch (hd->h_cmd) { 4791 case DLM_MSG: 4792 type = le32_to_cpu(p->message.m_type); 4793 break; 4794 case DLM_RCOM: 4795 type = le32_to_cpu(p->rcom.rc_type); 4796 break; 4797 default: 4798 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid); 4799 return; 4800 } 4801 4802 if (le32_to_cpu(hd->h_nodeid) != nodeid) { 4803 log_print("invalid h_nodeid %d from %d lockspace %x", 4804 le32_to_cpu(hd->h_nodeid), nodeid, 4805 le32_to_cpu(hd->u.h_lockspace)); 4806 return; 4807 } 4808 4809 ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace)); 4810 if (!ls) { 4811 if (dlm_config.ci_log_debug) { 4812 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace " 4813 "%u from %d cmd %d type %d\n", 4814 le32_to_cpu(hd->u.h_lockspace), nodeid, 4815 hd->h_cmd, type); 4816 } 4817 4818 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) 4819 dlm_send_ls_not_ready(nodeid, &p->rcom); 4820 return; 4821 } 4822 4823 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to 4824 be inactive (in this ls) before transitioning to recovery mode */ 4825 4826 down_read(&ls->ls_recv_active); 4827 if (hd->h_cmd == DLM_MSG) 4828 dlm_receive_message(ls, &p->message, nodeid); 4829 else if (hd->h_cmd == DLM_RCOM) 4830 dlm_receive_rcom(ls, &p->rcom, nodeid); 4831 else 4832 log_error(ls, "invalid h_cmd %d from %d lockspace %x", 4833 hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace)); 4834 up_read(&ls->ls_recv_active); 4835 4836 dlm_put_lockspace(ls); 4837 } 4838 4839 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb, 4840 struct dlm_message *ms_local) 4841 { 4842 if (middle_conversion(lkb)) { 4843 hold_lkb(lkb); 4844 memset(ms_local, 0, sizeof(struct dlm_message)); 4845 ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY); 4846 ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS)); 4847 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); 4848 _receive_convert_reply(lkb, ms_local, true); 4849 4850 /* Same special case as in receive_rcom_lock_args() */ 4851 lkb->lkb_grmode = DLM_LOCK_IV; 4852 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT); 4853 unhold_lkb(lkb); 4854 4855 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) { 4856 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); 4857 } 4858 4859 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down 4860 conversions are async; there's no reply from the remote master */ 4861 } 4862 4863 /* A waiting lkb needs recovery if the master node has failed, or 4864 the master node is changing (only when no directory is used) */ 4865 4866 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb, 4867 int dir_nodeid) 4868 { 4869 if (dlm_no_directory(ls)) 4870 return 1; 4871 4872 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid)) 4873 return 1; 4874 4875 return 0; 4876 } 4877 4878 /* Recovery for locks that are waiting for replies from nodes that are now 4879 gone. 
We can just complete unlocks and cancels by faking a reply from the 4880 dead node. Requests and up-conversions we flag to be resent after 4881 recovery. Down-conversions can just be completed with a fake reply like 4882 unlocks. Conversions between PR and CW need special attention. */ 4883 4884 void dlm_recover_waiters_pre(struct dlm_ls *ls) 4885 { 4886 struct dlm_lkb *lkb, *safe; 4887 struct dlm_message *ms_local; 4888 int wait_type, local_unlock_result, local_cancel_result; 4889 int dir_nodeid; 4890 4891 ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL); 4892 if (!ms_local) 4893 return; 4894 4895 mutex_lock(&ls->ls_waiters_mutex); 4896 4897 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) { 4898 4899 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource); 4900 4901 /* exclude debug messages about unlocks because there can be so 4902 many and they aren't very interesting */ 4903 4904 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) { 4905 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d " 4906 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d", 4907 lkb->lkb_id, 4908 lkb->lkb_remid, 4909 lkb->lkb_wait_type, 4910 lkb->lkb_resource->res_nodeid, 4911 lkb->lkb_nodeid, 4912 lkb->lkb_wait_nodeid, 4913 dir_nodeid); 4914 } 4915 4916 /* all outstanding lookups, regardless of destination will be 4917 resent after recovery is done */ 4918 4919 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) { 4920 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); 4921 continue; 4922 } 4923 4924 if (!waiter_needs_recovery(ls, lkb, dir_nodeid)) 4925 continue; 4926 4927 wait_type = lkb->lkb_wait_type; 4928 local_unlock_result = -DLM_EUNLOCK; 4929 local_cancel_result = -DLM_ECANCEL; 4930 4931 /* Main reply may have been received leaving a zero wait_type, 4932 but a reply for the overlapping op may not have been 4933 received. In that case we need to fake the appropriate 4934 reply for the overlap op. 
*/ 4935 4936 if (!wait_type) { 4937 if (is_overlap_cancel(lkb)) { 4938 wait_type = DLM_MSG_CANCEL; 4939 if (lkb->lkb_grmode == DLM_LOCK_IV) 4940 local_cancel_result = 0; 4941 } 4942 if (is_overlap_unlock(lkb)) { 4943 wait_type = DLM_MSG_UNLOCK; 4944 if (lkb->lkb_grmode == DLM_LOCK_IV) 4945 local_unlock_result = -ENOENT; 4946 } 4947 4948 log_debug(ls, "rwpre overlap %x %x %d %d %d", 4949 lkb->lkb_id, dlm_iflags_val(lkb), wait_type, 4950 local_cancel_result, local_unlock_result); 4951 } 4952 4953 switch (wait_type) { 4954 4955 case DLM_MSG_REQUEST: 4956 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); 4957 break; 4958 4959 case DLM_MSG_CONVERT: 4960 recover_convert_waiter(ls, lkb, ms_local); 4961 break; 4962 4963 case DLM_MSG_UNLOCK: 4964 hold_lkb(lkb); 4965 memset(ms_local, 0, sizeof(struct dlm_message)); 4966 ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY); 4967 ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result)); 4968 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); 4969 _receive_unlock_reply(lkb, ms_local, true); 4970 dlm_put_lkb(lkb); 4971 break; 4972 4973 case DLM_MSG_CANCEL: 4974 hold_lkb(lkb); 4975 memset(ms_local, 0, sizeof(struct dlm_message)); 4976 ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY); 4977 ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result)); 4978 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); 4979 _receive_cancel_reply(lkb, ms_local, true); 4980 dlm_put_lkb(lkb); 4981 break; 4982 4983 default: 4984 log_error(ls, "invalid lkb wait_type %d %d", 4985 lkb->lkb_wait_type, wait_type); 4986 } 4987 schedule(); 4988 } 4989 mutex_unlock(&ls->ls_waiters_mutex); 4990 kfree(ms_local); 4991 } 4992 4993 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) 4994 { 4995 struct dlm_lkb *lkb = NULL, *iter; 4996 4997 mutex_lock(&ls->ls_waiters_mutex); 4998 list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) { 4999 if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) { 5000 hold_lkb(iter); 5001 lkb = iter; 5002 break; 5003 } 5004 } 5005 mutex_unlock(&ls->ls_waiters_mutex); 5006 5007 return lkb; 5008 } 5009 5010 /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the 5011 master or dir-node for r. Processing the lkb may result in it being placed 5012 back on waiters. */ 5013 5014 /* We do this after normal locking has been enabled and any saved messages 5015 (in requestqueue) have been processed. We should be confident that at 5016 this point we won't get or process a reply to any of these waiting 5017 operations. But, new ops may be coming in on the rsbs/locks here from 5018 userspace or remotely. */ 5019 5020 /* there may have been an overlap unlock/cancel prior to recovery or after 5021 recovery. if before, the lkb may still have a pos wait_count; if after, the 5022 overlap flag would just have been set and nothing new sent. we can be 5023 confident here than any replies to either the initial op or overlap ops 5024 prior to recovery have been received. 
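   In outline (matching the code below): for each lkb flagged RESEND we clear
   its wait state and take it off the waiters list, then either complete a
   recorded overlap unlock/cancel locally (queue_cast of -DLM_EUNLOCK or
   -DLM_ECANCEL, or a forced unlock for an overlapped convert), or simply
   redo the original operation with _request_lock()/_convert_lock().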
*/ 5025 5026 int dlm_recover_waiters_post(struct dlm_ls *ls) 5027 { 5028 struct dlm_lkb *lkb; 5029 struct dlm_rsb *r; 5030 int error = 0, mstype, err, oc, ou; 5031 5032 while (1) { 5033 if (dlm_locking_stopped(ls)) { 5034 log_debug(ls, "recover_waiters_post aborted"); 5035 error = -EINTR; 5036 break; 5037 } 5038 5039 lkb = find_resend_waiter(ls); 5040 if (!lkb) 5041 break; 5042 5043 r = lkb->lkb_resource; 5044 hold_rsb(r); 5045 lock_rsb(r); 5046 5047 mstype = lkb->lkb_wait_type; 5048 oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, 5049 &lkb->lkb_iflags); 5050 ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, 5051 &lkb->lkb_iflags); 5052 err = 0; 5053 5054 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d " 5055 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d " 5056 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype, 5057 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid, 5058 dlm_dir_nodeid(r), oc, ou); 5059 5060 /* At this point we assume that we won't get a reply to any 5061 previous op or overlap op on this lock. First, do a big 5062 remove_from_waiters() for all previous ops. */ 5063 5064 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); 5065 lkb->lkb_wait_type = 0; 5066 /* drop all wait_count references we still 5067 * hold a reference for this iteration. 5068 */ 5069 while (lkb->lkb_wait_count) { 5070 lkb->lkb_wait_count--; 5071 unhold_lkb(lkb); 5072 } 5073 mutex_lock(&ls->ls_waiters_mutex); 5074 list_del_init(&lkb->lkb_wait_reply); 5075 mutex_unlock(&ls->ls_waiters_mutex); 5076 5077 if (oc || ou) { 5078 /* do an unlock or cancel instead of resending */ 5079 switch (mstype) { 5080 case DLM_MSG_LOOKUP: 5081 case DLM_MSG_REQUEST: 5082 queue_cast(r, lkb, ou ? -DLM_EUNLOCK : 5083 -DLM_ECANCEL); 5084 unhold_lkb(lkb); /* undoes create_lkb() */ 5085 break; 5086 case DLM_MSG_CONVERT: 5087 if (oc) { 5088 queue_cast(r, lkb, -DLM_ECANCEL); 5089 } else { 5090 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK; 5091 _unlock_lock(r, lkb); 5092 } 5093 break; 5094 default: 5095 err = 1; 5096 } 5097 } else { 5098 switch (mstype) { 5099 case DLM_MSG_LOOKUP: 5100 case DLM_MSG_REQUEST: 5101 _request_lock(r, lkb); 5102 if (is_master(r)) 5103 confirm_master(r, 0); 5104 break; 5105 case DLM_MSG_CONVERT: 5106 _convert_lock(r, lkb); 5107 break; 5108 default: 5109 err = 1; 5110 } 5111 } 5112 5113 if (err) { 5114 log_error(ls, "waiter %x msg %d r_nodeid %d " 5115 "dir_nodeid %d overlap %d %d", 5116 lkb->lkb_id, mstype, r->res_nodeid, 5117 dlm_dir_nodeid(r), oc, ou); 5118 } 5119 unlock_rsb(r); 5120 put_rsb(r); 5121 dlm_put_lkb(lkb); 5122 } 5123 5124 return error; 5125 } 5126 5127 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r, 5128 struct list_head *list) 5129 { 5130 struct dlm_lkb *lkb, *safe; 5131 5132 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) { 5133 if (!is_master_copy(lkb)) 5134 continue; 5135 5136 /* don't purge lkbs we've added in recover_master_copy for 5137 the current recovery seq */ 5138 5139 if (lkb->lkb_recover_seq == ls->ls_recover_seq) 5140 continue; 5141 5142 del_lkb(r, lkb); 5143 5144 /* this put should free the lkb */ 5145 if (!dlm_put_lkb(lkb)) 5146 log_error(ls, "purged mstcpy lkb not released"); 5147 } 5148 } 5149 5150 void dlm_purge_mstcpy_locks(struct dlm_rsb *r) 5151 { 5152 struct dlm_ls *ls = r->res_ls; 5153 5154 purge_mstcpy_list(ls, r, &r->res_grantqueue); 5155 purge_mstcpy_list(ls, r, &r->res_convertqueue); 5156 purge_mstcpy_list(ls, r, &r->res_waitqueue); 5157 } 5158 5159 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r, 5160 struct 
list_head *list, 5161 int nodeid_gone, unsigned int *count) 5162 { 5163 struct dlm_lkb *lkb, *safe; 5164 5165 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) { 5166 if (!is_master_copy(lkb)) 5167 continue; 5168 5169 if ((lkb->lkb_nodeid == nodeid_gone) || 5170 dlm_is_removed(ls, lkb->lkb_nodeid)) { 5171 5172 /* tell recover_lvb to invalidate the lvb 5173 because a node holding EX/PW failed */ 5174 if ((lkb->lkb_exflags & DLM_LKF_VALBLK) && 5175 (lkb->lkb_grmode >= DLM_LOCK_PW)) { 5176 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL); 5177 } 5178 5179 del_lkb(r, lkb); 5180 5181 /* this put should free the lkb */ 5182 if (!dlm_put_lkb(lkb)) 5183 log_error(ls, "purged dead lkb not released"); 5184 5185 rsb_set_flag(r, RSB_RECOVER_GRANT); 5186 5187 (*count)++; 5188 } 5189 } 5190 } 5191 5192 /* Get rid of locks held by nodes that are gone. */ 5193 5194 void dlm_recover_purge(struct dlm_ls *ls) 5195 { 5196 struct dlm_rsb *r; 5197 struct dlm_member *memb; 5198 int nodes_count = 0; 5199 int nodeid_gone = 0; 5200 unsigned int lkb_count = 0; 5201 5202 /* cache one removed nodeid to optimize the common 5203 case of a single node removed */ 5204 5205 list_for_each_entry(memb, &ls->ls_nodes_gone, list) { 5206 nodes_count++; 5207 nodeid_gone = memb->nodeid; 5208 } 5209 5210 if (!nodes_count) 5211 return; 5212 5213 down_write(&ls->ls_root_sem); 5214 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 5215 hold_rsb(r); 5216 lock_rsb(r); 5217 if (is_master(r)) { 5218 purge_dead_list(ls, r, &r->res_grantqueue, 5219 nodeid_gone, &lkb_count); 5220 purge_dead_list(ls, r, &r->res_convertqueue, 5221 nodeid_gone, &lkb_count); 5222 purge_dead_list(ls, r, &r->res_waitqueue, 5223 nodeid_gone, &lkb_count); 5224 } 5225 unlock_rsb(r); 5226 unhold_rsb(r); 5227 cond_resched(); 5228 } 5229 up_write(&ls->ls_root_sem); 5230 5231 if (lkb_count) 5232 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes", 5233 lkb_count, nodes_count); 5234 } 5235 5236 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket) 5237 { 5238 struct rb_node *n; 5239 struct dlm_rsb *r; 5240 5241 spin_lock(&ls->ls_rsbtbl[bucket].lock); 5242 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) { 5243 r = rb_entry(n, struct dlm_rsb, res_hashnode); 5244 5245 if (!rsb_flag(r, RSB_RECOVER_GRANT)) 5246 continue; 5247 if (!is_master(r)) { 5248 rsb_clear_flag(r, RSB_RECOVER_GRANT); 5249 continue; 5250 } 5251 hold_rsb(r); 5252 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 5253 return r; 5254 } 5255 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 5256 return NULL; 5257 } 5258 5259 /* 5260 * Attempt to grant locks on resources that we are the master of. 5261 * Locks may have become grantable during recovery because locks 5262 * from departed nodes have been purged (or not rebuilt), allowing 5263 * previously blocked locks to now be granted. The subset of rsb's 5264 * we are interested in are those with lkb's on either the convert or 5265 * waiting queues. 5266 * 5267 * Simplest would be to go through each master rsb and check for non-empty 5268 * convert or waiting queues, and attempt to grant on those rsbs. 5269 * Checking the queues requires lock_rsb, though, for which we'd need 5270 * to release the rsbtbl lock. This would make iterating through all 5271 * rsb's very inefficient. So, we rely on earlier recovery routines 5272 * to set RECOVER_GRANT on any rsb's that we should attempt to grant 5273 * locks for. 
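   Concretely (per the loop below): find_grant_rsb() walks each hash bucket's
   keep tree for rsbs still flagged RECOVER_GRANT that we master, and for
   each one dlm_recover_grant() calls grant_pending_locks() under lock_rsb(),
   clears the flag and calls confirm_master().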
5274 */ 5275 5276 void dlm_recover_grant(struct dlm_ls *ls) 5277 { 5278 struct dlm_rsb *r; 5279 int bucket = 0; 5280 unsigned int count = 0; 5281 unsigned int rsb_count = 0; 5282 unsigned int lkb_count = 0; 5283 5284 while (1) { 5285 r = find_grant_rsb(ls, bucket); 5286 if (!r) { 5287 if (bucket == ls->ls_rsbtbl_size - 1) 5288 break; 5289 bucket++; 5290 continue; 5291 } 5292 rsb_count++; 5293 count = 0; 5294 lock_rsb(r); 5295 /* the RECOVER_GRANT flag is checked in the grant path */ 5296 grant_pending_locks(r, &count); 5297 rsb_clear_flag(r, RSB_RECOVER_GRANT); 5298 lkb_count += count; 5299 confirm_master(r, 0); 5300 unlock_rsb(r); 5301 put_rsb(r); 5302 cond_resched(); 5303 } 5304 5305 if (lkb_count) 5306 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources", 5307 lkb_count, rsb_count); 5308 } 5309 5310 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid, 5311 uint32_t remid) 5312 { 5313 struct dlm_lkb *lkb; 5314 5315 list_for_each_entry(lkb, head, lkb_statequeue) { 5316 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid) 5317 return lkb; 5318 } 5319 return NULL; 5320 } 5321 5322 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid, 5323 uint32_t remid) 5324 { 5325 struct dlm_lkb *lkb; 5326 5327 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid); 5328 if (lkb) 5329 return lkb; 5330 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid); 5331 if (lkb) 5332 return lkb; 5333 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid); 5334 if (lkb) 5335 return lkb; 5336 return NULL; 5337 } 5338 5339 /* needs at least dlm_rcom + rcom_lock */ 5340 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 5341 struct dlm_rsb *r, struct dlm_rcom *rc) 5342 { 5343 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 5344 5345 lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid); 5346 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid); 5347 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid); 5348 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags); 5349 dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags)); 5350 set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags); 5351 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq); 5352 lkb->lkb_rqmode = rl->rl_rqmode; 5353 lkb->lkb_grmode = rl->rl_grmode; 5354 /* don't set lkb_status because add_lkb wants to itself */ 5355 5356 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL; 5357 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL; 5358 5359 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 5360 int lvblen = le16_to_cpu(rc->rc_header.h_length) - 5361 sizeof(struct dlm_rcom) - sizeof(struct rcom_lock); 5362 if (lvblen > ls->ls_lvblen) 5363 return -EINVAL; 5364 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 5365 if (!lkb->lkb_lvbptr) 5366 return -ENOMEM; 5367 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen); 5368 } 5369 5370 /* Conversions between PR and CW (middle modes) need special handling. 5371 The real granted mode of these converting locks cannot be determined 5372 until all locks have been rebuilt on the rsb (recover_conversion) */ 5373 5374 if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) && 5375 middle_conversion(lkb)) { 5376 rl->rl_status = DLM_LKSTS_CONVERT; 5377 lkb->lkb_grmode = DLM_LOCK_IV; 5378 rsb_set_flag(r, RSB_RECOVER_CONVERT); 5379 } 5380 5381 return 0; 5382 } 5383 5384 /* This lkb may have been recovered in a previous aborted recovery so we need 5385 to check if the rsb already has an lkb with the given remote nodeid/lkid. 5386 If so we just send back a standard reply. 
If not, we create a new lkb with 5387 the given values and send back our lkid. We send back our lkid by sending 5388 back the rcom_lock struct we got but with the remid field filled in. */ 5389 5390 /* needs at least dlm_rcom + rcom_lock */ 5391 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) 5392 { 5393 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 5394 struct dlm_rsb *r; 5395 struct dlm_lkb *lkb; 5396 uint32_t remid = 0; 5397 int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid); 5398 int error; 5399 5400 if (rl->rl_parent_lkid) { 5401 error = -EOPNOTSUPP; 5402 goto out; 5403 } 5404 5405 remid = le32_to_cpu(rl->rl_lkid); 5406 5407 /* In general we expect the rsb returned to be R_MASTER, but we don't 5408 have to require it. Recovery of masters on one node can overlap 5409 recovery of locks on another node, so one node can send us MSTCPY 5410 locks before we've made ourselves master of this rsb. We can still 5411 add new MSTCPY locks that we receive here without any harm; when 5412 we make ourselves master, dlm_recover_masters() won't touch the 5413 MSTCPY locks we've received early. */ 5414 5415 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 5416 from_nodeid, R_RECEIVE_RECOVER, &r); 5417 if (error) 5418 goto out; 5419 5420 lock_rsb(r); 5421 5422 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) { 5423 log_error(ls, "dlm_recover_master_copy remote %d %x not dir", 5424 from_nodeid, remid); 5425 error = -EBADR; 5426 goto out_unlock; 5427 } 5428 5429 lkb = search_remid(r, from_nodeid, remid); 5430 if (lkb) { 5431 error = -EEXIST; 5432 goto out_remid; 5433 } 5434 5435 error = create_lkb(ls, &lkb); 5436 if (error) 5437 goto out_unlock; 5438 5439 error = receive_rcom_lock_args(ls, lkb, r, rc); 5440 if (error) { 5441 __put_lkb(ls, lkb); 5442 goto out_unlock; 5443 } 5444 5445 attach_lkb(r, lkb); 5446 add_lkb(r, lkb, rl->rl_status); 5447 ls->ls_recover_locks_in++; 5448 5449 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue)) 5450 rsb_set_flag(r, RSB_RECOVER_GRANT); 5451 5452 out_remid: 5453 /* this is the new value returned to the lock holder for 5454 saving in its process-copy lkb */ 5455 rl->rl_remid = cpu_to_le32(lkb->lkb_id); 5456 5457 lkb->lkb_recover_seq = ls->ls_recover_seq; 5458 5459 out_unlock: 5460 unlock_rsb(r); 5461 put_rsb(r); 5462 out: 5463 if (error && error != -EEXIST) 5464 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d", 5465 from_nodeid, remid, error); 5466 rl->rl_result = cpu_to_le32(error); 5467 return error; 5468 } 5469 5470 /* needs at least dlm_rcom + rcom_lock */ 5471 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) 5472 { 5473 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 5474 struct dlm_rsb *r; 5475 struct dlm_lkb *lkb; 5476 uint32_t lkid, remid; 5477 int error, result; 5478 5479 lkid = le32_to_cpu(rl->rl_lkid); 5480 remid = le32_to_cpu(rl->rl_remid); 5481 result = le32_to_cpu(rl->rl_result); 5482 5483 error = find_lkb(ls, lkid, &lkb); 5484 if (error) { 5485 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d", 5486 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, 5487 result); 5488 return error; 5489 } 5490 5491 r = lkb->lkb_resource; 5492 hold_rsb(r); 5493 lock_rsb(r); 5494 5495 if (!is_process_copy(lkb)) { 5496 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d", 5497 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, 5498 result); 5499 dlm_dump_rsb(r); 5500 unlock_rsb(r); 5501 put_rsb(r); 5502 dlm_put_lkb(lkb); 5503 
return -EINVAL; 5504 } 5505 5506 switch (result) { 5507 case -EBADR: 5508 /* There's a chance the new master received our lock before 5509 dlm_recover_master_reply(), this wouldn't happen if we did 5510 a barrier between recover_masters and recover_locks. */ 5511 5512 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d", 5513 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, 5514 result); 5515 5516 dlm_send_rcom_lock(r, lkb); 5517 goto out; 5518 case -EEXIST: 5519 case 0: 5520 lkb->lkb_remid = remid; 5521 break; 5522 default: 5523 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk", 5524 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, 5525 result); 5526 } 5527 5528 /* an ack for dlm_recover_locks() which waits for replies from 5529 all the locks it sends to new masters */ 5530 dlm_recovered_lock(r); 5531 out: 5532 unlock_rsb(r); 5533 put_rsb(r); 5534 dlm_put_lkb(lkb); 5535 5536 return 0; 5537 } 5538 5539 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, 5540 int mode, uint32_t flags, void *name, unsigned int namelen) 5541 { 5542 struct dlm_lkb *lkb; 5543 struct dlm_args args; 5544 bool do_put = true; 5545 int error; 5546 5547 dlm_lock_recovery(ls); 5548 5549 error = create_lkb(ls, &lkb); 5550 if (error) { 5551 kfree(ua); 5552 goto out; 5553 } 5554 5555 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags); 5556 5557 if (flags & DLM_LKF_VALBLK) { 5558 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS); 5559 if (!ua->lksb.sb_lvbptr) { 5560 kfree(ua); 5561 error = -ENOMEM; 5562 goto out_put; 5563 } 5564 } 5565 error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua, 5566 fake_bastfn, &args); 5567 if (error) { 5568 kfree(ua->lksb.sb_lvbptr); 5569 ua->lksb.sb_lvbptr = NULL; 5570 kfree(ua); 5571 goto out_put; 5572 } 5573 5574 /* After ua is attached to lkb it will be freed by dlm_free_lkb(). 5575 When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace 5576 lock and that lkb_astparam is the dlm_user_args structure. 
*/ 5577 set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags); 5578 error = request_lock(ls, lkb, name, namelen, &args); 5579 5580 switch (error) { 5581 case 0: 5582 break; 5583 case -EINPROGRESS: 5584 error = 0; 5585 break; 5586 case -EAGAIN: 5587 error = 0; 5588 fallthrough; 5589 default: 5590 goto out_put; 5591 } 5592 5593 /* add this new lkb to the per-process list of locks */ 5594 spin_lock(&ua->proc->locks_spin); 5595 hold_lkb(lkb); 5596 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); 5597 spin_unlock(&ua->proc->locks_spin); 5598 do_put = false; 5599 out_put: 5600 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false); 5601 if (do_put) 5602 __put_lkb(ls, lkb); 5603 out: 5604 dlm_unlock_recovery(ls); 5605 return error; 5606 } 5607 5608 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 5609 int mode, uint32_t flags, uint32_t lkid, char *lvb_in) 5610 { 5611 struct dlm_lkb *lkb; 5612 struct dlm_args args; 5613 struct dlm_user_args *ua; 5614 int error; 5615 5616 dlm_lock_recovery(ls); 5617 5618 error = find_lkb(ls, lkid, &lkb); 5619 if (error) 5620 goto out; 5621 5622 trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags); 5623 5624 /* user can change the params on its lock when it converts it, or 5625 add an lvb that didn't exist before */ 5626 5627 ua = lkb->lkb_ua; 5628 5629 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) { 5630 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS); 5631 if (!ua->lksb.sb_lvbptr) { 5632 error = -ENOMEM; 5633 goto out_put; 5634 } 5635 } 5636 if (lvb_in && ua->lksb.sb_lvbptr) 5637 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); 5638 5639 ua->xid = ua_tmp->xid; 5640 ua->castparam = ua_tmp->castparam; 5641 ua->castaddr = ua_tmp->castaddr; 5642 ua->bastparam = ua_tmp->bastparam; 5643 ua->bastaddr = ua_tmp->bastaddr; 5644 ua->user_lksb = ua_tmp->user_lksb; 5645 5646 error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua, 5647 fake_bastfn, &args); 5648 if (error) 5649 goto out_put; 5650 5651 error = convert_lock(ls, lkb, &args); 5652 5653 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK) 5654 error = 0; 5655 out_put: 5656 trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false); 5657 dlm_put_lkb(lkb); 5658 out: 5659 dlm_unlock_recovery(ls); 5660 kfree(ua_tmp); 5661 return error; 5662 } 5663 5664 /* 5665 * The caller asks for an orphan lock on a given resource with a given mode. 5666 * If a matching lock exists, it's moved to the owner's list of locks and 5667 * the lkid is returned. 
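 *
 * If no orphan matches the resource name, -ENOENT is returned; if orphans
 * exist for the name but only with a different granted mode, -EAGAIN is
 * returned (see the found_other_mode handling below).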
5668 */ 5669 5670 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 5671 int mode, uint32_t flags, void *name, unsigned int namelen, 5672 uint32_t *lkid) 5673 { 5674 struct dlm_lkb *lkb = NULL, *iter; 5675 struct dlm_user_args *ua; 5676 int found_other_mode = 0; 5677 int rv = 0; 5678 5679 mutex_lock(&ls->ls_orphans_mutex); 5680 list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) { 5681 if (iter->lkb_resource->res_length != namelen) 5682 continue; 5683 if (memcmp(iter->lkb_resource->res_name, name, namelen)) 5684 continue; 5685 if (iter->lkb_grmode != mode) { 5686 found_other_mode = 1; 5687 continue; 5688 } 5689 5690 lkb = iter; 5691 list_del_init(&iter->lkb_ownqueue); 5692 clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags); 5693 *lkid = iter->lkb_id; 5694 break; 5695 } 5696 mutex_unlock(&ls->ls_orphans_mutex); 5697 5698 if (!lkb && found_other_mode) { 5699 rv = -EAGAIN; 5700 goto out; 5701 } 5702 5703 if (!lkb) { 5704 rv = -ENOENT; 5705 goto out; 5706 } 5707 5708 lkb->lkb_exflags = flags; 5709 lkb->lkb_ownpid = (int) current->pid; 5710 5711 ua = lkb->lkb_ua; 5712 5713 ua->proc = ua_tmp->proc; 5714 ua->xid = ua_tmp->xid; 5715 ua->castparam = ua_tmp->castparam; 5716 ua->castaddr = ua_tmp->castaddr; 5717 ua->bastparam = ua_tmp->bastparam; 5718 ua->bastaddr = ua_tmp->bastaddr; 5719 ua->user_lksb = ua_tmp->user_lksb; 5720 5721 /* 5722 * The lkb reference from the ls_orphans list was not 5723 * removed above, and is now considered the reference 5724 * for the proc locks list. 5725 */ 5726 5727 spin_lock(&ua->proc->locks_spin); 5728 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); 5729 spin_unlock(&ua->proc->locks_spin); 5730 out: 5731 kfree(ua_tmp); 5732 return rv; 5733 } 5734 5735 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 5736 uint32_t flags, uint32_t lkid, char *lvb_in) 5737 { 5738 struct dlm_lkb *lkb; 5739 struct dlm_args args; 5740 struct dlm_user_args *ua; 5741 int error; 5742 5743 dlm_lock_recovery(ls); 5744 5745 error = find_lkb(ls, lkid, &lkb); 5746 if (error) 5747 goto out; 5748 5749 trace_dlm_unlock_start(ls, lkb, flags); 5750 5751 ua = lkb->lkb_ua; 5752 5753 if (lvb_in && ua->lksb.sb_lvbptr) 5754 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); 5755 if (ua_tmp->castparam) 5756 ua->castparam = ua_tmp->castparam; 5757 ua->user_lksb = ua_tmp->user_lksb; 5758 5759 error = set_unlock_args(flags, ua, &args); 5760 if (error) 5761 goto out_put; 5762 5763 error = unlock_lock(ls, lkb, &args); 5764 5765 if (error == -DLM_EUNLOCK) 5766 error = 0; 5767 /* from validate_unlock_args() */ 5768 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK)) 5769 error = 0; 5770 if (error) 5771 goto out_put; 5772 5773 spin_lock(&ua->proc->locks_spin); 5774 /* dlm_user_add_cb() may have already taken lkb off the proc list */ 5775 if (!list_empty(&lkb->lkb_ownqueue)) 5776 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking); 5777 spin_unlock(&ua->proc->locks_spin); 5778 out_put: 5779 trace_dlm_unlock_end(ls, lkb, flags, error); 5780 dlm_put_lkb(lkb); 5781 out: 5782 dlm_unlock_recovery(ls); 5783 kfree(ua_tmp); 5784 return error; 5785 } 5786 5787 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 5788 uint32_t flags, uint32_t lkid) 5789 { 5790 struct dlm_lkb *lkb; 5791 struct dlm_args args; 5792 struct dlm_user_args *ua; 5793 int error; 5794 5795 dlm_lock_recovery(ls); 5796 5797 error = find_lkb(ls, lkid, &lkb); 5798 if (error) 5799 goto out; 5800 5801 trace_dlm_unlock_start(ls, lkb, flags); 5802 5803 ua = lkb->lkb_ua; 5804 if 
(ua_tmp->castparam) 5805 ua->castparam = ua_tmp->castparam; 5806 ua->user_lksb = ua_tmp->user_lksb; 5807 5808 error = set_unlock_args(flags, ua, &args); 5809 if (error) 5810 goto out_put; 5811 5812 error = cancel_lock(ls, lkb, &args); 5813 5814 if (error == -DLM_ECANCEL) 5815 error = 0; 5816 /* from validate_unlock_args() */ 5817 if (error == -EBUSY) 5818 error = 0; 5819 out_put: 5820 trace_dlm_unlock_end(ls, lkb, flags, error); 5821 dlm_put_lkb(lkb); 5822 out: 5823 dlm_unlock_recovery(ls); 5824 kfree(ua_tmp); 5825 return error; 5826 } 5827 5828 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid) 5829 { 5830 struct dlm_lkb *lkb; 5831 struct dlm_args args; 5832 struct dlm_user_args *ua; 5833 struct dlm_rsb *r; 5834 int error; 5835 5836 dlm_lock_recovery(ls); 5837 5838 error = find_lkb(ls, lkid, &lkb); 5839 if (error) 5840 goto out; 5841 5842 trace_dlm_unlock_start(ls, lkb, flags); 5843 5844 ua = lkb->lkb_ua; 5845 5846 error = set_unlock_args(flags, ua, &args); 5847 if (error) 5848 goto out_put; 5849 5850 /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */ 5851 5852 r = lkb->lkb_resource; 5853 hold_rsb(r); 5854 lock_rsb(r); 5855 5856 error = validate_unlock_args(lkb, &args); 5857 if (error) 5858 goto out_r; 5859 set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags); 5860 5861 error = _cancel_lock(r, lkb); 5862 out_r: 5863 unlock_rsb(r); 5864 put_rsb(r); 5865 5866 if (error == -DLM_ECANCEL) 5867 error = 0; 5868 /* from validate_unlock_args() */ 5869 if (error == -EBUSY) 5870 error = 0; 5871 out_put: 5872 trace_dlm_unlock_end(ls, lkb, flags, error); 5873 dlm_put_lkb(lkb); 5874 out: 5875 dlm_unlock_recovery(ls); 5876 return error; 5877 } 5878 5879 /* lkb's that are removed from the waiters list by revert are just left on the 5880 orphans list with the granted orphan locks, to be freed by purge */ 5881 5882 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) 5883 { 5884 struct dlm_args args; 5885 int error; 5886 5887 hold_lkb(lkb); /* reference for the ls_orphans list */ 5888 mutex_lock(&ls->ls_orphans_mutex); 5889 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans); 5890 mutex_unlock(&ls->ls_orphans_mutex); 5891 5892 set_unlock_args(0, lkb->lkb_ua, &args); 5893 5894 error = cancel_lock(ls, lkb, &args); 5895 if (error == -DLM_ECANCEL) 5896 error = 0; 5897 return error; 5898 } 5899 5900 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't 5901 granted. Regardless of what rsb queue the lock is on, it's removed and 5902 freed. The IVVALBLK flag causes the lvb on the resource to be invalidated 5903 if our lock is PW/EX (it's ignored if our granted mode is smaller.) 
/* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
   granted.  Regardless of what rsb queue the lock is on, it's removed and
   freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
   if our lock is PW/EX (it's ignored if our granted mode is smaller.) */

static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_args args;
	int error;

	set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
			lkb->lkb_ua, &args);

	error = unlock_lock(ls, lkb, &args);
	if (error == -DLM_EUNLOCK)
		error = 0;
	return error;
}

/* We have to release the ls_clear_proc_locks spinlock before calling
   unlock_proc_lock() (which does lock_rsb) due to deadlock with receiving a
   message that does lock_rsb followed by dlm_user_add_cb() */

static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
				     struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb = NULL;

	spin_lock(&ls->ls_clear_proc_locks);
	if (list_empty(&proc->locks))
		goto out;

	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
	list_del_init(&lkb->lkb_ownqueue);

	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
		set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
	else
		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
 out:
	spin_unlock(&ls->ls_clear_proc_locks);
	return lkb;
}

/* The ls_clear_proc_locks spinlock protects against dlm_user_add_cb() which
   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */

/* proc CLOSING flag is set so no more device_reads should look at proc->asts
   list, and no more device_writes should add lkb's to proc->locks list; so we
   shouldn't need to take asts_spin or locks_spin here.  this assumes that
   device reads/writes/closes are serialized -- FIXME: we may need to serialize
   them ourselves. */

void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	dlm_lock_recovery(ls);

	while (1) {
		lkb = del_proc_lock(ls, proc);
		if (!lkb)
			break;
		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
			orphan_proc_lock(ls, lkb);
		else
			unlock_proc_lock(ls, lkb);

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request, it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	spin_lock(&ls->ls_clear_proc_locks);

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
		dlm_put_lkb(lkb);
	}

	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
		dlm_purge_lkb_callbacks(lkb);
		list_del_init(&lkb->lkb_cb_list);
		dlm_put_lkb(lkb);
	}

	spin_unlock(&ls->ls_clear_proc_locks);
	dlm_unlock_recovery(ls);
}
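/*
 * Illustrative sketch, not part of the DLM code and not called anywhere:
 * the rsb hold/lock discipline referred to in the comments above.  Both
 * dlm_user_deadlock() and the unlock path reached from unlock_proc_lock()
 * follow the same shape: take a reference so the rsb cannot go away, lock
 * it to serialize with message processing, do the work, then unlock and
 * drop the reference.  The work callback here is a hypothetical stand-in.
 */
static void example_rsb_operation(struct dlm_lkb *lkb,
				  void (*work)(struct dlm_rsb *r,
					       struct dlm_lkb *lkb))
{
	struct dlm_rsb *r = lkb->lkb_resource;

	hold_rsb(r);	/* pin the rsb */
	lock_rsb(r);	/* serialize lock operations on it */

	work(r, lkb);	/* caller-supplied work done under the rsb lock */

	unlock_rsb(r);
	put_rsb(r);	/* drop the hold_rsb() reference */
}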
static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	while (1) {
		lkb = NULL;
		spin_lock(&proc->locks_spin);
		if (!list_empty(&proc->locks)) {
			lkb = list_entry(proc->locks.next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);
		}
		spin_unlock(&proc->locks_spin);

		if (!lkb)
			break;

		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
		unlock_proc_lock(ls, lkb);
		dlm_put_lkb(lkb); /* ref from proc->locks list */
	}

	spin_lock(&proc->locks_spin);
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->locks_spin);

	spin_lock(&proc->asts_spin);
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
		dlm_purge_lkb_callbacks(lkb);
		list_del_init(&lkb->lkb_cb_list);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->asts_spin);
}

/* pid of 0 means purge all orphans */

static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_lkb *lkb, *safe;

	mutex_lock(&ls->ls_orphans_mutex);
	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
		if (pid && lkb->lkb_ownpid != pid)
			continue;
		unlock_proc_lock(ls, lkb);
		list_del_init(&lkb->lkb_ownqueue);
		dlm_put_lkb(lkb);
	}
	mutex_unlock(&ls->ls_orphans_mutex);
}

static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error;

	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
				DLM_MSG_PURGE, &ms, &mh, GFP_NOFS);
	if (error)
		return error;
	ms->m_nodeid = cpu_to_le32(nodeid);
	ms->m_pid = cpu_to_le32(pid);

	return send_message(mh, ms, NULL, 0);
}

int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
		   int nodeid, int pid)
{
	int error = 0;

	if (nodeid && (nodeid != dlm_our_nodeid())) {
		error = send_purge(ls, nodeid, pid);
	} else {
		dlm_lock_recovery(ls);
		if (pid == current->pid)
			purge_proc_locks(ls, proc);
		else
			do_purge(ls, nodeid, pid);
		dlm_unlock_recovery(ls);
	}
	return error;
}

/* debug functionality */
int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
		      int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
{
	struct dlm_lksb *lksb;
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	/* we currently can't set a valid user lock */
	if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
		return -EOPNOTSUPP;

	lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
	if (!lksb)
		return -ENOMEM;

	error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
	if (error) {
		kfree(lksb);
		return error;
	}

	dlm_set_dflags_val(lkb, lkb_dflags);
	lkb->lkb_nodeid = lkb_nodeid;
	lkb->lkb_lksb = lksb;
	/* user specific pointer, just don't have it NULL for kernel locks */
	if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
		lkb->lkb_astparam = (void *)0xDEADBEEF;

	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
	if (error) {
		kfree(lksb);
		__put_lkb(ls, lkb);
		return error;
	}

	lock_rsb(r);
	attach_lkb(r, lkb);
	add_lkb(r, lkb, lkb_status);
	unlock_rsb(r);
	put_rsb(r);

	return 0;
}

int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
				 int mstype, int to_nodeid)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, lkb_id, &lkb);
	if (error)
		return error;

	error = add_to_waiters(lkb, mstype, to_nodeid);
	dlm_put_lkb(lkb);
	return error;
}
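/*
 * Illustrative sketch, not part of the DLM code and not called anywhere:
 * how a hypothetical debugfs/test harness might use the debug helpers
 * above to fabricate lock state for testing.  The resource name, lkb id,
 * node id and modes below are made up for the example.
 */
static int example_inject_debug_lock(struct dlm_ls *ls)
{
	char name[] = "example-resource";
	int error;

	/* create an lkb owned by node 1, placed on the grant queue */
	error = dlm_debug_add_lkb(ls, 0x100, name, sizeof(name) - 1,
				  1, 0, DLM_LKSTS_GRANTED);
	if (error)
		return error;

	/* pretend a convert message to node 1 is outstanding for it */
	return dlm_debug_add_lkb_to_waiters(ls, 0x100, DLM_MSG_CONVERT, 1);
}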