1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved. 6 ** 7 ** 8 ******************************************************************************* 9 ******************************************************************************/ 10 11 /* Central locking logic has four stages: 12 13 dlm_lock() 14 dlm_unlock() 15 16 request_lock(ls, lkb) 17 convert_lock(ls, lkb) 18 unlock_lock(ls, lkb) 19 cancel_lock(ls, lkb) 20 21 _request_lock(r, lkb) 22 _convert_lock(r, lkb) 23 _unlock_lock(r, lkb) 24 _cancel_lock(r, lkb) 25 26 do_request(r, lkb) 27 do_convert(r, lkb) 28 do_unlock(r, lkb) 29 do_cancel(r, lkb) 30 31 Stage 1 (lock, unlock) is mainly about checking input args and 32 splitting into one of the four main operations: 33 34 dlm_lock = request_lock 35 dlm_lock+CONVERT = convert_lock 36 dlm_unlock = unlock_lock 37 dlm_unlock+CANCEL = cancel_lock 38 39 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is 40 provided to the next stage. 41 42 Stage 3, _xxxx_lock(), determines if the operation is local or remote. 43 When remote, it calls send_xxxx(), when local it calls do_xxxx(). 44 45 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the 46 given rsb and lkb and queues callbacks. 47 48 For remote operations, send_xxxx() results in the corresponding do_xxxx() 49 function being executed on the remote node. The connecting send/receive 50 calls on local (L) and remote (R) nodes: 51 52 L: send_xxxx() -> R: receive_xxxx() 53 R: do_xxxx() 54 L: receive_xxxx_reply() <- R: send_xxxx_reply() 55 */ 56 #include <trace/events/dlm.h> 57 58 #include <linux/types.h> 59 #include <linux/rbtree.h> 60 #include <linux/slab.h> 61 #include "dlm_internal.h" 62 #include <linux/dlm_device.h> 63 #include "memory.h" 64 #include "midcomms.h" 65 #include "requestqueue.h" 66 #include "util.h" 67 #include "dir.h" 68 #include "member.h" 69 #include "lockspace.h" 70 #include "ast.h" 71 #include "lock.h" 72 #include "rcom.h" 73 #include "recover.h" 74 #include "lvb_table.h" 75 #include "user.h" 76 #include "config.h" 77 78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb); 79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb); 80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb); 81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb); 82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb); 83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode); 84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb); 85 static int send_remove(struct dlm_rsb *r); 86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); 87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); 88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 89 struct dlm_message *ms); 90 static int receive_extralen(struct dlm_message *ms); 91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid); 92 static void del_timeout(struct dlm_lkb *lkb); 93 static void toss_rsb(struct kref *kref); 94 95 /* 96 * Lock compatibilty matrix - thanks Steve 97 * UN = Unlocked state. Not really a state, used as a flag 98 * PD = Padding. Used to make the matrix a nice power of two in size 99 * Other states are the same as the VMS DLM. 
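* NL = Null, CR = Concurrent Read, CW = Concurrent Write, PR = Protected Read, PW = Protected Write, EX = Exclusive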
100 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same) 101 */ 102 103 static const int __dlm_compat_matrix[8][8] = { 104 /* UN NL CR CW PR PW EX PD */ 105 {1, 1, 1, 1, 1, 1, 1, 0}, /* UN */ 106 {1, 1, 1, 1, 1, 1, 1, 0}, /* NL */ 107 {1, 1, 1, 1, 1, 1, 0, 0}, /* CR */ 108 {1, 1, 1, 1, 0, 0, 0, 0}, /* CW */ 109 {1, 1, 1, 0, 1, 0, 0, 0}, /* PR */ 110 {1, 1, 1, 0, 0, 0, 0, 0}, /* PW */ 111 {1, 1, 0, 0, 0, 0, 0, 0}, /* EX */ 112 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */ 113 }; 114 115 /* 116 * This defines the direction of transfer of LVB data. 117 * Granted mode is the row; requested mode is the column. 118 * Usage: matrix[grmode+1][rqmode+1] 119 * 1 = LVB is returned to the caller 120 * 0 = LVB is written to the resource 121 * -1 = nothing happens to the LVB 122 */ 123 124 const int dlm_lvb_operations[8][8] = { 125 /* UN NL CR CW PR PW EX PD*/ 126 { -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */ 127 { -1, 1, 1, 1, 1, 1, 1, 0 }, /* NL */ 128 { -1, -1, 1, 1, 1, 1, 1, 0 }, /* CR */ 129 { -1, -1, -1, 1, 1, 1, 1, 0 }, /* CW */ 130 { -1, -1, -1, -1, 1, 1, 1, 0 }, /* PR */ 131 { -1, 0, 0, 0, 0, 0, 1, 0 }, /* PW */ 132 { -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */ 133 { -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */ 134 }; 135 136 #define modes_compat(gr, rq) \ 137 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1] 138 139 int dlm_modes_compat(int mode1, int mode2) 140 { 141 return __dlm_compat_matrix[mode1 + 1][mode2 + 1]; 142 } 143 144 /* 145 * Compatibility matrix for conversions with QUECVT set. 146 * Granted mode is the row; requested mode is the column. 147 * Usage: matrix[grmode+1][rqmode+1] 148 */ 149 150 static const int __quecvt_compat_matrix[8][8] = { 151 /* UN NL CR CW PR PW EX PD */ 152 {0, 0, 0, 0, 0, 0, 0, 0}, /* UN */ 153 {0, 0, 1, 1, 1, 1, 1, 0}, /* NL */ 154 {0, 0, 0, 1, 1, 1, 1, 0}, /* CR */ 155 {0, 0, 0, 0, 1, 1, 1, 0}, /* CW */ 156 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */ 157 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */ 158 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */ 159 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */ 160 }; 161 162 void dlm_print_lkb(struct dlm_lkb *lkb) 163 { 164 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x " 165 "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n", 166 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags, 167 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode, 168 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid, 169 (unsigned long long)lkb->lkb_recover_seq); 170 } 171 172 static void dlm_print_rsb(struct dlm_rsb *r) 173 { 174 printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x " 175 "rlc %d name %s\n", 176 r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid, 177 r->res_flags, r->res_first_lkid, r->res_recover_locks_count, 178 r->res_name); 179 } 180 181 void dlm_dump_rsb(struct dlm_rsb *r) 182 { 183 struct dlm_lkb *lkb; 184 185 dlm_print_rsb(r); 186 187 printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n", 188 list_empty(&r->res_root_list), list_empty(&r->res_recover_list)); 189 printk(KERN_ERR "rsb lookup list\n"); 190 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup) 191 dlm_print_lkb(lkb); 192 printk(KERN_ERR "rsb grant queue:\n"); 193 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) 194 dlm_print_lkb(lkb); 195 printk(KERN_ERR "rsb convert queue:\n"); 196 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) 197 dlm_print_lkb(lkb); 198 printk(KERN_ERR "rsb wait queue:\n"); 199 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue) 200 dlm_print_lkb(lkb); 201 } 202 203 /* 
Threads cannot use the lockspace while it's being recovered */ 204 205 static inline void dlm_lock_recovery(struct dlm_ls *ls) 206 { 207 down_read(&ls->ls_in_recovery); 208 } 209 210 void dlm_unlock_recovery(struct dlm_ls *ls) 211 { 212 up_read(&ls->ls_in_recovery); 213 } 214 215 int dlm_lock_recovery_try(struct dlm_ls *ls) 216 { 217 return down_read_trylock(&ls->ls_in_recovery); 218 } 219 220 static inline int can_be_queued(struct dlm_lkb *lkb) 221 { 222 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE); 223 } 224 225 static inline int force_blocking_asts(struct dlm_lkb *lkb) 226 { 227 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST); 228 } 229 230 static inline int is_demoted(struct dlm_lkb *lkb) 231 { 232 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED); 233 } 234 235 static inline int is_altmode(struct dlm_lkb *lkb) 236 { 237 return (lkb->lkb_sbflags & DLM_SBF_ALTMODE); 238 } 239 240 static inline int is_granted(struct dlm_lkb *lkb) 241 { 242 return (lkb->lkb_status == DLM_LKSTS_GRANTED); 243 } 244 245 static inline int is_remote(struct dlm_rsb *r) 246 { 247 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r);); 248 return !!r->res_nodeid; 249 } 250 251 static inline int is_process_copy(struct dlm_lkb *lkb) 252 { 253 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY)); 254 } 255 256 static inline int is_master_copy(struct dlm_lkb *lkb) 257 { 258 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0; 259 } 260 261 static inline int middle_conversion(struct dlm_lkb *lkb) 262 { 263 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) || 264 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW)) 265 return 1; 266 return 0; 267 } 268 269 static inline int down_conversion(struct dlm_lkb *lkb) 270 { 271 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode); 272 } 273 274 static inline int is_overlap_unlock(struct dlm_lkb *lkb) 275 { 276 return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK; 277 } 278 279 static inline int is_overlap_cancel(struct dlm_lkb *lkb) 280 { 281 return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL; 282 } 283 284 static inline int is_overlap(struct dlm_lkb *lkb) 285 { 286 return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK | 287 DLM_IFL_OVERLAP_CANCEL)); 288 } 289 290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 291 { 292 if (is_master_copy(lkb)) 293 return; 294 295 del_timeout(lkb); 296 297 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb);); 298 299 #ifdef CONFIG_DLM_DEPRECATED_API 300 /* if the operation was a cancel, then return -DLM_ECANCEL, if a 301 timeout caused the cancel then return -ETIMEDOUT */ 302 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) { 303 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL; 304 rv = -ETIMEDOUT; 305 } 306 #endif 307 308 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) { 309 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL; 310 rv = -EDEADLK; 311 } 312 313 dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags); 314 } 315 316 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb) 317 { 318 queue_cast(r, lkb, 319 is_overlap_unlock(lkb) ? 
-DLM_EUNLOCK : -DLM_ECANCEL); 320 } 321 322 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) 323 { 324 if (is_master_copy(lkb)) { 325 send_bast(r, lkb, rqmode); 326 } else { 327 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0); 328 } 329 } 330 331 /* 332 * Basic operations on rsb's and lkb's 333 */ 334 335 /* This is only called to add a reference when the code already holds 336 a valid reference to the rsb, so there's no need for locking. */ 337 338 static inline void hold_rsb(struct dlm_rsb *r) 339 { 340 kref_get(&r->res_ref); 341 } 342 343 void dlm_hold_rsb(struct dlm_rsb *r) 344 { 345 hold_rsb(r); 346 } 347 348 /* When all references to the rsb are gone it's transferred to 349 the tossed list for later disposal. */ 350 351 static void put_rsb(struct dlm_rsb *r) 352 { 353 struct dlm_ls *ls = r->res_ls; 354 uint32_t bucket = r->res_bucket; 355 int rv; 356 357 rv = kref_put_lock(&r->res_ref, toss_rsb, 358 &ls->ls_rsbtbl[bucket].lock); 359 if (rv) 360 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 361 } 362 363 void dlm_put_rsb(struct dlm_rsb *r) 364 { 365 put_rsb(r); 366 } 367 368 static int pre_rsb_struct(struct dlm_ls *ls) 369 { 370 struct dlm_rsb *r1, *r2; 371 int count = 0; 372 373 spin_lock(&ls->ls_new_rsb_spin); 374 if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) { 375 spin_unlock(&ls->ls_new_rsb_spin); 376 return 0; 377 } 378 spin_unlock(&ls->ls_new_rsb_spin); 379 380 r1 = dlm_allocate_rsb(ls); 381 r2 = dlm_allocate_rsb(ls); 382 383 spin_lock(&ls->ls_new_rsb_spin); 384 if (r1) { 385 list_add(&r1->res_hashchain, &ls->ls_new_rsb); 386 ls->ls_new_rsb_count++; 387 } 388 if (r2) { 389 list_add(&r2->res_hashchain, &ls->ls_new_rsb); 390 ls->ls_new_rsb_count++; 391 } 392 count = ls->ls_new_rsb_count; 393 spin_unlock(&ls->ls_new_rsb_spin); 394 395 if (!count) 396 return -ENOMEM; 397 return 0; 398 } 399 400 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can 401 unlock any spinlocks, go back and call pre_rsb_struct again. 402 Otherwise, take an rsb off the list and return it. 
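(pre_rsb_struct() refills ls_new_rsb without holding the rsbtbl bucket lock, so get_rsb_struct() can be called under that spinlock without having to allocate.)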
*/ 403 404 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len, 405 struct dlm_rsb **r_ret) 406 { 407 struct dlm_rsb *r; 408 int count; 409 410 spin_lock(&ls->ls_new_rsb_spin); 411 if (list_empty(&ls->ls_new_rsb)) { 412 count = ls->ls_new_rsb_count; 413 spin_unlock(&ls->ls_new_rsb_spin); 414 log_debug(ls, "find_rsb retry %d %d %s", 415 count, dlm_config.ci_new_rsb_count, 416 (const char *)name); 417 return -EAGAIN; 418 } 419 420 r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain); 421 list_del(&r->res_hashchain); 422 /* Convert the empty list_head to a NULL rb_node for tree usage: */ 423 memset(&r->res_hashnode, 0, sizeof(struct rb_node)); 424 ls->ls_new_rsb_count--; 425 spin_unlock(&ls->ls_new_rsb_spin); 426 427 r->res_ls = ls; 428 r->res_length = len; 429 memcpy(r->res_name, name, len); 430 mutex_init(&r->res_mutex); 431 432 INIT_LIST_HEAD(&r->res_lookup); 433 INIT_LIST_HEAD(&r->res_grantqueue); 434 INIT_LIST_HEAD(&r->res_convertqueue); 435 INIT_LIST_HEAD(&r->res_waitqueue); 436 INIT_LIST_HEAD(&r->res_root_list); 437 INIT_LIST_HEAD(&r->res_recover_list); 438 439 *r_ret = r; 440 return 0; 441 } 442 443 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen) 444 { 445 char maxname[DLM_RESNAME_MAXLEN]; 446 447 memset(maxname, 0, DLM_RESNAME_MAXLEN); 448 memcpy(maxname, name, nlen); 449 return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN); 450 } 451 452 int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len, 453 struct dlm_rsb **r_ret) 454 { 455 struct rb_node *node = tree->rb_node; 456 struct dlm_rsb *r; 457 int rc; 458 459 while (node) { 460 r = rb_entry(node, struct dlm_rsb, res_hashnode); 461 rc = rsb_cmp(r, name, len); 462 if (rc < 0) 463 node = node->rb_left; 464 else if (rc > 0) 465 node = node->rb_right; 466 else 467 goto found; 468 } 469 *r_ret = NULL; 470 return -EBADR; 471 472 found: 473 *r_ret = r; 474 return 0; 475 } 476 477 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree) 478 { 479 struct rb_node **newn = &tree->rb_node; 480 struct rb_node *parent = NULL; 481 int rc; 482 483 while (*newn) { 484 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb, 485 res_hashnode); 486 487 parent = *newn; 488 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length); 489 if (rc < 0) 490 newn = &parent->rb_left; 491 else if (rc > 0) 492 newn = &parent->rb_right; 493 else { 494 log_print("rsb_insert match"); 495 dlm_dump_rsb(rsb); 496 dlm_dump_rsb(cur); 497 return -EEXIST; 498 } 499 } 500 501 rb_link_node(&rsb->res_hashnode, parent, newn); 502 rb_insert_color(&rsb->res_hashnode, tree); 503 return 0; 504 } 505 506 /* 507 * Find rsb in rsbtbl and potentially create/add one 508 * 509 * Delaying the release of rsb's has a similar benefit to applications keeping 510 * NL locks on an rsb, but without the guarantee that the cached master value 511 * will still be valid when the rsb is reused. Apps aren't always smart enough 512 * to keep NL locks on an rsb that they may lock again shortly; this can lead 513 * to excessive master lookups and removals if we don't delay the release. 514 * 515 * Searching for an rsb means looking through both the normal list and toss 516 * list. When found on the toss list the rsb is moved to the normal list with 517 * ref count of 1; when found on normal list the ref count is incremented. 518 * 519 * rsb's on the keep list are being used locally and refcounted. 520 * rsb's on the toss list are not being used locally, and are not refcounted. 
521 * 522 * The toss list rsb's were either 523 * - previously used locally but not any more (were on keep list, then 524 * moved to toss list when last refcount dropped) 525 * - created and put on toss list as a directory record for a lookup 526 * (we are the dir node for the res, but are not using the res right now, 527 * but some other node is) 528 * 529 * The purpose of find_rsb() is to return a refcounted rsb for local use. 530 * So, if the given rsb is on the toss list, it is moved to the keep list 531 * before being returned. 532 * 533 * toss_rsb() happens when all local usage of the rsb is done, i.e. no 534 * more refcounts exist, so the rsb is moved from the keep list to the 535 * toss list. 536 * 537 * rsb's on both keep and toss lists are used for doing a name to master 538 * lookups. rsb's that are in use locally (and being refcounted) are on 539 * the keep list, rsb's that are not in use locally (not refcounted) and 540 * only exist for name/master lookups are on the toss list. 541 * 542 * rsb's on the toss list who's dir_nodeid is not local can have stale 543 * name/master mappings. So, remote requests on such rsb's can potentially 544 * return with an error, which means the mapping is stale and needs to 545 * be updated with a new lookup. (The idea behind MASTER UNCERTAIN and 546 * first_lkid is to keep only a single outstanding request on an rsb 547 * while that rsb has a potentially stale master.) 548 */ 549 550 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, 551 uint32_t hash, uint32_t b, 552 int dir_nodeid, int from_nodeid, 553 unsigned int flags, struct dlm_rsb **r_ret) 554 { 555 struct dlm_rsb *r = NULL; 556 int our_nodeid = dlm_our_nodeid(); 557 int from_local = 0; 558 int from_other = 0; 559 int from_dir = 0; 560 int create = 0; 561 int error; 562 563 if (flags & R_RECEIVE_REQUEST) { 564 if (from_nodeid == dir_nodeid) 565 from_dir = 1; 566 else 567 from_other = 1; 568 } else if (flags & R_REQUEST) { 569 from_local = 1; 570 } 571 572 /* 573 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so 574 * from_nodeid has sent us a lock in dlm_recover_locks, believing 575 * we're the new master. Our local recovery may not have set 576 * res_master_nodeid to our_nodeid yet, so allow either. Don't 577 * create the rsb; dlm_recover_process_copy() will handle EBADR 578 * by resending. 579 * 580 * If someone sends us a request, we are the dir node, and we do 581 * not find the rsb anywhere, then recreate it. This happens if 582 * someone sends us a request after we have removed/freed an rsb 583 * from our toss list. (They sent a request instead of lookup 584 * because they are using an rsb from their toss list.) 585 */ 586 587 if (from_local || from_dir || 588 (from_other && (dir_nodeid == our_nodeid))) { 589 create = 1; 590 } 591 592 retry: 593 if (create) { 594 error = pre_rsb_struct(ls); 595 if (error < 0) 596 goto out; 597 } 598 599 spin_lock(&ls->ls_rsbtbl[b].lock); 600 601 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 602 if (error) 603 goto do_toss; 604 605 /* 606 * rsb is active, so we can't check master_nodeid without lock_rsb. 
607 */ 608 609 kref_get(&r->res_ref); 610 goto out_unlock; 611 612 613 do_toss: 614 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 615 if (error) 616 goto do_new; 617 618 /* 619 * rsb found inactive (master_nodeid may be out of date unless 620 * we are the dir_nodeid or were the master) No other thread 621 * is using this rsb because it's on the toss list, so we can 622 * look at or update res_master_nodeid without lock_rsb. 623 */ 624 625 if ((r->res_master_nodeid != our_nodeid) && from_other) { 626 /* our rsb was not master, and another node (not the dir node) 627 has sent us a request */ 628 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s", 629 from_nodeid, r->res_master_nodeid, dir_nodeid, 630 r->res_name); 631 error = -ENOTBLK; 632 goto out_unlock; 633 } 634 635 if ((r->res_master_nodeid != our_nodeid) && from_dir) { 636 /* don't think this should ever happen */ 637 log_error(ls, "find_rsb toss from_dir %d master %d", 638 from_nodeid, r->res_master_nodeid); 639 dlm_print_rsb(r); 640 /* fix it and go on */ 641 r->res_master_nodeid = our_nodeid; 642 r->res_nodeid = 0; 643 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); 644 r->res_first_lkid = 0; 645 } 646 647 if (from_local && (r->res_master_nodeid != our_nodeid)) { 648 /* Because we have held no locks on this rsb, 649 res_master_nodeid could have become stale. */ 650 rsb_set_flag(r, RSB_MASTER_UNCERTAIN); 651 r->res_first_lkid = 0; 652 } 653 654 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 655 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 656 goto out_unlock; 657 658 659 do_new: 660 /* 661 * rsb not found 662 */ 663 664 if (error == -EBADR && !create) 665 goto out_unlock; 666 667 error = get_rsb_struct(ls, name, len, &r); 668 if (error == -EAGAIN) { 669 spin_unlock(&ls->ls_rsbtbl[b].lock); 670 goto retry; 671 } 672 if (error) 673 goto out_unlock; 674 675 r->res_hash = hash; 676 r->res_bucket = b; 677 r->res_dir_nodeid = dir_nodeid; 678 kref_init(&r->res_ref); 679 680 if (from_dir) { 681 /* want to see how often this happens */ 682 log_debug(ls, "find_rsb new from_dir %d recreate %s", 683 from_nodeid, r->res_name); 684 r->res_master_nodeid = our_nodeid; 685 r->res_nodeid = 0; 686 goto out_add; 687 } 688 689 if (from_other && (dir_nodeid != our_nodeid)) { 690 /* should never happen */ 691 log_error(ls, "find_rsb new from_other %d dir %d our %d %s", 692 from_nodeid, dir_nodeid, our_nodeid, r->res_name); 693 dlm_free_rsb(r); 694 r = NULL; 695 error = -ENOTBLK; 696 goto out_unlock; 697 } 698 699 if (from_other) { 700 log_debug(ls, "find_rsb new from_other %d dir %d %s", 701 from_nodeid, dir_nodeid, r->res_name); 702 } 703 704 if (dir_nodeid == our_nodeid) { 705 /* When we are the dir nodeid, we can set the master 706 node immediately */ 707 r->res_master_nodeid = our_nodeid; 708 r->res_nodeid = 0; 709 } else { 710 /* set_master will send_lookup to dir_nodeid */ 711 r->res_master_nodeid = 0; 712 r->res_nodeid = -1; 713 } 714 715 out_add: 716 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 717 out_unlock: 718 spin_unlock(&ls->ls_rsbtbl[b].lock); 719 out: 720 *r_ret = r; 721 return error; 722 } 723 724 /* During recovery, other nodes can send us new MSTCPY locks (from 725 dlm_recover_locks) before we've made ourself master (in 726 dlm_recover_masters). 
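In that case R_RECEIVE_RECOVER is set, and find_rsb_nodir() tolerates a res_master_nodeid that has not yet been updated to our_nodeid.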
*/ 727 728 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, 729 uint32_t hash, uint32_t b, 730 int dir_nodeid, int from_nodeid, 731 unsigned int flags, struct dlm_rsb **r_ret) 732 { 733 struct dlm_rsb *r = NULL; 734 int our_nodeid = dlm_our_nodeid(); 735 int recover = (flags & R_RECEIVE_RECOVER); 736 int error; 737 738 retry: 739 error = pre_rsb_struct(ls); 740 if (error < 0) 741 goto out; 742 743 spin_lock(&ls->ls_rsbtbl[b].lock); 744 745 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 746 if (error) 747 goto do_toss; 748 749 /* 750 * rsb is active, so we can't check master_nodeid without lock_rsb. 751 */ 752 753 kref_get(&r->res_ref); 754 goto out_unlock; 755 756 757 do_toss: 758 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 759 if (error) 760 goto do_new; 761 762 /* 763 * rsb found inactive. No other thread is using this rsb because 764 * it's on the toss list, so we can look at or update 765 * res_master_nodeid without lock_rsb. 766 */ 767 768 if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) { 769 /* our rsb is not master, and another node has sent us a 770 request; this should never happen */ 771 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d", 772 from_nodeid, r->res_master_nodeid, dir_nodeid); 773 dlm_print_rsb(r); 774 error = -ENOTBLK; 775 goto out_unlock; 776 } 777 778 if (!recover && (r->res_master_nodeid != our_nodeid) && 779 (dir_nodeid == our_nodeid)) { 780 /* our rsb is not master, and we are dir; may as well fix it; 781 this should never happen */ 782 log_error(ls, "find_rsb toss our %d master %d dir %d", 783 our_nodeid, r->res_master_nodeid, dir_nodeid); 784 dlm_print_rsb(r); 785 r->res_master_nodeid = our_nodeid; 786 r->res_nodeid = 0; 787 } 788 789 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 790 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 791 goto out_unlock; 792 793 794 do_new: 795 /* 796 * rsb not found 797 */ 798 799 error = get_rsb_struct(ls, name, len, &r); 800 if (error == -EAGAIN) { 801 spin_unlock(&ls->ls_rsbtbl[b].lock); 802 goto retry; 803 } 804 if (error) 805 goto out_unlock; 806 807 r->res_hash = hash; 808 r->res_bucket = b; 809 r->res_dir_nodeid = dir_nodeid; 810 r->res_master_nodeid = dir_nodeid; 811 r->res_nodeid = (dir_nodeid == our_nodeid) ? 
0 : dir_nodeid; 812 kref_init(&r->res_ref); 813 814 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 815 out_unlock: 816 spin_unlock(&ls->ls_rsbtbl[b].lock); 817 out: 818 *r_ret = r; 819 return error; 820 } 821 822 static int find_rsb(struct dlm_ls *ls, const void *name, int len, 823 int from_nodeid, unsigned int flags, 824 struct dlm_rsb **r_ret) 825 { 826 uint32_t hash, b; 827 int dir_nodeid; 828 829 if (len > DLM_RESNAME_MAXLEN) 830 return -EINVAL; 831 832 hash = jhash(name, len, 0); 833 b = hash & (ls->ls_rsbtbl_size - 1); 834 835 dir_nodeid = dlm_hash2nodeid(ls, hash); 836 837 if (dlm_no_directory(ls)) 838 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid, 839 from_nodeid, flags, r_ret); 840 else 841 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid, 842 from_nodeid, flags, r_ret); 843 } 844 845 /* we have received a request and found that res_master_nodeid != our_nodeid, 846 so we need to return an error or make ourself the master */ 847 848 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r, 849 int from_nodeid) 850 { 851 if (dlm_no_directory(ls)) { 852 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d", 853 from_nodeid, r->res_master_nodeid, 854 r->res_dir_nodeid); 855 dlm_print_rsb(r); 856 return -ENOTBLK; 857 } 858 859 if (from_nodeid != r->res_dir_nodeid) { 860 /* our rsb is not master, and another node (not the dir node) 861 has sent us a request. this is much more common when our 862 master_nodeid is zero, so limit debug to non-zero. */ 863 864 if (r->res_master_nodeid) { 865 log_debug(ls, "validate master from_other %d master %d " 866 "dir %d first %x %s", from_nodeid, 867 r->res_master_nodeid, r->res_dir_nodeid, 868 r->res_first_lkid, r->res_name); 869 } 870 return -ENOTBLK; 871 } else { 872 /* our rsb is not master, but the dir nodeid has sent us a 873 request; this could happen with master 0 / res_nodeid -1 */ 874 875 if (r->res_master_nodeid) { 876 log_error(ls, "validate master from_dir %d master %d " 877 "first %x %s", 878 from_nodeid, r->res_master_nodeid, 879 r->res_first_lkid, r->res_name); 880 } 881 882 r->res_master_nodeid = dlm_our_nodeid(); 883 r->res_nodeid = 0; 884 return 0; 885 } 886 } 887 888 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid, 889 int from_nodeid, bool toss_list, unsigned int flags, 890 int *r_nodeid, int *result) 891 { 892 int fix_master = (flags & DLM_LU_RECOVER_MASTER); 893 int from_master = (flags & DLM_LU_RECOVER_DIR); 894 895 if (r->res_dir_nodeid != our_nodeid) { 896 /* should not happen, but may as well fix it and carry on */ 897 log_error(ls, "%s res_dir %d our %d %s", __func__, 898 r->res_dir_nodeid, our_nodeid, r->res_name); 899 r->res_dir_nodeid = our_nodeid; 900 } 901 902 if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) { 903 /* Recovery uses this function to set a new master when 904 * the previous master failed. Setting NEW_MASTER will 905 * force dlm_recover_masters to call recover_master on this 906 * rsb even though the res_nodeid is no longer removed. 907 */ 908 909 r->res_master_nodeid = from_nodeid; 910 r->res_nodeid = from_nodeid; 911 rsb_set_flag(r, RSB_NEW_MASTER); 912 913 if (toss_list) { 914 /* I don't think we should ever find it on toss list. 
*/ 915 log_error(ls, "%s fix_master on toss", __func__); 916 dlm_dump_rsb(r); 917 } 918 } 919 920 if (from_master && (r->res_master_nodeid != from_nodeid)) { 921 /* this will happen if from_nodeid became master during 922 * a previous recovery cycle, and we aborted the previous 923 * cycle before recovering this master value 924 */ 925 926 log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s", 927 __func__, from_nodeid, r->res_master_nodeid, 928 r->res_nodeid, r->res_first_lkid, r->res_name); 929 930 if (r->res_master_nodeid == our_nodeid) { 931 log_error(ls, "from_master %d our_master", from_nodeid); 932 dlm_dump_rsb(r); 933 goto ret_assign; 934 } 935 936 r->res_master_nodeid = from_nodeid; 937 r->res_nodeid = from_nodeid; 938 rsb_set_flag(r, RSB_NEW_MASTER); 939 } 940 941 if (!r->res_master_nodeid) { 942 /* this will happen if recovery happens while we're looking 943 * up the master for this rsb 944 */ 945 946 log_debug(ls, "%s master 0 to %d first %x %s", __func__, 947 from_nodeid, r->res_first_lkid, r->res_name); 948 r->res_master_nodeid = from_nodeid; 949 r->res_nodeid = from_nodeid; 950 } 951 952 if (!from_master && !fix_master && 953 (r->res_master_nodeid == from_nodeid)) { 954 /* this can happen when the master sends remove, the dir node 955 * finds the rsb on the keep list and ignores the remove, 956 * and the former master sends a lookup 957 */ 958 959 log_limit(ls, "%s from master %d flags %x first %x %s", 960 __func__, from_nodeid, flags, r->res_first_lkid, 961 r->res_name); 962 } 963 964 ret_assign: 965 *r_nodeid = r->res_master_nodeid; 966 if (result) 967 *result = DLM_LU_MATCH; 968 } 969 970 /* 971 * We're the dir node for this res and another node wants to know the 972 * master nodeid. During normal operation (non recovery) this is only 973 * called from receive_lookup(); master lookups when the local node is 974 * the dir node are done by find_rsb(). 975 * 976 * normal operation, we are the dir node for a resource 977 * . _request_lock 978 * . set_master 979 * . send_lookup 980 * . receive_lookup 981 * . dlm_master_lookup flags 0 982 * 983 * recover directory, we are rebuilding dir for all resources 984 * . dlm_recover_directory 985 * . dlm_rcom_names 986 * remote node sends back the rsb names it is master of and we are dir of 987 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1) 988 * we either create new rsb setting remote node as master, or find existing 989 * rsb and set master to be the remote node. 990 * 991 * recover masters, we are finding the new master for resources 992 * . dlm_recover_masters 993 * . recover_master 994 * . dlm_send_rcom_lookup 995 * . receive_rcom_lookup 996 * . 
dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0) 997 */ 998 999 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len, 1000 unsigned int flags, int *r_nodeid, int *result) 1001 { 1002 struct dlm_rsb *r = NULL; 1003 uint32_t hash, b; 1004 int our_nodeid = dlm_our_nodeid(); 1005 int dir_nodeid, error; 1006 1007 if (len > DLM_RESNAME_MAXLEN) 1008 return -EINVAL; 1009 1010 if (from_nodeid == our_nodeid) { 1011 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x", 1012 our_nodeid, flags); 1013 return -EINVAL; 1014 } 1015 1016 hash = jhash(name, len, 0); 1017 b = hash & (ls->ls_rsbtbl_size - 1); 1018 1019 dir_nodeid = dlm_hash2nodeid(ls, hash); 1020 if (dir_nodeid != our_nodeid) { 1021 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d", 1022 from_nodeid, dir_nodeid, our_nodeid, hash, 1023 ls->ls_num_nodes); 1024 *r_nodeid = -1; 1025 return -EINVAL; 1026 } 1027 1028 retry: 1029 error = pre_rsb_struct(ls); 1030 if (error < 0) 1031 return error; 1032 1033 spin_lock(&ls->ls_rsbtbl[b].lock); 1034 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 1035 if (!error) { 1036 /* because the rsb is active, we need to lock_rsb before 1037 * checking/changing res_master_nodeid 1038 */ 1039 1040 hold_rsb(r); 1041 spin_unlock(&ls->ls_rsbtbl[b].lock); 1042 lock_rsb(r); 1043 1044 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false, 1045 flags, r_nodeid, result); 1046 1047 /* the rsb was active */ 1048 unlock_rsb(r); 1049 put_rsb(r); 1050 1051 return 0; 1052 } 1053 1054 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 1055 if (error) 1056 goto not_found; 1057 1058 /* because the rsb is inactive (on toss list), it's not refcounted 1059 * and lock_rsb is not used, but is protected by the rsbtbl lock 1060 */ 1061 1062 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags, 1063 r_nodeid, result); 1064 1065 r->res_toss_time = jiffies; 1066 /* the rsb was inactive (on toss list) */ 1067 spin_unlock(&ls->ls_rsbtbl[b].lock); 1068 1069 return 0; 1070 1071 not_found: 1072 error = get_rsb_struct(ls, name, len, &r); 1073 if (error == -EAGAIN) { 1074 spin_unlock(&ls->ls_rsbtbl[b].lock); 1075 goto retry; 1076 } 1077 if (error) 1078 goto out_unlock; 1079 1080 r->res_hash = hash; 1081 r->res_bucket = b; 1082 r->res_dir_nodeid = our_nodeid; 1083 r->res_master_nodeid = from_nodeid; 1084 r->res_nodeid = from_nodeid; 1085 kref_init(&r->res_ref); 1086 r->res_toss_time = jiffies; 1087 1088 error = rsb_insert(r, &ls->ls_rsbtbl[b].toss); 1089 if (error) { 1090 /* should never happen */ 1091 dlm_free_rsb(r); 1092 spin_unlock(&ls->ls_rsbtbl[b].lock); 1093 goto retry; 1094 } 1095 1096 if (result) 1097 *result = DLM_LU_ADD; 1098 *r_nodeid = from_nodeid; 1099 out_unlock: 1100 spin_unlock(&ls->ls_rsbtbl[b].lock); 1101 return error; 1102 } 1103 1104 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) 1105 { 1106 struct rb_node *n; 1107 struct dlm_rsb *r; 1108 int i; 1109 1110 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 1111 spin_lock(&ls->ls_rsbtbl[i].lock); 1112 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) { 1113 r = rb_entry(n, struct dlm_rsb, res_hashnode); 1114 if (r->res_hash == hash) 1115 dlm_dump_rsb(r); 1116 } 1117 spin_unlock(&ls->ls_rsbtbl[i].lock); 1118 } 1119 } 1120 1121 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len) 1122 { 1123 struct dlm_rsb *r = NULL; 1124 uint32_t hash, b; 1125 int error; 1126 1127 hash = jhash(name, len, 0); 1128 b = hash & (ls->ls_rsbtbl_size - 1); 1129 1130
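/* look in both the keep and toss trees for the name */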
spin_lock(&ls->ls_rsbtbl[b].lock); 1131 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 1132 if (!error) 1133 goto out_dump; 1134 1135 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 1136 if (error) 1137 goto out; 1138 out_dump: 1139 dlm_dump_rsb(r); 1140 out: 1141 spin_unlock(&ls->ls_rsbtbl[b].lock); 1142 } 1143 1144 static void toss_rsb(struct kref *kref) 1145 { 1146 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); 1147 struct dlm_ls *ls = r->res_ls; 1148 1149 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r);); 1150 kref_init(&r->res_ref); 1151 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep); 1152 rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss); 1153 r->res_toss_time = jiffies; 1154 ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK; 1155 if (r->res_lvbptr) { 1156 dlm_free_lvb(r->res_lvbptr); 1157 r->res_lvbptr = NULL; 1158 } 1159 } 1160 1161 /* See comment for unhold_lkb */ 1162 1163 static void unhold_rsb(struct dlm_rsb *r) 1164 { 1165 int rv; 1166 rv = kref_put(&r->res_ref, toss_rsb); 1167 DLM_ASSERT(!rv, dlm_dump_rsb(r);); 1168 } 1169 1170 static void kill_rsb(struct kref *kref) 1171 { 1172 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); 1173 1174 /* All work is done after the return from kref_put() so we 1175 can release the write_lock before the remove and free. */ 1176 1177 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r);); 1178 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r);); 1179 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r);); 1180 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r);); 1181 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r);); 1182 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r);); 1183 } 1184 1185 /* Attaching/detaching lkb's from rsb's is for rsb reference counting. 1186 The rsb must exist as long as any lkb's for it do. 
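attach_lkb() takes an rsb reference with hold_rsb(); detach_lkb() drops it with put_rsb().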
*/ 1187 1188 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb) 1189 { 1190 hold_rsb(r); 1191 lkb->lkb_resource = r; 1192 } 1193 1194 static void detach_lkb(struct dlm_lkb *lkb) 1195 { 1196 if (lkb->lkb_resource) { 1197 put_rsb(lkb->lkb_resource); 1198 lkb->lkb_resource = NULL; 1199 } 1200 } 1201 1202 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, 1203 int start, int end) 1204 { 1205 struct dlm_lkb *lkb; 1206 int rv; 1207 1208 lkb = dlm_allocate_lkb(ls); 1209 if (!lkb) 1210 return -ENOMEM; 1211 1212 lkb->lkb_last_bast_mode = -1; 1213 lkb->lkb_nodeid = -1; 1214 lkb->lkb_grmode = DLM_LOCK_IV; 1215 kref_init(&lkb->lkb_ref); 1216 INIT_LIST_HEAD(&lkb->lkb_ownqueue); 1217 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); 1218 #ifdef CONFIG_DLM_DEPRECATED_API 1219 INIT_LIST_HEAD(&lkb->lkb_time_list); 1220 #endif 1221 INIT_LIST_HEAD(&lkb->lkb_cb_list); 1222 INIT_LIST_HEAD(&lkb->lkb_callbacks); 1223 spin_lock_init(&lkb->lkb_cb_lock); 1224 INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work); 1225 1226 idr_preload(GFP_NOFS); 1227 spin_lock(&ls->ls_lkbidr_spin); 1228 rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT); 1229 if (rv >= 0) 1230 lkb->lkb_id = rv; 1231 spin_unlock(&ls->ls_lkbidr_spin); 1232 idr_preload_end(); 1233 1234 if (rv < 0) { 1235 log_error(ls, "create_lkb idr error %d", rv); 1236 dlm_free_lkb(lkb); 1237 return rv; 1238 } 1239 1240 *lkb_ret = lkb; 1241 return 0; 1242 } 1243 1244 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) 1245 { 1246 return _create_lkb(ls, lkb_ret, 1, 0); 1247 } 1248 1249 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) 1250 { 1251 struct dlm_lkb *lkb; 1252 1253 spin_lock(&ls->ls_lkbidr_spin); 1254 lkb = idr_find(&ls->ls_lkbidr, lkid); 1255 if (lkb) 1256 kref_get(&lkb->lkb_ref); 1257 spin_unlock(&ls->ls_lkbidr_spin); 1258 1259 *lkb_ret = lkb; 1260 return lkb ? 0 : -ENOENT; 1261 } 1262 1263 static void kill_lkb(struct kref *kref) 1264 { 1265 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref); 1266 1267 /* All work is done after the return from kref_put() so we 1268 can release the write_lock before the detach_lkb */ 1269 1270 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb);); 1271 } 1272 1273 /* __put_lkb() is used when an lkb may not have an rsb attached to 1274 it so we need to provide the lockspace explicitly */ 1275 1276 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) 1277 { 1278 uint32_t lkid = lkb->lkb_id; 1279 int rv; 1280 1281 rv = kref_put_lock(&lkb->lkb_ref, kill_lkb, 1282 &ls->ls_lkbidr_spin); 1283 if (rv) { 1284 idr_remove(&ls->ls_lkbidr, lkid); 1285 spin_unlock(&ls->ls_lkbidr_spin); 1286 1287 detach_lkb(lkb); 1288 1289 /* for local/process lkbs, lvbptr points to caller's lksb */ 1290 if (lkb->lkb_lvbptr && is_master_copy(lkb)) 1291 dlm_free_lvb(lkb->lkb_lvbptr); 1292 dlm_free_lkb(lkb); 1293 } 1294 1295 return rv; 1296 } 1297 1298 int dlm_put_lkb(struct dlm_lkb *lkb) 1299 { 1300 struct dlm_ls *ls; 1301 1302 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb);); 1303 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb);); 1304 1305 ls = lkb->lkb_resource->res_ls; 1306 return __put_lkb(ls, lkb); 1307 } 1308 1309 /* This is only called to add a reference when the code already holds 1310 a valid reference to the lkb, so there's no need for locking. 
*/ 1311 1312 static inline void hold_lkb(struct dlm_lkb *lkb) 1313 { 1314 kref_get(&lkb->lkb_ref); 1315 } 1316 1317 static void unhold_lkb_assert(struct kref *kref) 1318 { 1319 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref); 1320 1321 DLM_ASSERT(false, dlm_print_lkb(lkb);); 1322 } 1323 1324 /* This is called when we need to remove a reference and are certain 1325 it's not the last ref. e.g. del_lkb is always called between a 1326 find_lkb/put_lkb and is always the inverse of a previous add_lkb. 1327 put_lkb would work fine, but would involve unnecessary locking */ 1328 1329 static inline void unhold_lkb(struct dlm_lkb *lkb) 1330 { 1331 kref_put(&lkb->lkb_ref, unhold_lkb_assert); 1332 } 1333 1334 static void lkb_add_ordered(struct list_head *new, struct list_head *head, 1335 int mode) 1336 { 1337 struct dlm_lkb *lkb = NULL, *iter; 1338 1339 list_for_each_entry(iter, head, lkb_statequeue) 1340 if (iter->lkb_rqmode < mode) { 1341 lkb = iter; 1342 list_add_tail(new, &iter->lkb_statequeue); 1343 break; 1344 } 1345 1346 if (!lkb) 1347 list_add_tail(new, head); 1348 } 1349 1350 /* add/remove lkb to rsb's grant/convert/wait queue */ 1351 1352 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status) 1353 { 1354 kref_get(&lkb->lkb_ref); 1355 1356 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb);); 1357 1358 lkb->lkb_timestamp = ktime_get(); 1359 1360 lkb->lkb_status = status; 1361 1362 switch (status) { 1363 case DLM_LKSTS_WAITING: 1364 if (lkb->lkb_exflags & DLM_LKF_HEADQUE) 1365 list_add(&lkb->lkb_statequeue, &r->res_waitqueue); 1366 else 1367 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue); 1368 break; 1369 case DLM_LKSTS_GRANTED: 1370 /* convention says granted locks kept in order of grmode */ 1371 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue, 1372 lkb->lkb_grmode); 1373 break; 1374 case DLM_LKSTS_CONVERT: 1375 if (lkb->lkb_exflags & DLM_LKF_HEADQUE) 1376 list_add(&lkb->lkb_statequeue, &r->res_convertqueue); 1377 else 1378 list_add_tail(&lkb->lkb_statequeue, 1379 &r->res_convertqueue); 1380 break; 1381 default: 1382 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status);); 1383 } 1384 } 1385 1386 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb) 1387 { 1388 lkb->lkb_status = 0; 1389 list_del(&lkb->lkb_statequeue); 1390 unhold_lkb(lkb); 1391 } 1392 1393 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts) 1394 { 1395 hold_lkb(lkb); 1396 del_lkb(r, lkb); 1397 add_lkb(r, lkb, sts); 1398 unhold_lkb(lkb); 1399 } 1400 1401 static int msg_reply_type(int mstype) 1402 { 1403 switch (mstype) { 1404 case DLM_MSG_REQUEST: 1405 return DLM_MSG_REQUEST_REPLY; 1406 case DLM_MSG_CONVERT: 1407 return DLM_MSG_CONVERT_REPLY; 1408 case DLM_MSG_UNLOCK: 1409 return DLM_MSG_UNLOCK_REPLY; 1410 case DLM_MSG_CANCEL: 1411 return DLM_MSG_CANCEL_REPLY; 1412 case DLM_MSG_LOOKUP: 1413 return DLM_MSG_LOOKUP_REPLY; 1414 } 1415 return -1; 1416 } 1417 1418 /* add/remove lkb from global waiters list of lkb's waiting for 1419 a reply from a remote node */ 1420 1421 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) 1422 { 1423 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1424 int error = 0; 1425 1426 mutex_lock(&ls->ls_waiters_mutex); 1427 1428 if (is_overlap_unlock(lkb) || 1429 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) { 1430 error = -EINVAL; 1431 goto out; 1432 } 1433 1434 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) { 1435 switch (mstype) { 1436 case DLM_MSG_UNLOCK: 1437 lkb->lkb_flags |= 
DLM_IFL_OVERLAP_UNLOCK; 1438 break; 1439 case DLM_MSG_CANCEL: 1440 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; 1441 break; 1442 default: 1443 error = -EBUSY; 1444 goto out; 1445 } 1446 lkb->lkb_wait_count++; 1447 hold_lkb(lkb); 1448 1449 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x", 1450 lkb->lkb_id, lkb->lkb_wait_type, mstype, 1451 lkb->lkb_wait_count, lkb->lkb_flags); 1452 goto out; 1453 } 1454 1455 DLM_ASSERT(!lkb->lkb_wait_count, 1456 dlm_print_lkb(lkb); 1457 printk("wait_count %d\n", lkb->lkb_wait_count);); 1458 1459 lkb->lkb_wait_count++; 1460 lkb->lkb_wait_type = mstype; 1461 lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */ 1462 hold_lkb(lkb); 1463 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters); 1464 out: 1465 if (error) 1466 log_error(ls, "addwait error %x %d flags %x %d %d %s", 1467 lkb->lkb_id, error, lkb->lkb_flags, mstype, 1468 lkb->lkb_wait_type, lkb->lkb_resource->res_name); 1469 mutex_unlock(&ls->ls_waiters_mutex); 1470 return error; 1471 } 1472 1473 /* We clear the RESEND flag because we might be taking an lkb off the waiters 1474 list as part of process_requestqueue (e.g. a lookup that has an optimized 1475 request reply on the requestqueue) between dlm_recover_waiters_pre() which 1476 set RESEND and dlm_recover_waiters_post() */ 1477 1478 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, 1479 struct dlm_message *ms) 1480 { 1481 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1482 int overlap_done = 0; 1483 1484 if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) { 1485 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id); 1486 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 1487 overlap_done = 1; 1488 goto out_del; 1489 } 1490 1491 if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) { 1492 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id); 1493 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 1494 overlap_done = 1; 1495 goto out_del; 1496 } 1497 1498 /* Cancel state was preemptively cleared by a successful convert, 1499 see next comment, nothing to do. */ 1500 1501 if ((mstype == DLM_MSG_CANCEL_REPLY) && 1502 (lkb->lkb_wait_type != DLM_MSG_CANCEL)) { 1503 log_debug(ls, "remwait %x cancel_reply wait_type %d", 1504 lkb->lkb_id, lkb->lkb_wait_type); 1505 return -1; 1506 } 1507 1508 /* Remove for the convert reply, and preemptively remove for the 1509 cancel reply. A convert has been granted while there's still 1510 an outstanding cancel on it (the cancel is moot and the result 1511 in the cancel reply should be 0). We preempt the cancel reply 1512 because the app gets the convert result and then can follow up 1513 with another op, like convert. This subsequent op would see the 1514 lingering state of the cancel and fail with -EBUSY. */ 1515 1516 if ((mstype == DLM_MSG_CONVERT_REPLY) && 1517 (lkb->lkb_wait_type == DLM_MSG_CONVERT) && 1518 is_overlap_cancel(lkb) && ms && !ms->m_result) { 1519 log_debug(ls, "remwait %x convert_reply zap overlap_cancel", 1520 lkb->lkb_id); 1521 lkb->lkb_wait_type = 0; 1522 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 1523 lkb->lkb_wait_count--; 1524 unhold_lkb(lkb); 1525 goto out_del; 1526 } 1527 1528 /* N.B. type of reply may not always correspond to type of original 1529 msg due to lookup->request optimization, verify others? */ 1530 1531 if (lkb->lkb_wait_type) { 1532 lkb->lkb_wait_type = 0; 1533 goto out_del; 1534 } 1535 1536 log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait", 1537 lkb->lkb_id, ms ?
le32_to_cpu(ms->m_header.h_nodeid) : 0, 1538 lkb->lkb_remid, mstype, lkb->lkb_flags); 1539 return -1; 1540 1541 out_del: 1542 /* the force-unlock/cancel has completed and we haven't recvd a reply 1543 to the op that was in progress prior to the unlock/cancel; we 1544 give up on any reply to the earlier op. FIXME: not sure when/how 1545 this would happen */ 1546 1547 if (overlap_done && lkb->lkb_wait_type) { 1548 log_error(ls, "remwait error %x reply %d wait_type %d overlap", 1549 lkb->lkb_id, mstype, lkb->lkb_wait_type); 1550 lkb->lkb_wait_count--; 1551 unhold_lkb(lkb); 1552 lkb->lkb_wait_type = 0; 1553 } 1554 1555 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb);); 1556 1557 lkb->lkb_flags &= ~DLM_IFL_RESEND; 1558 lkb->lkb_wait_count--; 1559 if (!lkb->lkb_wait_count) 1560 list_del_init(&lkb->lkb_wait_reply); 1561 unhold_lkb(lkb); 1562 return 0; 1563 } 1564 1565 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype) 1566 { 1567 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1568 int error; 1569 1570 mutex_lock(&ls->ls_waiters_mutex); 1571 error = _remove_from_waiters(lkb, mstype, NULL); 1572 mutex_unlock(&ls->ls_waiters_mutex); 1573 return error; 1574 } 1575 1576 /* Handles situations where we might be processing a "fake" or "stub" reply in 1577 which we can't try to take waiters_mutex again. */ 1578 1579 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) 1580 { 1581 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1582 int error; 1583 1584 if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS)) 1585 mutex_lock(&ls->ls_waiters_mutex); 1586 error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms); 1587 if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS)) 1588 mutex_unlock(&ls->ls_waiters_mutex); 1589 return error; 1590 } 1591 1592 /* If there's an rsb for the same resource being removed, ensure 1593 * that the remove message is sent before the new lookup message. 1594 */ 1595 1596 #define DLM_WAIT_PENDING_COND(ls, r) \ 1597 (ls->ls_remove_len && \ 1598 !rsb_cmp(r, ls->ls_remove_name, \ 1599 ls->ls_remove_len)) 1600 1601 static void wait_pending_remove(struct dlm_rsb *r) 1602 { 1603 struct dlm_ls *ls = r->res_ls; 1604 restart: 1605 spin_lock(&ls->ls_remove_spin); 1606 if (DLM_WAIT_PENDING_COND(ls, r)) { 1607 log_debug(ls, "delay lookup for remove dir %d %s", 1608 r->res_dir_nodeid, r->res_name); 1609 spin_unlock(&ls->ls_remove_spin); 1610 wait_event(ls->ls_remove_wait, !DLM_WAIT_PENDING_COND(ls, r)); 1611 goto restart; 1612 } 1613 spin_unlock(&ls->ls_remove_spin); 1614 } 1615 1616 /* 1617 * ls_remove_spin protects ls_remove_name and ls_remove_len which are 1618 * read by other threads in wait_pending_remove. ls_remove_names 1619 * and ls_remove_lens are only used by the scan thread, so they do 1620 * not need protection. 
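* (i.e. they are only touched in shrink_bucket() below)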
1621 */ 1622 1623 static void shrink_bucket(struct dlm_ls *ls, int b) 1624 { 1625 struct rb_node *n, *next; 1626 struct dlm_rsb *r; 1627 char *name; 1628 int our_nodeid = dlm_our_nodeid(); 1629 int remote_count = 0; 1630 int need_shrink = 0; 1631 int i, len, rv; 1632 1633 memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX); 1634 1635 spin_lock(&ls->ls_rsbtbl[b].lock); 1636 1637 if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) { 1638 spin_unlock(&ls->ls_rsbtbl[b].lock); 1639 return; 1640 } 1641 1642 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) { 1643 next = rb_next(n); 1644 r = rb_entry(n, struct dlm_rsb, res_hashnode); 1645 1646 /* If we're the directory record for this rsb, and 1647 we're not the master of it, then we need to wait 1648 for the master node to send us a dir remove 1649 before removing the dir record. */ 1650 1651 if (!dlm_no_directory(ls) && 1652 (r->res_master_nodeid != our_nodeid) && 1653 (dlm_dir_nodeid(r) == our_nodeid)) { 1654 continue; 1655 } 1656 1657 need_shrink = 1; 1658 1659 if (!time_after_eq(jiffies, r->res_toss_time + 1660 dlm_config.ci_toss_secs * HZ)) { 1661 continue; 1662 } 1663 1664 if (!dlm_no_directory(ls) && 1665 (r->res_master_nodeid == our_nodeid) && 1666 (dlm_dir_nodeid(r) != our_nodeid)) { 1667 1668 /* We're the master of this rsb but we're not 1669 the directory record, so we need to tell the 1670 dir node to remove the dir record. */ 1671 1672 ls->ls_remove_lens[remote_count] = r->res_length; 1673 memcpy(ls->ls_remove_names[remote_count], r->res_name, 1674 DLM_RESNAME_MAXLEN); 1675 remote_count++; 1676 1677 if (remote_count >= DLM_REMOVE_NAMES_MAX) 1678 break; 1679 continue; 1680 } 1681 1682 if (!kref_put(&r->res_ref, kill_rsb)) { 1683 log_error(ls, "tossed rsb in use %s", r->res_name); 1684 continue; 1685 } 1686 1687 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 1688 dlm_free_rsb(r); 1689 } 1690 1691 if (need_shrink) 1692 ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK; 1693 else 1694 ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK; 1695 spin_unlock(&ls->ls_rsbtbl[b].lock); 1696 1697 /* 1698 * While searching for rsb's to free, we found some that require 1699 * remote removal. We leave them in place and find them again here 1700 * so there is a very small gap between removing them from the toss 1701 * list and sending the removal. Keeping this gap small is 1702 * important to keep us (the master node) from being out of sync 1703 * with the remote dir node for very long. 1704 * 1705 * From the time the rsb is removed from toss until just after 1706 * send_remove, the rsb name is saved in ls_remove_name. A new 1707 * lookup checks this to ensure that a new lookup message for the 1708 * same resource name is not sent just before the remove message.
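* (That check is DLM_WAIT_PENDING_COND, used by wait_pending_remove().)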
1709 */ 1710 1711 for (i = 0; i < remote_count; i++) { 1712 name = ls->ls_remove_names[i]; 1713 len = ls->ls_remove_lens[i]; 1714 1715 spin_lock(&ls->ls_rsbtbl[b].lock); 1716 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 1717 if (rv) { 1718 spin_unlock(&ls->ls_rsbtbl[b].lock); 1719 log_debug(ls, "remove_name not toss %s", name); 1720 continue; 1721 } 1722 1723 if (r->res_master_nodeid != our_nodeid) { 1724 spin_unlock(&ls->ls_rsbtbl[b].lock); 1725 log_debug(ls, "remove_name master %d dir %d our %d %s", 1726 r->res_master_nodeid, r->res_dir_nodeid, 1727 our_nodeid, name); 1728 continue; 1729 } 1730 1731 if (r->res_dir_nodeid == our_nodeid) { 1732 /* should never happen */ 1733 spin_unlock(&ls->ls_rsbtbl[b].lock); 1734 log_error(ls, "remove_name dir %d master %d our %d %s", 1735 r->res_dir_nodeid, r->res_master_nodeid, 1736 our_nodeid, name); 1737 continue; 1738 } 1739 1740 if (!time_after_eq(jiffies, r->res_toss_time + 1741 dlm_config.ci_toss_secs * HZ)) { 1742 spin_unlock(&ls->ls_rsbtbl[b].lock); 1743 log_debug(ls, "remove_name toss_time %lu now %lu %s", 1744 r->res_toss_time, jiffies, name); 1745 continue; 1746 } 1747 1748 if (!kref_put(&r->res_ref, kill_rsb)) { 1749 spin_unlock(&ls->ls_rsbtbl[b].lock); 1750 log_error(ls, "remove_name in use %s", name); 1751 continue; 1752 } 1753 1754 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 1755 1756 /* block lookup of same name until we've sent remove */ 1757 spin_lock(&ls->ls_remove_spin); 1758 ls->ls_remove_len = len; 1759 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN); 1760 spin_unlock(&ls->ls_remove_spin); 1761 spin_unlock(&ls->ls_rsbtbl[b].lock); 1762 1763 send_remove(r); 1764 1765 /* allow lookup of name again */ 1766 spin_lock(&ls->ls_remove_spin); 1767 ls->ls_remove_len = 0; 1768 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN); 1769 spin_unlock(&ls->ls_remove_spin); 1770 wake_up(&ls->ls_remove_wait); 1771 1772 dlm_free_rsb(r); 1773 } 1774 } 1775 1776 void dlm_scan_rsbs(struct dlm_ls *ls) 1777 { 1778 int i; 1779 1780 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 1781 shrink_bucket(ls, i); 1782 if (dlm_locking_stopped(ls)) 1783 break; 1784 cond_resched(); 1785 } 1786 } 1787 1788 #ifdef CONFIG_DLM_DEPRECATED_API 1789 static void add_timeout(struct dlm_lkb *lkb) 1790 { 1791 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1792 1793 if (is_master_copy(lkb)) 1794 return; 1795 1796 if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) && 1797 !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) { 1798 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN; 1799 goto add_it; 1800 } 1801 if (lkb->lkb_exflags & DLM_LKF_TIMEOUT) 1802 goto add_it; 1803 return; 1804 1805 add_it: 1806 DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb);); 1807 mutex_lock(&ls->ls_timeout_mutex); 1808 hold_lkb(lkb); 1809 list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout); 1810 mutex_unlock(&ls->ls_timeout_mutex); 1811 } 1812 1813 static void del_timeout(struct dlm_lkb *lkb) 1814 { 1815 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1816 1817 mutex_lock(&ls->ls_timeout_mutex); 1818 if (!list_empty(&lkb->lkb_time_list)) { 1819 list_del_init(&lkb->lkb_time_list); 1820 unhold_lkb(lkb); 1821 } 1822 mutex_unlock(&ls->ls_timeout_mutex); 1823 } 1824 1825 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and 1826 lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex 1827 and then lock rsb because of lock ordering in add_timeout. 
We may need 1828 to specify some special timeout-related bits in the lkb that are just to 1829 be accessed under the timeout_mutex. */ 1830 1831 void dlm_scan_timeout(struct dlm_ls *ls) 1832 { 1833 struct dlm_rsb *r; 1834 struct dlm_lkb *lkb = NULL, *iter; 1835 int do_cancel, do_warn; 1836 s64 wait_us; 1837 1838 for (;;) { 1839 if (dlm_locking_stopped(ls)) 1840 break; 1841 1842 do_cancel = 0; 1843 do_warn = 0; 1844 mutex_lock(&ls->ls_timeout_mutex); 1845 list_for_each_entry(iter, &ls->ls_timeout, lkb_time_list) { 1846 1847 wait_us = ktime_to_us(ktime_sub(ktime_get(), 1848 iter->lkb_timestamp)); 1849 1850 if ((iter->lkb_exflags & DLM_LKF_TIMEOUT) && 1851 wait_us >= (iter->lkb_timeout_cs * 10000)) 1852 do_cancel = 1; 1853 1854 if ((iter->lkb_flags & DLM_IFL_WATCH_TIMEWARN) && 1855 wait_us >= dlm_config.ci_timewarn_cs * 10000) 1856 do_warn = 1; 1857 1858 if (!do_cancel && !do_warn) 1859 continue; 1860 hold_lkb(iter); 1861 lkb = iter; 1862 break; 1863 } 1864 mutex_unlock(&ls->ls_timeout_mutex); 1865 1866 if (!lkb) 1867 break; 1868 1869 r = lkb->lkb_resource; 1870 hold_rsb(r); 1871 lock_rsb(r); 1872 1873 if (do_warn) { 1874 /* clear flag so we only warn once */ 1875 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN; 1876 if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT)) 1877 del_timeout(lkb); 1878 dlm_timeout_warn(lkb); 1879 } 1880 1881 if (do_cancel) { 1882 log_debug(ls, "timeout cancel %x node %d %s", 1883 lkb->lkb_id, lkb->lkb_nodeid, r->res_name); 1884 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN; 1885 lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL; 1886 del_timeout(lkb); 1887 _cancel_lock(r, lkb); 1888 } 1889 1890 unlock_rsb(r); 1891 unhold_rsb(r); 1892 dlm_put_lkb(lkb); 1893 } 1894 } 1895 1896 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping 1897 dlm_recoverd before checking/setting ls_recover_begin. 
*/ 1898 1899 void dlm_adjust_timeouts(struct dlm_ls *ls) 1900 { 1901 struct dlm_lkb *lkb; 1902 u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin); 1903 1904 ls->ls_recover_begin = 0; 1905 mutex_lock(&ls->ls_timeout_mutex); 1906 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) 1907 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us); 1908 mutex_unlock(&ls->ls_timeout_mutex); 1909 } 1910 #else 1911 static void add_timeout(struct dlm_lkb *lkb) { } 1912 static void del_timeout(struct dlm_lkb *lkb) { } 1913 #endif 1914 1915 /* lkb is master or local copy */ 1916 1917 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1918 { 1919 int b, len = r->res_ls->ls_lvblen; 1920 1921 /* b=1 lvb returned to caller 1922 b=0 lvb written to rsb or invalidated 1923 b=-1 do nothing */ 1924 1925 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; 1926 1927 if (b == 1) { 1928 if (!lkb->lkb_lvbptr) 1929 return; 1930 1931 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1932 return; 1933 1934 if (!r->res_lvbptr) 1935 return; 1936 1937 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len); 1938 lkb->lkb_lvbseq = r->res_lvbseq; 1939 1940 } else if (b == 0) { 1941 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) { 1942 rsb_set_flag(r, RSB_VALNOTVALID); 1943 return; 1944 } 1945 1946 if (!lkb->lkb_lvbptr) 1947 return; 1948 1949 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1950 return; 1951 1952 if (!r->res_lvbptr) 1953 r->res_lvbptr = dlm_allocate_lvb(r->res_ls); 1954 1955 if (!r->res_lvbptr) 1956 return; 1957 1958 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len); 1959 r->res_lvbseq++; 1960 lkb->lkb_lvbseq = r->res_lvbseq; 1961 rsb_clear_flag(r, RSB_VALNOTVALID); 1962 } 1963 1964 if (rsb_flag(r, RSB_VALNOTVALID)) 1965 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID; 1966 } 1967 1968 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1969 { 1970 if (lkb->lkb_grmode < DLM_LOCK_PW) 1971 return; 1972 1973 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) { 1974 rsb_set_flag(r, RSB_VALNOTVALID); 1975 return; 1976 } 1977 1978 if (!lkb->lkb_lvbptr) 1979 return; 1980 1981 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1982 return; 1983 1984 if (!r->res_lvbptr) 1985 r->res_lvbptr = dlm_allocate_lvb(r->res_ls); 1986 1987 if (!r->res_lvbptr) 1988 return; 1989 1990 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); 1991 r->res_lvbseq++; 1992 rsb_clear_flag(r, RSB_VALNOTVALID); 1993 } 1994 1995 /* lkb is process copy (pc) */ 1996 1997 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, 1998 struct dlm_message *ms) 1999 { 2000 int b; 2001 2002 if (!lkb->lkb_lvbptr) 2003 return; 2004 2005 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 2006 return; 2007 2008 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; 2009 if (b == 1) { 2010 int len = receive_extralen(ms); 2011 if (len > r->res_ls->ls_lvblen) 2012 len = r->res_ls->ls_lvblen; 2013 memcpy(lkb->lkb_lvbptr, ms->m_extra, len); 2014 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq); 2015 } 2016 } 2017 2018 /* Manipulate lkb's on rsb's convert/granted/waiting queues 2019 remove_lock -- used for unlock, removes lkb from granted 2020 revert_lock -- used for cancel, moves lkb from convert to granted 2021 grant_lock -- used for request and convert, adds lkb to granted or 2022 moves lkb from convert or waiting to granted 2023 2024 Each of these is used for master or local copy lkb's. There is 2025 also a _pc() variation used to make the corresponding change on 2026 a process copy (pc) lkb. 
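   A sketch of the typical pairings (not exhaustive): dlm_unlock() ends up in
   do_unlock() -> remove_lock(); a cancel ends up in do_cancel() ->
   revert_lock(); a request or conversion that becomes grantable goes through
   do_request()/do_convert() or grant_pending_locks() -> grant_lock(). On the
   node that issued a remote operation, the matching _pc() variant applies the
   same queue change to the process copy when the master's reply or grant
   message arrives.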
*/ 2027 2028 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2029 { 2030 del_lkb(r, lkb); 2031 lkb->lkb_grmode = DLM_LOCK_IV; 2032 /* this unhold undoes the original ref from create_lkb() 2033 so this leads to the lkb being freed */ 2034 unhold_lkb(lkb); 2035 } 2036 2037 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2038 { 2039 set_lvb_unlock(r, lkb); 2040 _remove_lock(r, lkb); 2041 } 2042 2043 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) 2044 { 2045 _remove_lock(r, lkb); 2046 } 2047 2048 /* returns: 0 did nothing 2049 1 moved lock to granted 2050 -1 removed lock */ 2051 2052 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2053 { 2054 int rv = 0; 2055 2056 lkb->lkb_rqmode = DLM_LOCK_IV; 2057 2058 switch (lkb->lkb_status) { 2059 case DLM_LKSTS_GRANTED: 2060 break; 2061 case DLM_LKSTS_CONVERT: 2062 move_lkb(r, lkb, DLM_LKSTS_GRANTED); 2063 rv = 1; 2064 break; 2065 case DLM_LKSTS_WAITING: 2066 del_lkb(r, lkb); 2067 lkb->lkb_grmode = DLM_LOCK_IV; 2068 /* this unhold undoes the original ref from create_lkb() 2069 so this leads to the lkb being freed */ 2070 unhold_lkb(lkb); 2071 rv = -1; 2072 break; 2073 default: 2074 log_print("invalid status for revert %d", lkb->lkb_status); 2075 } 2076 return rv; 2077 } 2078 2079 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) 2080 { 2081 return revert_lock(r, lkb); 2082 } 2083 2084 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2085 { 2086 if (lkb->lkb_grmode != lkb->lkb_rqmode) { 2087 lkb->lkb_grmode = lkb->lkb_rqmode; 2088 if (lkb->lkb_status) 2089 move_lkb(r, lkb, DLM_LKSTS_GRANTED); 2090 else 2091 add_lkb(r, lkb, DLM_LKSTS_GRANTED); 2092 } 2093 2094 lkb->lkb_rqmode = DLM_LOCK_IV; 2095 lkb->lkb_highbast = 0; 2096 } 2097 2098 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2099 { 2100 set_lvb_lock(r, lkb); 2101 _grant_lock(r, lkb); 2102 } 2103 2104 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, 2105 struct dlm_message *ms) 2106 { 2107 set_lvb_lock_pc(r, lkb, ms); 2108 _grant_lock(r, lkb); 2109 } 2110 2111 /* called by grant_pending_locks() which means an async grant message must 2112 be sent to the requesting node in addition to granting the lock if the 2113 lkb belongs to a remote node. */ 2114 2115 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb) 2116 { 2117 grant_lock(r, lkb); 2118 if (is_master_copy(lkb)) 2119 send_grant(r, lkb); 2120 else 2121 queue_cast(r, lkb, 0); 2122 } 2123 2124 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to 2125 change the granted/requested modes. We're munging things accordingly in 2126 the process copy. 
2127 CONVDEADLK: our grmode may have been forced down to NL to resolve a 2128 conversion deadlock 2129 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become 2130 compatible with other granted locks */ 2131 2132 static void munge_demoted(struct dlm_lkb *lkb) 2133 { 2134 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) { 2135 log_print("munge_demoted %x invalid modes gr %d rq %d", 2136 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode); 2137 return; 2138 } 2139 2140 lkb->lkb_grmode = DLM_LOCK_NL; 2141 } 2142 2143 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms) 2144 { 2145 if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) && 2146 ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) { 2147 log_print("munge_altmode %x invalid reply type %d", 2148 lkb->lkb_id, le32_to_cpu(ms->m_type)); 2149 return; 2150 } 2151 2152 if (lkb->lkb_exflags & DLM_LKF_ALTPR) 2153 lkb->lkb_rqmode = DLM_LOCK_PR; 2154 else if (lkb->lkb_exflags & DLM_LKF_ALTCW) 2155 lkb->lkb_rqmode = DLM_LOCK_CW; 2156 else { 2157 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags); 2158 dlm_print_lkb(lkb); 2159 } 2160 } 2161 2162 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head) 2163 { 2164 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb, 2165 lkb_statequeue); 2166 if (lkb->lkb_id == first->lkb_id) 2167 return 1; 2168 2169 return 0; 2170 } 2171 2172 /* Check if the given lkb conflicts with another lkb on the queue. */ 2173 2174 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb) 2175 { 2176 struct dlm_lkb *this; 2177 2178 list_for_each_entry(this, head, lkb_statequeue) { 2179 if (this == lkb) 2180 continue; 2181 if (!modes_compat(this, lkb)) 2182 return 1; 2183 } 2184 return 0; 2185 } 2186 2187 /* 2188 * "A conversion deadlock arises with a pair of lock requests in the converting 2189 * queue for one resource. The granted mode of each lock blocks the requested 2190 * mode of the other lock." 2191 * 2192 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the 2193 * convert queue from being granted, then deadlk/demote lkb. 2194 * 2195 * Example: 2196 * Granted Queue: empty 2197 * Convert Queue: NL->EX (first lock) 2198 * PR->EX (second lock) 2199 * 2200 * The first lock can't be granted because of the granted mode of the second 2201 * lock and the second lock can't be granted because it's not first in the 2202 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we 2203 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK 2204 * flag set and return DEMOTED in the lksb flags. 2205 * 2206 * Originally, this function detected conv-deadlk in a more limited scope: 2207 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or 2208 * - if lkb1 was the first entry in the queue (not just earlier), and was 2209 * blocked by the granted mode of lkb2, and there was nothing on the 2210 * granted queue preventing lkb1 from being granted immediately, i.e. 2211 * lkb2 was the only thing preventing lkb1 from being granted. 2212 * 2213 * That second condition meant we'd only say there was conv-deadlk if 2214 * resolving it (by demotion) would lead to the first lock on the convert 2215 * queue being granted right away. It allowed conversion deadlocks to exist 2216 * between locks on the convert queue while they couldn't be granted anyway. 
2217 * 2218 * Now, we detect and take action on conversion deadlocks immediately when 2219 * they're created, even if they may not be immediately consequential. If 2220 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted 2221 * mode that would prevent lkb1's conversion from being granted, we do a 2222 * deadlk/demote on lkb2 right away and don't let it onto the convert queue. 2223 * I think this means that the lkb_is_ahead condition below should always 2224 * be zero, i.e. there will never be conv-deadlk between two locks that are 2225 * both already on the convert queue. 2226 */ 2227 2228 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2) 2229 { 2230 struct dlm_lkb *lkb1; 2231 int lkb_is_ahead = 0; 2232 2233 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) { 2234 if (lkb1 == lkb2) { 2235 lkb_is_ahead = 1; 2236 continue; 2237 } 2238 2239 if (!lkb_is_ahead) { 2240 if (!modes_compat(lkb2, lkb1)) 2241 return 1; 2242 } else { 2243 if (!modes_compat(lkb2, lkb1) && 2244 !modes_compat(lkb1, lkb2)) 2245 return 1; 2246 } 2247 } 2248 return 0; 2249 } 2250 2251 /* 2252 * Return 1 if the lock can be granted, 0 otherwise. 2253 * Also detect and resolve conversion deadlocks. 2254 * 2255 * lkb is the lock to be granted 2256 * 2257 * now is 1 if the function is being called in the context of the 2258 * immediate request, it is 0 if called later, after the lock has been 2259 * queued. 2260 * 2261 * recover is 1 if dlm_recover_grant() is trying to grant conversions 2262 * after recovery. 2263 * 2264 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis 2265 */ 2266 2267 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, 2268 int recover) 2269 { 2270 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV); 2271 2272 /* 2273 * 6-10: Version 5.4 introduced an option to address the phenomenon of 2274 * a new request for a NL mode lock being blocked. 2275 * 2276 * 6-11: If the optional EXPEDITE flag is used with the new NL mode 2277 * request, then it would be granted. In essence, the use of this flag 2278 * tells the Lock Manager to expedite theis request by not considering 2279 * what may be in the CONVERTING or WAITING queues... As of this 2280 * writing, the EXPEDITE flag can be used only with new requests for NL 2281 * mode locks. This flag is not valid for conversion requests. 2282 * 2283 * A shortcut. Earlier checks return an error if EXPEDITE is used in a 2284 * conversion or used with a non-NL requested mode. We also know an 2285 * EXPEDITE request is always granted immediately, so now must always 2286 * be 1. The full condition to grant an expedite request: (now && 2287 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can 2288 * therefore be shortened to just checking the flag. 2289 */ 2290 2291 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE) 2292 return 1; 2293 2294 /* 2295 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be 2296 * added to the remaining conditions. 2297 */ 2298 2299 if (queue_conflict(&r->res_grantqueue, lkb)) 2300 return 0; 2301 2302 /* 2303 * 6-3: By default, a conversion request is immediately granted if the 2304 * requested mode is compatible with the modes of all other granted 2305 * locks 2306 */ 2307 2308 if (queue_conflict(&r->res_convertqueue, lkb)) 2309 return 0; 2310 2311 /* 2312 * The RECOVER_GRANT flag means dlm_recover_grant() is granting 2313 * locks for a recovered rsb, on which lkb's have been rebuilt. 
* The lkb's may have been rebuilt on the queues in a different 2315 * order than they were in on the previous master. So, granting 2316 * queued conversions in order after recovery doesn't make sense 2317 * since the order hasn't been preserved anyway. The new order 2318 * could also have created a new "in place" conversion deadlock. 2319 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX. 2320 * After recovery, there would be no granted locks, and possibly 2321 * NL->EX, PR->EX, an in-place conversion deadlock.) So, after 2322 * recovery, grant conversions without considering order. 2323 */ 2324 2325 if (conv && recover) 2326 return 1; 2327 2328 /* 2329 * 6-5: But the default algorithm for deciding whether to grant or 2330 * queue conversion requests does not by itself guarantee that such 2331 * requests are serviced on a "first come first serve" basis. This, in 2332 * turn, can lead to a phenomenon known as "indefinite postponement". 2333 * 2334 * 6-7: This issue is dealt with by using the optional QUECVT flag with 2335 * the system service employed to request a lock conversion. This flag 2336 * forces certain conversion requests to be queued, even if they are 2337 * compatible with the granted modes of other locks on the same 2338 * resource. Thus, the use of this flag results in conversion requests 2339 * being ordered on a "first come first serve" basis. 2340 * 2341 * DCT: This condition is all about new conversions being able to occur 2342 * "in place" while the lock remains on the granted queue (assuming 2343 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion 2344 * doesn't _have_ to go onto the convert queue where it's processed in 2345 * order. The "now" variable is necessary to distinguish converts 2346 * being received and processed for the first time now, because once a 2347 * convert is moved to the conversion queue the condition below applies 2348 * requiring fifo granting. 2349 */ 2350 2351 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT)) 2352 return 1; 2353 2354 /* 2355 * Even if the convert is compat with all granted locks, 2356 * QUECVT forces it behind other locks on the convert queue. 2357 */ 2358 2359 if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) { 2360 if (list_empty(&r->res_convertqueue)) 2361 return 1; 2362 else 2363 return 0; 2364 } 2365 2366 /* 2367 * The NOORDER flag is set to avoid the standard vms rules on grant 2368 * order. 2369 */ 2370 2371 if (lkb->lkb_exflags & DLM_LKF_NOORDER) 2372 return 1; 2373 2374 /* 2375 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be 2376 * granted until all other conversion requests ahead of it are granted 2377 * and/or canceled. 2378 */ 2379 2380 if (!now && conv && first_in_list(lkb, &r->res_convertqueue)) 2381 return 1; 2382 2383 /* 2384 * 6-4: By default, a new request is immediately granted only if all 2385 * three of the following conditions are satisfied when the request is 2386 * issued: 2387 * - The queue of ungranted conversion requests for the resource is 2388 * empty. 2389 * - The queue of ungranted new requests for the resource is empty. 2390 * - The mode of the new request is compatible with the most 2391 * restrictive mode of all granted locks on the resource.
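 *
 * A worked example of this rule (an illustration): a brand-new PR request
 * against a resource whose grant queue holds only NL and CR locks, with
 * empty convert and wait queues, is granted right away by the test below;
 * the same request arriving while even one conversion is queued fails the
 * test and (unless NOQUEUE was given) is put on the wait queue to be
 * granted in order later.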
2392 */ 2393 2394 if (now && !conv && list_empty(&r->res_convertqueue) && 2395 list_empty(&r->res_waitqueue)) 2396 return 1; 2397 2398 /* 2399 * 6-4: Once a lock request is in the queue of ungranted new requests, 2400 * it cannot be granted until the queue of ungranted conversion 2401 * requests is empty, all ungranted new requests ahead of it are 2402 * granted and/or canceled, and it is compatible with the granted mode 2403 * of the most restrictive lock granted on the resource. 2404 */ 2405 2406 if (!now && !conv && list_empty(&r->res_convertqueue) && 2407 first_in_list(lkb, &r->res_waitqueue)) 2408 return 1; 2409 2410 return 0; 2411 } 2412 2413 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, 2414 int recover, int *err) 2415 { 2416 int rv; 2417 int8_t alt = 0, rqmode = lkb->lkb_rqmode; 2418 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV); 2419 2420 if (err) 2421 *err = 0; 2422 2423 rv = _can_be_granted(r, lkb, now, recover); 2424 if (rv) 2425 goto out; 2426 2427 /* 2428 * The CONVDEADLK flag is non-standard and tells the dlm to resolve 2429 * conversion deadlocks by demoting grmode to NL, otherwise the dlm 2430 * cancels one of the locks. 2431 */ 2432 2433 if (is_convert && can_be_queued(lkb) && 2434 conversion_deadlock_detect(r, lkb)) { 2435 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) { 2436 lkb->lkb_grmode = DLM_LOCK_NL; 2437 lkb->lkb_sbflags |= DLM_SBF_DEMOTED; 2438 } else if (err) { 2439 *err = -EDEADLK; 2440 } else { 2441 log_print("can_be_granted deadlock %x now %d", 2442 lkb->lkb_id, now); 2443 dlm_dump_rsb(r); 2444 } 2445 goto out; 2446 } 2447 2448 /* 2449 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try 2450 * to grant a request in a mode other than the normal rqmode. It's a 2451 * simple way to provide a big optimization to applications that can 2452 * use them. 2453 */ 2454 2455 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR)) 2456 alt = DLM_LOCK_PR; 2457 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW)) 2458 alt = DLM_LOCK_CW; 2459 2460 if (alt) { 2461 lkb->lkb_rqmode = alt; 2462 rv = _can_be_granted(r, lkb, now, 0); 2463 if (rv) 2464 lkb->lkb_sbflags |= DLM_SBF_ALTMODE; 2465 else 2466 lkb->lkb_rqmode = rqmode; 2467 } 2468 out: 2469 return rv; 2470 } 2471 2472 /* Returns the highest requested mode of all blocked conversions; sets 2473 cw if there's a blocked conversion to DLM_LOCK_CW. */ 2474 2475 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw, 2476 unsigned int *count) 2477 { 2478 struct dlm_lkb *lkb, *s; 2479 int recover = rsb_flag(r, RSB_RECOVER_GRANT); 2480 int hi, demoted, quit, grant_restart, demote_restart; 2481 int deadlk; 2482 2483 quit = 0; 2484 restart: 2485 grant_restart = 0; 2486 demote_restart = 0; 2487 hi = DLM_LOCK_IV; 2488 2489 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) { 2490 demoted = is_demoted(lkb); 2491 deadlk = 0; 2492 2493 if (can_be_granted(r, lkb, 0, recover, &deadlk)) { 2494 grant_lock_pending(r, lkb); 2495 grant_restart = 1; 2496 if (count) 2497 (*count)++; 2498 continue; 2499 } 2500 2501 if (!demoted && is_demoted(lkb)) { 2502 log_print("WARN: pending demoted %x node %d %s", 2503 lkb->lkb_id, lkb->lkb_nodeid, r->res_name); 2504 demote_restart = 1; 2505 continue; 2506 } 2507 2508 if (deadlk) { 2509 /* 2510 * If DLM_LKB_NODLKWT flag is set and conversion 2511 * deadlock is detected, we request blocking AST and 2512 * down (or cancel) conversion. 
2513 */ 2514 if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) { 2515 if (lkb->lkb_highbast < lkb->lkb_rqmode) { 2516 queue_bast(r, lkb, lkb->lkb_rqmode); 2517 lkb->lkb_highbast = lkb->lkb_rqmode; 2518 } 2519 } else { 2520 log_print("WARN: pending deadlock %x node %d %s", 2521 lkb->lkb_id, lkb->lkb_nodeid, 2522 r->res_name); 2523 dlm_dump_rsb(r); 2524 } 2525 continue; 2526 } 2527 2528 hi = max_t(int, lkb->lkb_rqmode, hi); 2529 2530 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW) 2531 *cw = 1; 2532 } 2533 2534 if (grant_restart) 2535 goto restart; 2536 if (demote_restart && !quit) { 2537 quit = 1; 2538 goto restart; 2539 } 2540 2541 return max_t(int, high, hi); 2542 } 2543 2544 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw, 2545 unsigned int *count) 2546 { 2547 struct dlm_lkb *lkb, *s; 2548 2549 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { 2550 if (can_be_granted(r, lkb, 0, 0, NULL)) { 2551 grant_lock_pending(r, lkb); 2552 if (count) 2553 (*count)++; 2554 } else { 2555 high = max_t(int, lkb->lkb_rqmode, high); 2556 if (lkb->lkb_rqmode == DLM_LOCK_CW) 2557 *cw = 1; 2558 } 2559 } 2560 2561 return high; 2562 } 2563 2564 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked 2565 on either the convert or waiting queue. 2566 high is the largest rqmode of all locks blocked on the convert or 2567 waiting queue. */ 2568 2569 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw) 2570 { 2571 if (gr->lkb_grmode == DLM_LOCK_PR && cw) { 2572 if (gr->lkb_highbast < DLM_LOCK_EX) 2573 return 1; 2574 return 0; 2575 } 2576 2577 if (gr->lkb_highbast < high && 2578 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1]) 2579 return 1; 2580 return 0; 2581 } 2582 2583 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count) 2584 { 2585 struct dlm_lkb *lkb, *s; 2586 int high = DLM_LOCK_IV; 2587 int cw = 0; 2588 2589 if (!is_master(r)) { 2590 log_print("grant_pending_locks r nodeid %d", r->res_nodeid); 2591 dlm_dump_rsb(r); 2592 return; 2593 } 2594 2595 high = grant_pending_convert(r, high, &cw, count); 2596 high = grant_pending_wait(r, high, &cw, count); 2597 2598 if (high == DLM_LOCK_IV) 2599 return; 2600 2601 /* 2602 * If there are locks left on the wait/convert queue then send blocking 2603 * ASTs to granted locks based on the largest requested mode (high) 2604 * found above. 
2605 */ 2606 2607 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) { 2608 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) { 2609 if (cw && high == DLM_LOCK_PR && 2610 lkb->lkb_grmode == DLM_LOCK_PR) 2611 queue_bast(r, lkb, DLM_LOCK_CW); 2612 else 2613 queue_bast(r, lkb, high); 2614 lkb->lkb_highbast = high; 2615 } 2616 } 2617 } 2618 2619 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq) 2620 { 2621 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) || 2622 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) { 2623 if (gr->lkb_highbast < DLM_LOCK_EX) 2624 return 1; 2625 return 0; 2626 } 2627 2628 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq)) 2629 return 1; 2630 return 0; 2631 } 2632 2633 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head, 2634 struct dlm_lkb *lkb) 2635 { 2636 struct dlm_lkb *gr; 2637 2638 list_for_each_entry(gr, head, lkb_statequeue) { 2639 /* skip self when sending basts to convertqueue */ 2640 if (gr == lkb) 2641 continue; 2642 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) { 2643 queue_bast(r, gr, lkb->lkb_rqmode); 2644 gr->lkb_highbast = lkb->lkb_rqmode; 2645 } 2646 } 2647 } 2648 2649 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb) 2650 { 2651 send_bast_queue(r, &r->res_grantqueue, lkb); 2652 } 2653 2654 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb) 2655 { 2656 send_bast_queue(r, &r->res_grantqueue, lkb); 2657 send_bast_queue(r, &r->res_convertqueue, lkb); 2658 } 2659 2660 /* set_master(r, lkb) -- set the master nodeid of a resource 2661 2662 The purpose of this function is to set the nodeid field in the given 2663 lkb using the nodeid field in the given rsb. If the rsb's nodeid is 2664 known, it can just be copied to the lkb and the function will return 2665 0. If the rsb's nodeid is _not_ known, it needs to be looked up 2666 before it can be copied to the lkb. 2667 2668 When the rsb nodeid is being looked up remotely, the initial lkb 2669 causing the lookup is kept on the ls_waiters list waiting for the 2670 lookup reply. Other lkb's waiting for the same rsb lookup are kept 2671 on the rsb's res_lookup list until the master is verified. 2672 2673 Return values: 2674 0: nodeid is set in rsb/lkb and the caller should go ahead and use it 2675 1: the rsb master is not available and the lkb has been placed on 2676 a wait queue 2677 */ 2678 2679 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) 2680 { 2681 int our_nodeid = dlm_our_nodeid(); 2682 2683 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) { 2684 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); 2685 r->res_first_lkid = lkb->lkb_id; 2686 lkb->lkb_nodeid = r->res_nodeid; 2687 return 0; 2688 } 2689 2690 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) { 2691 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup); 2692 return 1; 2693 } 2694 2695 if (r->res_master_nodeid == our_nodeid) { 2696 lkb->lkb_nodeid = 0; 2697 return 0; 2698 } 2699 2700 if (r->res_master_nodeid) { 2701 lkb->lkb_nodeid = r->res_master_nodeid; 2702 return 0; 2703 } 2704 2705 if (dlm_dir_nodeid(r) == our_nodeid) { 2706 /* This is a somewhat unusual case; find_rsb will usually 2707 have set res_master_nodeid when dir nodeid is local, but 2708 there are cases where we become the dir node after we've 2709 past find_rsb and go through _request_lock again. 2710 confirm_master() or process_lookup_list() needs to be 2711 called after this. 
*/ 2712 log_debug(r->res_ls, "set_master %x self master %d dir %d %s", 2713 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid, 2714 r->res_name); 2715 r->res_master_nodeid = our_nodeid; 2716 r->res_nodeid = 0; 2717 lkb->lkb_nodeid = 0; 2718 return 0; 2719 } 2720 2721 wait_pending_remove(r); 2722 2723 r->res_first_lkid = lkb->lkb_id; 2724 send_lookup(r, lkb); 2725 return 1; 2726 } 2727 2728 static void process_lookup_list(struct dlm_rsb *r) 2729 { 2730 struct dlm_lkb *lkb, *safe; 2731 2732 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) { 2733 list_del_init(&lkb->lkb_rsb_lookup); 2734 _request_lock(r, lkb); 2735 schedule(); 2736 } 2737 } 2738 2739 /* confirm_master -- confirm (or deny) an rsb's master nodeid */ 2740 2741 static void confirm_master(struct dlm_rsb *r, int error) 2742 { 2743 struct dlm_lkb *lkb; 2744 2745 if (!r->res_first_lkid) 2746 return; 2747 2748 switch (error) { 2749 case 0: 2750 case -EINPROGRESS: 2751 r->res_first_lkid = 0; 2752 process_lookup_list(r); 2753 break; 2754 2755 case -EAGAIN: 2756 case -EBADR: 2757 case -ENOTBLK: 2758 /* the remote request failed and won't be retried (it was 2759 a NOQUEUE, or has been canceled/unlocked); make a waiting 2760 lkb the first_lkid */ 2761 2762 r->res_first_lkid = 0; 2763 2764 if (!list_empty(&r->res_lookup)) { 2765 lkb = list_entry(r->res_lookup.next, struct dlm_lkb, 2766 lkb_rsb_lookup); 2767 list_del_init(&lkb->lkb_rsb_lookup); 2768 r->res_first_lkid = lkb->lkb_id; 2769 _request_lock(r, lkb); 2770 } 2771 break; 2772 2773 default: 2774 log_error(r->res_ls, "confirm_master unknown error %d", error); 2775 } 2776 } 2777 2778 #ifdef CONFIG_DLM_DEPRECATED_API 2779 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, 2780 int namelen, unsigned long timeout_cs, 2781 void (*ast) (void *astparam), 2782 void *astparam, 2783 void (*bast) (void *astparam, int mode), 2784 struct dlm_args *args) 2785 #else 2786 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, 2787 int namelen, void (*ast)(void *astparam), 2788 void *astparam, 2789 void (*bast)(void *astparam, int mode), 2790 struct dlm_args *args) 2791 #endif 2792 { 2793 int rv = -EINVAL; 2794 2795 /* check for invalid arg usage */ 2796 2797 if (mode < 0 || mode > DLM_LOCK_EX) 2798 goto out; 2799 2800 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN)) 2801 goto out; 2802 2803 if (flags & DLM_LKF_CANCEL) 2804 goto out; 2805 2806 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT)) 2807 goto out; 2808 2809 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT)) 2810 goto out; 2811 2812 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE) 2813 goto out; 2814 2815 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT) 2816 goto out; 2817 2818 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT) 2819 goto out; 2820 2821 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE) 2822 goto out; 2823 2824 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL) 2825 goto out; 2826 2827 if (!ast || !lksb) 2828 goto out; 2829 2830 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr) 2831 goto out; 2832 2833 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid) 2834 goto out; 2835 2836 /* these args will be copied to the lkb in validate_lock_args, 2837 it cannot be done now because when converting locks, fields in 2838 an active lkb cannot be modified before locking the rsb */ 2839 2840 args->flags = flags; 2841 args->astfn = ast; 2842 args->astparam = astparam; 2843 args->bastfn = bast; 2844 #ifdef 
CONFIG_DLM_DEPRECATED_API 2845 args->timeout = timeout_cs; 2846 #endif 2847 args->mode = mode; 2848 args->lksb = lksb; 2849 rv = 0; 2850 out: 2851 return rv; 2852 } 2853 2854 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args) 2855 { 2856 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK | 2857 DLM_LKF_FORCEUNLOCK)) 2858 return -EINVAL; 2859 2860 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK) 2861 return -EINVAL; 2862 2863 args->flags = flags; 2864 args->astparam = astarg; 2865 return 0; 2866 } 2867 2868 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 2869 struct dlm_args *args) 2870 { 2871 int rv = -EBUSY; 2872 2873 if (args->flags & DLM_LKF_CONVERT) { 2874 if (lkb->lkb_status != DLM_LKSTS_GRANTED) 2875 goto out; 2876 2877 /* lock not allowed if there's any op in progress */ 2878 if (lkb->lkb_wait_type || lkb->lkb_wait_count) 2879 goto out; 2880 2881 if (is_overlap(lkb)) 2882 goto out; 2883 2884 rv = -EINVAL; 2885 if (lkb->lkb_flags & DLM_IFL_MSTCPY) 2886 goto out; 2887 2888 if (args->flags & DLM_LKF_QUECVT && 2889 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1]) 2890 goto out; 2891 } 2892 2893 lkb->lkb_exflags = args->flags; 2894 lkb->lkb_sbflags = 0; 2895 lkb->lkb_astfn = args->astfn; 2896 lkb->lkb_astparam = args->astparam; 2897 lkb->lkb_bastfn = args->bastfn; 2898 lkb->lkb_rqmode = args->mode; 2899 lkb->lkb_lksb = args->lksb; 2900 lkb->lkb_lvbptr = args->lksb->sb_lvbptr; 2901 lkb->lkb_ownpid = (int) current->pid; 2902 #ifdef CONFIG_DLM_DEPRECATED_API 2903 lkb->lkb_timeout_cs = args->timeout; 2904 #endif 2905 rv = 0; 2906 out: 2907 switch (rv) { 2908 case 0: 2909 break; 2910 case -EINVAL: 2911 /* annoy the user because dlm usage is wrong */ 2912 WARN_ON(1); 2913 log_error(ls, "%s %d %x %x %x %d %d %s", __func__, 2914 rv, lkb->lkb_id, lkb->lkb_flags, args->flags, 2915 lkb->lkb_status, lkb->lkb_wait_type, 2916 lkb->lkb_resource->res_name); 2917 break; 2918 default: 2919 log_debug(ls, "%s %d %x %x %x %d %d %s", __func__, 2920 rv, lkb->lkb_id, lkb->lkb_flags, args->flags, 2921 lkb->lkb_status, lkb->lkb_wait_type, 2922 lkb->lkb_resource->res_name); 2923 break; 2924 } 2925 2926 return rv; 2927 } 2928 2929 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0 2930 for success */ 2931 2932 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here 2933 because there may be a lookup in progress and it's valid to do 2934 cancel/unlockf on it */ 2935 2936 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) 2937 { 2938 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 2939 int rv = -EBUSY; 2940 2941 /* normal unlock not allowed if there's any op in progress */ 2942 if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) && 2943 (lkb->lkb_wait_type || lkb->lkb_wait_count)) 2944 goto out; 2945 2946 /* an lkb may be waiting for an rsb lookup to complete where the 2947 lookup was initiated by another lock */ 2948 2949 if (!list_empty(&lkb->lkb_rsb_lookup)) { 2950 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) { 2951 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id); 2952 list_del_init(&lkb->lkb_rsb_lookup); 2953 queue_cast(lkb->lkb_resource, lkb, 2954 args->flags & DLM_LKF_CANCEL ? 
2955 -DLM_ECANCEL : -DLM_EUNLOCK); 2956 unhold_lkb(lkb); /* undoes create_lkb() */ 2957 } 2958 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */ 2959 goto out; 2960 } 2961 2962 rv = -EINVAL; 2963 if (lkb->lkb_flags & DLM_IFL_MSTCPY) { 2964 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id); 2965 dlm_print_lkb(lkb); 2966 goto out; 2967 } 2968 2969 /* an lkb may still exist even though the lock is EOL'ed due to a 2970 * cancel, unlock or failed noqueue request; an app can't use these 2971 * locks; return same error as if the lkid had not been found at all 2972 */ 2973 2974 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) { 2975 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id); 2976 rv = -ENOENT; 2977 goto out; 2978 } 2979 2980 /* cancel not allowed with another cancel/unlock in progress */ 2981 2982 if (args->flags & DLM_LKF_CANCEL) { 2983 if (lkb->lkb_exflags & DLM_LKF_CANCEL) 2984 goto out; 2985 2986 if (is_overlap(lkb)) 2987 goto out; 2988 2989 /* don't let scand try to do a cancel */ 2990 del_timeout(lkb); 2991 2992 if (lkb->lkb_flags & DLM_IFL_RESEND) { 2993 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; 2994 rv = -EBUSY; 2995 goto out; 2996 } 2997 2998 /* there's nothing to cancel */ 2999 if (lkb->lkb_status == DLM_LKSTS_GRANTED && 3000 !lkb->lkb_wait_type) { 3001 rv = -EBUSY; 3002 goto out; 3003 } 3004 3005 switch (lkb->lkb_wait_type) { 3006 case DLM_MSG_LOOKUP: 3007 case DLM_MSG_REQUEST: 3008 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; 3009 rv = -EBUSY; 3010 goto out; 3011 case DLM_MSG_UNLOCK: 3012 case DLM_MSG_CANCEL: 3013 goto out; 3014 } 3015 /* add_to_waiters() will set OVERLAP_CANCEL */ 3016 goto out_ok; 3017 } 3018 3019 /* do we need to allow a force-unlock if there's a normal unlock 3020 already in progress? in what conditions could the normal unlock 3021 fail such that we'd want to send a force-unlock to be sure? */ 3022 3023 if (args->flags & DLM_LKF_FORCEUNLOCK) { 3024 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK) 3025 goto out; 3026 3027 if (is_overlap_unlock(lkb)) 3028 goto out; 3029 3030 /* don't let scand try to do a cancel */ 3031 del_timeout(lkb); 3032 3033 if (lkb->lkb_flags & DLM_IFL_RESEND) { 3034 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; 3035 rv = -EBUSY; 3036 goto out; 3037 } 3038 3039 switch (lkb->lkb_wait_type) { 3040 case DLM_MSG_LOOKUP: 3041 case DLM_MSG_REQUEST: 3042 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; 3043 rv = -EBUSY; 3044 goto out; 3045 case DLM_MSG_UNLOCK: 3046 goto out; 3047 } 3048 /* add_to_waiters() will set OVERLAP_UNLOCK */ 3049 } 3050 3051 out_ok: 3052 /* an overlapping op shouldn't blow away exflags from other op */ 3053 lkb->lkb_exflags |= args->flags; 3054 lkb->lkb_sbflags = 0; 3055 lkb->lkb_astparam = args->astparam; 3056 rv = 0; 3057 out: 3058 switch (rv) { 3059 case 0: 3060 break; 3061 case -EINVAL: 3062 /* annoy the user because dlm usage is wrong */ 3063 WARN_ON(1); 3064 log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv, 3065 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags, 3066 args->flags, lkb->lkb_wait_type, 3067 lkb->lkb_resource->res_name); 3068 break; 3069 default: 3070 log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv, 3071 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags, 3072 args->flags, lkb->lkb_wait_type, 3073 lkb->lkb_resource->res_name); 3074 break; 3075 } 3076 3077 return rv; 3078 } 3079 3080 /* 3081 * Four stage 4 varieties: 3082 * do_request(), do_convert(), do_unlock(), do_cancel() 3083 * These are called on the master node for the given lock and 3084 * from the central locking logic. 
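 *
 * A rough summary of the result conventions used below (paraphrased from
 * the function bodies, for orientation only):
 *   do_request/do_convert: 0 granted, -EINPROGRESS queued, -EAGAIN not
 *   grantable and not queueable (do_convert may also return -EDEADLK);
 *   do_unlock: -DLM_EUNLOCK; do_cancel: -DLM_ECANCEL if something was
 *   cancelled, 0 if there was nothing to cancel. The matching
 *   do_xxxx_effects() then sends whatever blocking ASTs or newly possible
 *   grants follow from that result.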
3085 */ 3086 3087 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb) 3088 { 3089 int error = 0; 3090 3091 if (can_be_granted(r, lkb, 1, 0, NULL)) { 3092 grant_lock(r, lkb); 3093 queue_cast(r, lkb, 0); 3094 goto out; 3095 } 3096 3097 if (can_be_queued(lkb)) { 3098 error = -EINPROGRESS; 3099 add_lkb(r, lkb, DLM_LKSTS_WAITING); 3100 add_timeout(lkb); 3101 goto out; 3102 } 3103 3104 error = -EAGAIN; 3105 queue_cast(r, lkb, -EAGAIN); 3106 out: 3107 return error; 3108 } 3109 3110 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3111 int error) 3112 { 3113 switch (error) { 3114 case -EAGAIN: 3115 if (force_blocking_asts(lkb)) 3116 send_blocking_asts_all(r, lkb); 3117 break; 3118 case -EINPROGRESS: 3119 send_blocking_asts(r, lkb); 3120 break; 3121 } 3122 } 3123 3124 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) 3125 { 3126 int error = 0; 3127 int deadlk = 0; 3128 3129 /* changing an existing lock may allow others to be granted */ 3130 3131 if (can_be_granted(r, lkb, 1, 0, &deadlk)) { 3132 grant_lock(r, lkb); 3133 queue_cast(r, lkb, 0); 3134 goto out; 3135 } 3136 3137 /* can_be_granted() detected that this lock would block in a conversion 3138 deadlock, so we leave it on the granted queue and return EDEADLK in 3139 the ast for the convert. */ 3140 3141 if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) { 3142 /* it's left on the granted queue */ 3143 revert_lock(r, lkb); 3144 queue_cast(r, lkb, -EDEADLK); 3145 error = -EDEADLK; 3146 goto out; 3147 } 3148 3149 /* is_demoted() means the can_be_granted() above set the grmode 3150 to NL, and left us on the granted queue. This auto-demotion 3151 (due to CONVDEADLK) might mean other locks, and/or this lock, are 3152 now grantable. We have to try to grant other converting locks 3153 before we try again to grant this one. 
*/ 3154 3155 if (is_demoted(lkb)) { 3156 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL); 3157 if (_can_be_granted(r, lkb, 1, 0)) { 3158 grant_lock(r, lkb); 3159 queue_cast(r, lkb, 0); 3160 goto out; 3161 } 3162 /* else fall through and move to convert queue */ 3163 } 3164 3165 if (can_be_queued(lkb)) { 3166 error = -EINPROGRESS; 3167 del_lkb(r, lkb); 3168 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 3169 add_timeout(lkb); 3170 goto out; 3171 } 3172 3173 error = -EAGAIN; 3174 queue_cast(r, lkb, -EAGAIN); 3175 out: 3176 return error; 3177 } 3178 3179 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3180 int error) 3181 { 3182 switch (error) { 3183 case 0: 3184 grant_pending_locks(r, NULL); 3185 /* grant_pending_locks also sends basts */ 3186 break; 3187 case -EAGAIN: 3188 if (force_blocking_asts(lkb)) 3189 send_blocking_asts_all(r, lkb); 3190 break; 3191 case -EINPROGRESS: 3192 send_blocking_asts(r, lkb); 3193 break; 3194 } 3195 } 3196 3197 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3198 { 3199 remove_lock(r, lkb); 3200 queue_cast(r, lkb, -DLM_EUNLOCK); 3201 return -DLM_EUNLOCK; 3202 } 3203 3204 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3205 int error) 3206 { 3207 grant_pending_locks(r, NULL); 3208 } 3209 3210 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ 3211 3212 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 3213 { 3214 int error; 3215 3216 error = revert_lock(r, lkb); 3217 if (error) { 3218 queue_cast(r, lkb, -DLM_ECANCEL); 3219 return -DLM_ECANCEL; 3220 } 3221 return 0; 3222 } 3223 3224 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3225 int error) 3226 { 3227 if (error) 3228 grant_pending_locks(r, NULL); 3229 } 3230 3231 /* 3232 * Four stage 3 varieties: 3233 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock() 3234 */ 3235 3236 /* add a new lkb to a possibly new rsb, called by requesting process */ 3237 3238 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3239 { 3240 int error; 3241 3242 /* set_master: sets lkb nodeid from r */ 3243 3244 error = set_master(r, lkb); 3245 if (error < 0) 3246 goto out; 3247 if (error) { 3248 error = 0; 3249 goto out; 3250 } 3251 3252 if (is_remote(r)) { 3253 /* receive_request() calls do_request() on remote node */ 3254 error = send_request(r, lkb); 3255 } else { 3256 error = do_request(r, lkb); 3257 /* for remote locks the request_reply is sent 3258 between do_request and do_request_effects */ 3259 do_request_effects(r, lkb, error); 3260 } 3261 out: 3262 return error; 3263 } 3264 3265 /* change some property of an existing lkb, e.g. 
mode */ 3266 3267 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3268 { 3269 int error; 3270 3271 if (is_remote(r)) { 3272 /* receive_convert() calls do_convert() on remote node */ 3273 error = send_convert(r, lkb); 3274 } else { 3275 error = do_convert(r, lkb); 3276 /* for remote locks the convert_reply is sent 3277 between do_convert and do_convert_effects */ 3278 do_convert_effects(r, lkb, error); 3279 } 3280 3281 return error; 3282 } 3283 3284 /* remove an existing lkb from the granted queue */ 3285 3286 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3287 { 3288 int error; 3289 3290 if (is_remote(r)) { 3291 /* receive_unlock() calls do_unlock() on remote node */ 3292 error = send_unlock(r, lkb); 3293 } else { 3294 error = do_unlock(r, lkb); 3295 /* for remote locks the unlock_reply is sent 3296 between do_unlock and do_unlock_effects */ 3297 do_unlock_effects(r, lkb, error); 3298 } 3299 3300 return error; 3301 } 3302 3303 /* remove an existing lkb from the convert or wait queue */ 3304 3305 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3306 { 3307 int error; 3308 3309 if (is_remote(r)) { 3310 /* receive_cancel() calls do_cancel() on remote node */ 3311 error = send_cancel(r, lkb); 3312 } else { 3313 error = do_cancel(r, lkb); 3314 /* for remote locks the cancel_reply is sent 3315 between do_cancel and do_cancel_effects */ 3316 do_cancel_effects(r, lkb, error); 3317 } 3318 3319 return error; 3320 } 3321 3322 /* 3323 * Four stage 2 varieties: 3324 * request_lock(), convert_lock(), unlock_lock(), cancel_lock() 3325 */ 3326 3327 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3328 const void *name, int len, 3329 struct dlm_args *args) 3330 { 3331 struct dlm_rsb *r; 3332 int error; 3333 3334 error = validate_lock_args(ls, lkb, args); 3335 if (error) 3336 return error; 3337 3338 error = find_rsb(ls, name, len, 0, R_REQUEST, &r); 3339 if (error) 3340 return error; 3341 3342 lock_rsb(r); 3343 3344 attach_lkb(r, lkb); 3345 lkb->lkb_lksb->sb_lkid = lkb->lkb_id; 3346 3347 error = _request_lock(r, lkb); 3348 3349 unlock_rsb(r); 3350 put_rsb(r); 3351 return error; 3352 } 3353 3354 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3355 struct dlm_args *args) 3356 { 3357 struct dlm_rsb *r; 3358 int error; 3359 3360 r = lkb->lkb_resource; 3361 3362 hold_rsb(r); 3363 lock_rsb(r); 3364 3365 error = validate_lock_args(ls, lkb, args); 3366 if (error) 3367 goto out; 3368 3369 error = _convert_lock(r, lkb); 3370 out: 3371 unlock_rsb(r); 3372 put_rsb(r); 3373 return error; 3374 } 3375 3376 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3377 struct dlm_args *args) 3378 { 3379 struct dlm_rsb *r; 3380 int error; 3381 3382 r = lkb->lkb_resource; 3383 3384 hold_rsb(r); 3385 lock_rsb(r); 3386 3387 error = validate_unlock_args(lkb, args); 3388 if (error) 3389 goto out; 3390 3391 error = _unlock_lock(r, lkb); 3392 out: 3393 unlock_rsb(r); 3394 put_rsb(r); 3395 return error; 3396 } 3397 3398 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3399 struct dlm_args *args) 3400 { 3401 struct dlm_rsb *r; 3402 int error; 3403 3404 r = lkb->lkb_resource; 3405 3406 hold_rsb(r); 3407 lock_rsb(r); 3408 3409 error = validate_unlock_args(lkb, args); 3410 if (error) 3411 goto out; 3412 3413 error = _cancel_lock(r, lkb); 3414 out: 3415 unlock_rsb(r); 3416 put_rsb(r); 3417 return error; 3418 } 3419 3420 /* 3421 * Two stage 1 varieties: dlm_lock() and dlm_unlock() 3422 */ 3423 3424 int dlm_lock(dlm_lockspace_t *lockspace, 
3425 int mode, 3426 struct dlm_lksb *lksb, 3427 uint32_t flags, 3428 const void *name, 3429 unsigned int namelen, 3430 uint32_t parent_lkid, 3431 void (*ast) (void *astarg), 3432 void *astarg, 3433 void (*bast) (void *astarg, int mode)) 3434 { 3435 struct dlm_ls *ls; 3436 struct dlm_lkb *lkb; 3437 struct dlm_args args; 3438 int error, convert = flags & DLM_LKF_CONVERT; 3439 3440 ls = dlm_find_lockspace_local(lockspace); 3441 if (!ls) 3442 return -EINVAL; 3443 3444 dlm_lock_recovery(ls); 3445 3446 if (convert) 3447 error = find_lkb(ls, lksb->sb_lkid, &lkb); 3448 else 3449 error = create_lkb(ls, &lkb); 3450 3451 if (error) 3452 goto out; 3453 3454 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags); 3455 3456 #ifdef CONFIG_DLM_DEPRECATED_API 3457 error = set_lock_args(mode, lksb, flags, namelen, 0, ast, 3458 astarg, bast, &args); 3459 #else 3460 error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast, 3461 &args); 3462 #endif 3463 if (error) 3464 goto out_put; 3465 3466 if (convert) 3467 error = convert_lock(ls, lkb, &args); 3468 else 3469 error = request_lock(ls, lkb, name, namelen, &args); 3470 3471 if (error == -EINPROGRESS) 3472 error = 0; 3473 out_put: 3474 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true); 3475 3476 if (convert || error) 3477 __put_lkb(ls, lkb); 3478 if (error == -EAGAIN || error == -EDEADLK) 3479 error = 0; 3480 out: 3481 dlm_unlock_recovery(ls); 3482 dlm_put_lockspace(ls); 3483 return error; 3484 } 3485 3486 int dlm_unlock(dlm_lockspace_t *lockspace, 3487 uint32_t lkid, 3488 uint32_t flags, 3489 struct dlm_lksb *lksb, 3490 void *astarg) 3491 { 3492 struct dlm_ls *ls; 3493 struct dlm_lkb *lkb; 3494 struct dlm_args args; 3495 int error; 3496 3497 ls = dlm_find_lockspace_local(lockspace); 3498 if (!ls) 3499 return -EINVAL; 3500 3501 dlm_lock_recovery(ls); 3502 3503 error = find_lkb(ls, lkid, &lkb); 3504 if (error) 3505 goto out; 3506 3507 trace_dlm_unlock_start(ls, lkb, flags); 3508 3509 error = set_unlock_args(flags, astarg, &args); 3510 if (error) 3511 goto out_put; 3512 3513 if (flags & DLM_LKF_CANCEL) 3514 error = cancel_lock(ls, lkb, &args); 3515 else 3516 error = unlock_lock(ls, lkb, &args); 3517 3518 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL) 3519 error = 0; 3520 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK))) 3521 error = 0; 3522 out_put: 3523 trace_dlm_unlock_end(ls, lkb, flags, error); 3524 3525 dlm_put_lkb(lkb); 3526 out: 3527 dlm_unlock_recovery(ls); 3528 dlm_put_lockspace(ls); 3529 return error; 3530 } 3531 3532 /* 3533 * send/receive routines for remote operations and replies 3534 * 3535 * send_args 3536 * send_common 3537 * send_request receive_request 3538 * send_convert receive_convert 3539 * send_unlock receive_unlock 3540 * send_cancel receive_cancel 3541 * send_grant receive_grant 3542 * send_bast receive_bast 3543 * send_lookup receive_lookup 3544 * send_remove receive_remove 3545 * 3546 * send_common_reply 3547 * receive_request_reply send_request_reply 3548 * receive_convert_reply send_convert_reply 3549 * receive_unlock_reply send_unlock_reply 3550 * receive_cancel_reply send_cancel_reply 3551 * receive_lookup_reply send_lookup_reply 3552 */ 3553 3554 static int _create_message(struct dlm_ls *ls, int mb_len, 3555 int to_nodeid, int mstype, 3556 struct dlm_message **ms_ret, 3557 struct dlm_mhandle **mh_ret) 3558 { 3559 struct dlm_message *ms; 3560 struct dlm_mhandle *mh; 3561 char *mb; 3562 3563 /* get_buffer gives us a message handle (mh) that we need to 3564 pass into 
midcomms_commit and a message buffer (mb) that we 3565 write our data into */ 3566 3567 mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb); 3568 if (!mh) 3569 return -ENOBUFS; 3570 3571 ms = (struct dlm_message *) mb; 3572 3573 ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 3574 ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id); 3575 ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid()); 3576 ms->m_header.h_length = cpu_to_le16(mb_len); 3577 ms->m_header.h_cmd = DLM_MSG; 3578 3579 ms->m_type = cpu_to_le32(mstype); 3580 3581 *mh_ret = mh; 3582 *ms_ret = ms; 3583 return 0; 3584 } 3585 3586 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, 3587 int to_nodeid, int mstype, 3588 struct dlm_message **ms_ret, 3589 struct dlm_mhandle **mh_ret) 3590 { 3591 int mb_len = sizeof(struct dlm_message); 3592 3593 switch (mstype) { 3594 case DLM_MSG_REQUEST: 3595 case DLM_MSG_LOOKUP: 3596 case DLM_MSG_REMOVE: 3597 mb_len += r->res_length; 3598 break; 3599 case DLM_MSG_CONVERT: 3600 case DLM_MSG_UNLOCK: 3601 case DLM_MSG_REQUEST_REPLY: 3602 case DLM_MSG_CONVERT_REPLY: 3603 case DLM_MSG_GRANT: 3604 if (lkb && lkb->lkb_lvbptr) 3605 mb_len += r->res_ls->ls_lvblen; 3606 break; 3607 } 3608 3609 return _create_message(r->res_ls, mb_len, to_nodeid, mstype, 3610 ms_ret, mh_ret); 3611 } 3612 3613 /* further lowcomms enhancements or alternate implementations may make 3614 the return value from this function useful at some point */ 3615 3616 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms, 3617 const void *name, int namelen) 3618 { 3619 dlm_midcomms_commit_mhandle(mh, name, namelen); 3620 return 0; 3621 } 3622 3623 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb, 3624 struct dlm_message *ms) 3625 { 3626 ms->m_nodeid = cpu_to_le32(lkb->lkb_nodeid); 3627 ms->m_pid = cpu_to_le32(lkb->lkb_ownpid); 3628 ms->m_lkid = cpu_to_le32(lkb->lkb_id); 3629 ms->m_remid = cpu_to_le32(lkb->lkb_remid); 3630 ms->m_exflags = cpu_to_le32(lkb->lkb_exflags); 3631 ms->m_sbflags = cpu_to_le32(lkb->lkb_sbflags); 3632 ms->m_flags = cpu_to_le32(lkb->lkb_flags); 3633 ms->m_lvbseq = cpu_to_le32(lkb->lkb_lvbseq); 3634 ms->m_status = cpu_to_le32(lkb->lkb_status); 3635 ms->m_grmode = cpu_to_le32(lkb->lkb_grmode); 3636 ms->m_rqmode = cpu_to_le32(lkb->lkb_rqmode); 3637 ms->m_hash = cpu_to_le32(r->res_hash); 3638 3639 /* m_result and m_bastmode are set from function args, 3640 not from lkb fields */ 3641 3642 if (lkb->lkb_bastfn) 3643 ms->m_asts |= cpu_to_le32(DLM_CB_BAST); 3644 if (lkb->lkb_astfn) 3645 ms->m_asts |= cpu_to_le32(DLM_CB_CAST); 3646 3647 /* compare with switch in create_message; send_remove() doesn't 3648 use send_args() */ 3649 3650 switch (ms->m_type) { 3651 case cpu_to_le32(DLM_MSG_REQUEST): 3652 case cpu_to_le32(DLM_MSG_LOOKUP): 3653 memcpy(ms->m_extra, r->res_name, r->res_length); 3654 break; 3655 case cpu_to_le32(DLM_MSG_CONVERT): 3656 case cpu_to_le32(DLM_MSG_UNLOCK): 3657 case cpu_to_le32(DLM_MSG_REQUEST_REPLY): 3658 case cpu_to_le32(DLM_MSG_CONVERT_REPLY): 3659 case cpu_to_le32(DLM_MSG_GRANT): 3660 if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK)) 3661 break; 3662 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); 3663 break; 3664 } 3665 } 3666 3667 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) 3668 { 3669 struct dlm_message *ms; 3670 struct dlm_mhandle *mh; 3671 int to_nodeid, error; 3672 3673 to_nodeid = r->res_nodeid; 3674 3675 error = add_to_waiters(lkb, mstype, to_nodeid); 3676 
if (error) 3677 return error; 3678 3679 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); 3680 if (error) 3681 goto fail; 3682 3683 send_args(r, lkb, ms); 3684 3685 error = send_message(mh, ms, r->res_name, r->res_length); 3686 if (error) 3687 goto fail; 3688 return 0; 3689 3690 fail: 3691 remove_from_waiters(lkb, msg_reply_type(mstype)); 3692 return error; 3693 } 3694 3695 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb) 3696 { 3697 return send_common(r, lkb, DLM_MSG_REQUEST); 3698 } 3699 3700 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) 3701 { 3702 int error; 3703 3704 error = send_common(r, lkb, DLM_MSG_CONVERT); 3705 3706 /* down conversions go without a reply from the master */ 3707 if (!error && down_conversion(lkb)) { 3708 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY); 3709 r->res_ls->ls_stub_ms.m_flags = cpu_to_le32(DLM_IFL_STUB_MS); 3710 r->res_ls->ls_stub_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY); 3711 r->res_ls->ls_stub_ms.m_result = 0; 3712 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms); 3713 } 3714 3715 return error; 3716 } 3717 3718 /* FIXME: if this lkb is the only lock we hold on the rsb, then set 3719 MASTER_UNCERTAIN to force the next request on the rsb to confirm 3720 that the master is still correct. */ 3721 3722 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3723 { 3724 return send_common(r, lkb, DLM_MSG_UNLOCK); 3725 } 3726 3727 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 3728 { 3729 return send_common(r, lkb, DLM_MSG_CANCEL); 3730 } 3731 3732 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb) 3733 { 3734 struct dlm_message *ms; 3735 struct dlm_mhandle *mh; 3736 int to_nodeid, error; 3737 3738 to_nodeid = lkb->lkb_nodeid; 3739 3740 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh); 3741 if (error) 3742 goto out; 3743 3744 send_args(r, lkb, ms); 3745 3746 ms->m_result = 0; 3747 3748 error = send_message(mh, ms, r->res_name, r->res_length); 3749 out: 3750 return error; 3751 } 3752 3753 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode) 3754 { 3755 struct dlm_message *ms; 3756 struct dlm_mhandle *mh; 3757 int to_nodeid, error; 3758 3759 to_nodeid = lkb->lkb_nodeid; 3760 3761 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh); 3762 if (error) 3763 goto out; 3764 3765 send_args(r, lkb, ms); 3766 3767 ms->m_bastmode = cpu_to_le32(mode); 3768 3769 error = send_message(mh, ms, r->res_name, r->res_length); 3770 out: 3771 return error; 3772 } 3773 3774 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb) 3775 { 3776 struct dlm_message *ms; 3777 struct dlm_mhandle *mh; 3778 int to_nodeid, error; 3779 3780 to_nodeid = dlm_dir_nodeid(r); 3781 3782 error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid); 3783 if (error) 3784 return error; 3785 3786 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh); 3787 if (error) 3788 goto fail; 3789 3790 send_args(r, lkb, ms); 3791 3792 error = send_message(mh, ms, r->res_name, r->res_length); 3793 if (error) 3794 goto fail; 3795 return 0; 3796 3797 fail: 3798 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); 3799 return error; 3800 } 3801 3802 static int send_remove(struct dlm_rsb *r) 3803 { 3804 struct dlm_message *ms; 3805 struct dlm_mhandle *mh; 3806 int to_nodeid, error; 3807 3808 to_nodeid = dlm_dir_nodeid(r); 3809 3810 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh); 3811 if (error) 3812 goto out; 3813 3814 memcpy(ms->m_extra, 
r->res_name, r->res_length); 3815 ms->m_hash = cpu_to_le32(r->res_hash); 3816 3817 error = send_message(mh, ms, r->res_name, r->res_length); 3818 out: 3819 return error; 3820 } 3821 3822 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 3823 int mstype, int rv) 3824 { 3825 struct dlm_message *ms; 3826 struct dlm_mhandle *mh; 3827 int to_nodeid, error; 3828 3829 to_nodeid = lkb->lkb_nodeid; 3830 3831 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); 3832 if (error) 3833 goto out; 3834 3835 send_args(r, lkb, ms); 3836 3837 ms->m_result = cpu_to_le32(to_dlm_errno(rv)); 3838 3839 error = send_message(mh, ms, r->res_name, r->res_length); 3840 out: 3841 return error; 3842 } 3843 3844 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3845 { 3846 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv); 3847 } 3848 3849 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3850 { 3851 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv); 3852 } 3853 3854 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3855 { 3856 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv); 3857 } 3858 3859 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3860 { 3861 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv); 3862 } 3863 3864 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in, 3865 int ret_nodeid, int rv) 3866 { 3867 struct dlm_rsb *r = &ls->ls_stub_rsb; 3868 struct dlm_message *ms; 3869 struct dlm_mhandle *mh; 3870 int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid); 3871 3872 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh); 3873 if (error) 3874 goto out; 3875 3876 ms->m_lkid = ms_in->m_lkid; 3877 ms->m_result = cpu_to_le32(to_dlm_errno(rv)); 3878 ms->m_nodeid = cpu_to_le32(ret_nodeid); 3879 3880 error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in)); 3881 out: 3882 return error; 3883 } 3884 3885 /* which args we save from a received message depends heavily on the type 3886 of message, unlike the send side where we can safely send everything about 3887 the lkb for any type of message */ 3888 3889 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms) 3890 { 3891 lkb->lkb_exflags = le32_to_cpu(ms->m_exflags); 3892 lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags); 3893 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | 3894 (le32_to_cpu(ms->m_flags) & 0x0000FFFF); 3895 } 3896 3897 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3898 { 3899 if (ms->m_flags == cpu_to_le32(DLM_IFL_STUB_MS)) 3900 return; 3901 3902 lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags); 3903 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | 3904 (le32_to_cpu(ms->m_flags) & 0x0000FFFF); 3905 } 3906 3907 static int receive_extralen(struct dlm_message *ms) 3908 { 3909 return (le16_to_cpu(ms->m_header.h_length) - 3910 sizeof(struct dlm_message)); 3911 } 3912 3913 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, 3914 struct dlm_message *ms) 3915 { 3916 int len; 3917 3918 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 3919 if (!lkb->lkb_lvbptr) 3920 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 3921 if (!lkb->lkb_lvbptr) 3922 return -ENOMEM; 3923 len = receive_extralen(ms); 3924 if (len > ls->ls_lvblen) 3925 len = ls->ls_lvblen; 3926 memcpy(lkb->lkb_lvbptr, ms->m_extra, len); 3927 } 3928 return 0; 3929 } 3930 3931 static void fake_bastfn(void *astparam, int mode) 3932 { 3933 
log_print("fake_bastfn should not be called"); 3934 } 3935 3936 static void fake_astfn(void *astparam) 3937 { 3938 log_print("fake_astfn should not be called"); 3939 } 3940 3941 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3942 struct dlm_message *ms) 3943 { 3944 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 3945 lkb->lkb_ownpid = le32_to_cpu(ms->m_pid); 3946 lkb->lkb_remid = le32_to_cpu(ms->m_lkid); 3947 lkb->lkb_grmode = DLM_LOCK_IV; 3948 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode); 3949 3950 lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL; 3951 lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL; 3952 3953 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 3954 /* lkb was just created so there won't be an lvb yet */ 3955 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 3956 if (!lkb->lkb_lvbptr) 3957 return -ENOMEM; 3958 } 3959 3960 return 0; 3961 } 3962 3963 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3964 struct dlm_message *ms) 3965 { 3966 if (lkb->lkb_status != DLM_LKSTS_GRANTED) 3967 return -EBUSY; 3968 3969 if (receive_lvb(ls, lkb, ms)) 3970 return -ENOMEM; 3971 3972 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode); 3973 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq); 3974 3975 return 0; 3976 } 3977 3978 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3979 struct dlm_message *ms) 3980 { 3981 if (receive_lvb(ls, lkb, ms)) 3982 return -ENOMEM; 3983 return 0; 3984 } 3985 3986 /* We fill in the stub-lkb fields with the info that send_xxxx_reply() 3987 uses to send a reply and that the remote end uses to process the reply. */ 3988 3989 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms) 3990 { 3991 struct dlm_lkb *lkb = &ls->ls_stub_lkb; 3992 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 3993 lkb->lkb_remid = le32_to_cpu(ms->m_lkid); 3994 } 3995 3996 /* This is called after the rsb is locked so that we can safely inspect 3997 fields in the lkb. 
 */

static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	int from = le32_to_cpu(ms->m_header.h_nodeid);
	int error = 0;

	/* currently, mixing user/kernel locks is not supported */
	if (ms->m_flags & cpu_to_le32(DLM_IFL_USER) &&
	    ~lkb->lkb_flags & DLM_IFL_USER) {
		log_error(lkb->lkb_resource->res_ls,
			  "got user dlm message for a kernel lock");
		error = -EINVAL;
		goto out;
	}

	switch (ms->m_type) {
	case cpu_to_le32(DLM_MSG_CONVERT):
	case cpu_to_le32(DLM_MSG_UNLOCK):
	case cpu_to_le32(DLM_MSG_CANCEL):
		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
	case cpu_to_le32(DLM_MSG_GRANT):
	case cpu_to_le32(DLM_MSG_BAST):
		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
		if (!is_process_copy(lkb))
			error = -EINVAL;
		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	default:
		error = -EINVAL;
	}

out:
	if (error)
		log_error(lkb->lkb_resource->res_ls,
			  "ignore invalid message %d from %d %x %x %x %d",
			  le32_to_cpu(ms->m_type), from, lkb->lkb_id,
			  lkb->lkb_remid, lkb->lkb_flags, lkb->lkb_nodeid);
	return error;
}

static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int from_nodeid;
	int error, namelen = 0;

	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);

	error = create_lkb(ls, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	error = receive_request_args(ls, lkb, ms);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	/* The dir node is the authority on whether we are the master
	   for this rsb or not, so if the master sends us a request, we should
	   recreate the rsb if we've destroyed it.  This race happens when we
	   send a remove message to the dir node at the same time that the dir
	   node sends us a request for the rsb. */

	namelen = receive_extralen(ms);

	error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
			 R_RECEIVE_REQUEST, &r);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	lock_rsb(r);

	if (r->res_master_nodeid != dlm_our_nodeid()) {
		error = validate_master_nodeid(ls, r, from_nodeid);
		if (error) {
			unlock_rsb(r);
			put_rsb(r);
			__put_lkb(ls, lkb);
			goto fail;
		}
	}

	attach_lkb(r, lkb);
	error = do_request(r, lkb);
	send_request_reply(r, lkb, error);
	do_request_effects(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);

	if (error == -EINPROGRESS)
		error = 0;
	if (error)
		dlm_put_lkb(lkb);
	return 0;

 fail:
	/* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
	   and do this receive_request again from process_lookup_list once
	   we get the lookup reply.  This would avoid many repeated
	   ENOTBLK request failures when the lookup reply designating us
	   as master is delayed.
*/ 4118 4119 if (error != -ENOTBLK) { 4120 log_limit(ls, "receive_request %x from %d %d", 4121 le32_to_cpu(ms->m_lkid), from_nodeid, error); 4122 } 4123 4124 setup_stub_lkb(ls, ms); 4125 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 4126 return error; 4127 } 4128 4129 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms) 4130 { 4131 struct dlm_lkb *lkb; 4132 struct dlm_rsb *r; 4133 int error, reply = 1; 4134 4135 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4136 if (error) 4137 goto fail; 4138 4139 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) { 4140 log_error(ls, "receive_convert %x remid %x recover_seq %llu " 4141 "remote %d %x", lkb->lkb_id, lkb->lkb_remid, 4142 (unsigned long long)lkb->lkb_recover_seq, 4143 le32_to_cpu(ms->m_header.h_nodeid), 4144 le32_to_cpu(ms->m_lkid)); 4145 error = -ENOENT; 4146 dlm_put_lkb(lkb); 4147 goto fail; 4148 } 4149 4150 r = lkb->lkb_resource; 4151 4152 hold_rsb(r); 4153 lock_rsb(r); 4154 4155 error = validate_message(lkb, ms); 4156 if (error) 4157 goto out; 4158 4159 receive_flags(lkb, ms); 4160 4161 error = receive_convert_args(ls, lkb, ms); 4162 if (error) { 4163 send_convert_reply(r, lkb, error); 4164 goto out; 4165 } 4166 4167 reply = !down_conversion(lkb); 4168 4169 error = do_convert(r, lkb); 4170 if (reply) 4171 send_convert_reply(r, lkb, error); 4172 do_convert_effects(r, lkb, error); 4173 out: 4174 unlock_rsb(r); 4175 put_rsb(r); 4176 dlm_put_lkb(lkb); 4177 return 0; 4178 4179 fail: 4180 setup_stub_lkb(ls, ms); 4181 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 4182 return error; 4183 } 4184 4185 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) 4186 { 4187 struct dlm_lkb *lkb; 4188 struct dlm_rsb *r; 4189 int error; 4190 4191 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4192 if (error) 4193 goto fail; 4194 4195 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) { 4196 log_error(ls, "receive_unlock %x remid %x remote %d %x", 4197 lkb->lkb_id, lkb->lkb_remid, 4198 le32_to_cpu(ms->m_header.h_nodeid), 4199 le32_to_cpu(ms->m_lkid)); 4200 error = -ENOENT; 4201 dlm_put_lkb(lkb); 4202 goto fail; 4203 } 4204 4205 r = lkb->lkb_resource; 4206 4207 hold_rsb(r); 4208 lock_rsb(r); 4209 4210 error = validate_message(lkb, ms); 4211 if (error) 4212 goto out; 4213 4214 receive_flags(lkb, ms); 4215 4216 error = receive_unlock_args(ls, lkb, ms); 4217 if (error) { 4218 send_unlock_reply(r, lkb, error); 4219 goto out; 4220 } 4221 4222 error = do_unlock(r, lkb); 4223 send_unlock_reply(r, lkb, error); 4224 do_unlock_effects(r, lkb, error); 4225 out: 4226 unlock_rsb(r); 4227 put_rsb(r); 4228 dlm_put_lkb(lkb); 4229 return 0; 4230 4231 fail: 4232 setup_stub_lkb(ls, ms); 4233 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 4234 return error; 4235 } 4236 4237 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) 4238 { 4239 struct dlm_lkb *lkb; 4240 struct dlm_rsb *r; 4241 int error; 4242 4243 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4244 if (error) 4245 goto fail; 4246 4247 receive_flags(lkb, ms); 4248 4249 r = lkb->lkb_resource; 4250 4251 hold_rsb(r); 4252 lock_rsb(r); 4253 4254 error = validate_message(lkb, ms); 4255 if (error) 4256 goto out; 4257 4258 error = do_cancel(r, lkb); 4259 send_cancel_reply(r, lkb, error); 4260 do_cancel_effects(r, lkb, error); 4261 out: 4262 unlock_rsb(r); 4263 put_rsb(r); 4264 dlm_put_lkb(lkb); 4265 return 0; 4266 4267 fail: 4268 setup_stub_lkb(ls, ms); 4269 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 
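	/* as in the other receive_xxxx() fail paths above, the reply is
	   built from the lockspace's stub rsb/lkb: setup_stub_lkb() copied
	   only the sender's nodeid and lkid into them, which is what
	   send_cancel_reply() needs for the sender to match the reply */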
4270 return error; 4271 } 4272 4273 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms) 4274 { 4275 struct dlm_lkb *lkb; 4276 struct dlm_rsb *r; 4277 int error; 4278 4279 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4280 if (error) 4281 return error; 4282 4283 r = lkb->lkb_resource; 4284 4285 hold_rsb(r); 4286 lock_rsb(r); 4287 4288 error = validate_message(lkb, ms); 4289 if (error) 4290 goto out; 4291 4292 receive_flags_reply(lkb, ms); 4293 if (is_altmode(lkb)) 4294 munge_altmode(lkb, ms); 4295 grant_lock_pc(r, lkb, ms); 4296 queue_cast(r, lkb, 0); 4297 out: 4298 unlock_rsb(r); 4299 put_rsb(r); 4300 dlm_put_lkb(lkb); 4301 return 0; 4302 } 4303 4304 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) 4305 { 4306 struct dlm_lkb *lkb; 4307 struct dlm_rsb *r; 4308 int error; 4309 4310 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4311 if (error) 4312 return error; 4313 4314 r = lkb->lkb_resource; 4315 4316 hold_rsb(r); 4317 lock_rsb(r); 4318 4319 error = validate_message(lkb, ms); 4320 if (error) 4321 goto out; 4322 4323 queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode)); 4324 lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode); 4325 out: 4326 unlock_rsb(r); 4327 put_rsb(r); 4328 dlm_put_lkb(lkb); 4329 return 0; 4330 } 4331 4332 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) 4333 { 4334 int len, error, ret_nodeid, from_nodeid, our_nodeid; 4335 4336 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 4337 our_nodeid = dlm_our_nodeid(); 4338 4339 len = receive_extralen(ms); 4340 4341 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0, 4342 &ret_nodeid, NULL); 4343 4344 /* Optimization: we're master so treat lookup as a request */ 4345 if (!error && ret_nodeid == our_nodeid) { 4346 receive_request(ls, ms); 4347 return; 4348 } 4349 send_lookup_reply(ls, ms, ret_nodeid, error); 4350 } 4351 4352 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) 4353 { 4354 char name[DLM_RESNAME_MAXLEN+1]; 4355 struct dlm_rsb *r; 4356 uint32_t hash, b; 4357 int rv, len, dir_nodeid, from_nodeid; 4358 4359 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 4360 4361 len = receive_extralen(ms); 4362 4363 if (len > DLM_RESNAME_MAXLEN) { 4364 log_error(ls, "receive_remove from %d bad len %d", 4365 from_nodeid, len); 4366 return; 4367 } 4368 4369 dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash)); 4370 if (dir_nodeid != dlm_our_nodeid()) { 4371 log_error(ls, "receive_remove from %d bad nodeid %d", 4372 from_nodeid, dir_nodeid); 4373 return; 4374 } 4375 4376 /* Look for name on rsbtbl.toss, if it's there, kill it. 4377 If it's on rsbtbl.keep, it's being used, and we should ignore this 4378 message. This is an expected race between the dir node sending a 4379 request to the master node at the same time as the master node sends 4380 a remove to the dir node. The resolution to that race is for the 4381 dir node to ignore the remove message, and the master node to 4382 recreate the master rsb when it gets a request from the dir node for 4383 an rsb it doesn't have. 
*/ 4384 4385 memset(name, 0, sizeof(name)); 4386 memcpy(name, ms->m_extra, len); 4387 4388 hash = jhash(name, len, 0); 4389 b = hash & (ls->ls_rsbtbl_size - 1); 4390 4391 spin_lock(&ls->ls_rsbtbl[b].lock); 4392 4393 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 4394 if (rv) { 4395 /* verify the rsb is on keep list per comment above */ 4396 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 4397 if (rv) { 4398 /* should not happen */ 4399 log_error(ls, "receive_remove from %d not found %s", 4400 from_nodeid, name); 4401 spin_unlock(&ls->ls_rsbtbl[b].lock); 4402 return; 4403 } 4404 if (r->res_master_nodeid != from_nodeid) { 4405 /* should not happen */ 4406 log_error(ls, "receive_remove keep from %d master %d", 4407 from_nodeid, r->res_master_nodeid); 4408 dlm_print_rsb(r); 4409 spin_unlock(&ls->ls_rsbtbl[b].lock); 4410 return; 4411 } 4412 4413 log_debug(ls, "receive_remove from %d master %d first %x %s", 4414 from_nodeid, r->res_master_nodeid, r->res_first_lkid, 4415 name); 4416 spin_unlock(&ls->ls_rsbtbl[b].lock); 4417 return; 4418 } 4419 4420 if (r->res_master_nodeid != from_nodeid) { 4421 log_error(ls, "receive_remove toss from %d master %d", 4422 from_nodeid, r->res_master_nodeid); 4423 dlm_print_rsb(r); 4424 spin_unlock(&ls->ls_rsbtbl[b].lock); 4425 return; 4426 } 4427 4428 if (kref_put(&r->res_ref, kill_rsb)) { 4429 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 4430 spin_unlock(&ls->ls_rsbtbl[b].lock); 4431 dlm_free_rsb(r); 4432 } else { 4433 log_error(ls, "receive_remove from %d rsb ref error", 4434 from_nodeid); 4435 dlm_print_rsb(r); 4436 spin_unlock(&ls->ls_rsbtbl[b].lock); 4437 } 4438 } 4439 4440 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) 4441 { 4442 do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid)); 4443 } 4444 4445 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) 4446 { 4447 struct dlm_lkb *lkb; 4448 struct dlm_rsb *r; 4449 int error, mstype, result; 4450 int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 4451 4452 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4453 if (error) 4454 return error; 4455 4456 r = lkb->lkb_resource; 4457 hold_rsb(r); 4458 lock_rsb(r); 4459 4460 error = validate_message(lkb, ms); 4461 if (error) 4462 goto out; 4463 4464 mstype = lkb->lkb_wait_type; 4465 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); 4466 if (error) { 4467 log_error(ls, "receive_request_reply %x remote %d %x result %d", 4468 lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid), 4469 from_dlm_errno(le32_to_cpu(ms->m_result))); 4470 dlm_dump_rsb(r); 4471 goto out; 4472 } 4473 4474 /* Optimization: the dir node was also the master, so it took our 4475 lookup as a request and sent request reply instead of lookup reply */ 4476 if (mstype == DLM_MSG_LOOKUP) { 4477 r->res_master_nodeid = from_nodeid; 4478 r->res_nodeid = from_nodeid; 4479 lkb->lkb_nodeid = from_nodeid; 4480 } 4481 4482 /* this is the value returned from do_request() on the master */ 4483 result = from_dlm_errno(le32_to_cpu(ms->m_result)); 4484 4485 switch (result) { 4486 case -EAGAIN: 4487 /* request would block (be queued) on remote master */ 4488 queue_cast(r, lkb, -EAGAIN); 4489 confirm_master(r, -EAGAIN); 4490 unhold_lkb(lkb); /* undoes create_lkb() */ 4491 break; 4492 4493 case -EINPROGRESS: 4494 case 0: 4495 /* request was queued or granted on remote master */ 4496 receive_flags_reply(lkb, ms); 4497 lkb->lkb_remid = le32_to_cpu(ms->m_lkid); 4498 if (is_altmode(lkb)) 4499 munge_altmode(lkb, ms); 
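		/* illustrative summary of the two outcomes handled below:
		   a result of -EINPROGRESS means the request is blocked on
		   the master, so the process copy stays on the waiting
		   queue with a timeout; a result of 0 means the master
		   granted it, so the grant is mirrored locally and the
		   completion ast is queued for the caller */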
4500 if (result) { 4501 add_lkb(r, lkb, DLM_LKSTS_WAITING); 4502 add_timeout(lkb); 4503 } else { 4504 grant_lock_pc(r, lkb, ms); 4505 queue_cast(r, lkb, 0); 4506 } 4507 confirm_master(r, result); 4508 break; 4509 4510 case -EBADR: 4511 case -ENOTBLK: 4512 /* find_rsb failed to find rsb or rsb wasn't master */ 4513 log_limit(ls, "receive_request_reply %x from %d %d " 4514 "master %d dir %d first %x %s", lkb->lkb_id, 4515 from_nodeid, result, r->res_master_nodeid, 4516 r->res_dir_nodeid, r->res_first_lkid, r->res_name); 4517 4518 if (r->res_dir_nodeid != dlm_our_nodeid() && 4519 r->res_master_nodeid != dlm_our_nodeid()) { 4520 /* cause _request_lock->set_master->send_lookup */ 4521 r->res_master_nodeid = 0; 4522 r->res_nodeid = -1; 4523 lkb->lkb_nodeid = -1; 4524 } 4525 4526 if (is_overlap(lkb)) { 4527 /* we'll ignore error in cancel/unlock reply */ 4528 queue_cast_overlap(r, lkb); 4529 confirm_master(r, result); 4530 unhold_lkb(lkb); /* undoes create_lkb() */ 4531 } else { 4532 _request_lock(r, lkb); 4533 4534 if (r->res_master_nodeid == dlm_our_nodeid()) 4535 confirm_master(r, 0); 4536 } 4537 break; 4538 4539 default: 4540 log_error(ls, "receive_request_reply %x error %d", 4541 lkb->lkb_id, result); 4542 } 4543 4544 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) { 4545 log_debug(ls, "receive_request_reply %x result %d unlock", 4546 lkb->lkb_id, result); 4547 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 4548 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 4549 send_unlock(r, lkb); 4550 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) { 4551 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id); 4552 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 4553 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 4554 send_cancel(r, lkb); 4555 } else { 4556 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 4557 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 4558 } 4559 out: 4560 unlock_rsb(r); 4561 put_rsb(r); 4562 dlm_put_lkb(lkb); 4563 return 0; 4564 } 4565 4566 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 4567 struct dlm_message *ms) 4568 { 4569 /* this is the value returned from do_convert() on the master */ 4570 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { 4571 case -EAGAIN: 4572 /* convert would block (be queued) on remote master */ 4573 queue_cast(r, lkb, -EAGAIN); 4574 break; 4575 4576 case -EDEADLK: 4577 receive_flags_reply(lkb, ms); 4578 revert_lock_pc(r, lkb); 4579 queue_cast(r, lkb, -EDEADLK); 4580 break; 4581 4582 case -EINPROGRESS: 4583 /* convert was queued on remote master */ 4584 receive_flags_reply(lkb, ms); 4585 if (is_demoted(lkb)) 4586 munge_demoted(lkb); 4587 del_lkb(r, lkb); 4588 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 4589 add_timeout(lkb); 4590 break; 4591 4592 case 0: 4593 /* convert was granted on remote master */ 4594 receive_flags_reply(lkb, ms); 4595 if (is_demoted(lkb)) 4596 munge_demoted(lkb); 4597 grant_lock_pc(r, lkb, ms); 4598 queue_cast(r, lkb, 0); 4599 break; 4600 4601 default: 4602 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d", 4603 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid), 4604 le32_to_cpu(ms->m_lkid), 4605 from_dlm_errno(le32_to_cpu(ms->m_result))); 4606 dlm_print_rsb(r); 4607 dlm_print_lkb(lkb); 4608 } 4609 } 4610 4611 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 4612 { 4613 struct dlm_rsb *r = lkb->lkb_resource; 4614 int error; 4615 4616 hold_rsb(r); 4617 lock_rsb(r); 4618 4619 error = validate_message(lkb, ms); 4620 if (error) 4621 goto out; 4622 4623 
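	/* the reply processed here is either a real DLM_MSG_CONVERT_REPLY
	   from the master or a stub built locally: send_convert() fakes a
	   reply for down conversions (the master sends none), and
	   recover_convert_waiter() fakes one when the master has gone away;
	   in the recovery case the waiters_mutex is already held */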
/* stub reply can happen with waiters_mutex held */ 4624 error = remove_from_waiters_ms(lkb, ms); 4625 if (error) 4626 goto out; 4627 4628 __receive_convert_reply(r, lkb, ms); 4629 out: 4630 unlock_rsb(r); 4631 put_rsb(r); 4632 } 4633 4634 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) 4635 { 4636 struct dlm_lkb *lkb; 4637 int error; 4638 4639 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4640 if (error) 4641 return error; 4642 4643 _receive_convert_reply(lkb, ms); 4644 dlm_put_lkb(lkb); 4645 return 0; 4646 } 4647 4648 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 4649 { 4650 struct dlm_rsb *r = lkb->lkb_resource; 4651 int error; 4652 4653 hold_rsb(r); 4654 lock_rsb(r); 4655 4656 error = validate_message(lkb, ms); 4657 if (error) 4658 goto out; 4659 4660 /* stub reply can happen with waiters_mutex held */ 4661 error = remove_from_waiters_ms(lkb, ms); 4662 if (error) 4663 goto out; 4664 4665 /* this is the value returned from do_unlock() on the master */ 4666 4667 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { 4668 case -DLM_EUNLOCK: 4669 receive_flags_reply(lkb, ms); 4670 remove_lock_pc(r, lkb); 4671 queue_cast(r, lkb, -DLM_EUNLOCK); 4672 break; 4673 case -ENOENT: 4674 break; 4675 default: 4676 log_error(r->res_ls, "receive_unlock_reply %x error %d", 4677 lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result))); 4678 } 4679 out: 4680 unlock_rsb(r); 4681 put_rsb(r); 4682 } 4683 4684 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) 4685 { 4686 struct dlm_lkb *lkb; 4687 int error; 4688 4689 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4690 if (error) 4691 return error; 4692 4693 _receive_unlock_reply(lkb, ms); 4694 dlm_put_lkb(lkb); 4695 return 0; 4696 } 4697 4698 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 4699 { 4700 struct dlm_rsb *r = lkb->lkb_resource; 4701 int error; 4702 4703 hold_rsb(r); 4704 lock_rsb(r); 4705 4706 error = validate_message(lkb, ms); 4707 if (error) 4708 goto out; 4709 4710 /* stub reply can happen with waiters_mutex held */ 4711 error = remove_from_waiters_ms(lkb, ms); 4712 if (error) 4713 goto out; 4714 4715 /* this is the value returned from do_cancel() on the master */ 4716 4717 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { 4718 case -DLM_ECANCEL: 4719 receive_flags_reply(lkb, ms); 4720 revert_lock_pc(r, lkb); 4721 queue_cast(r, lkb, -DLM_ECANCEL); 4722 break; 4723 case 0: 4724 break; 4725 default: 4726 log_error(r->res_ls, "receive_cancel_reply %x error %d", 4727 lkb->lkb_id, 4728 from_dlm_errno(le32_to_cpu(ms->m_result))); 4729 } 4730 out: 4731 unlock_rsb(r); 4732 put_rsb(r); 4733 } 4734 4735 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) 4736 { 4737 struct dlm_lkb *lkb; 4738 int error; 4739 4740 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4741 if (error) 4742 return error; 4743 4744 _receive_cancel_reply(lkb, ms); 4745 dlm_put_lkb(lkb); 4746 return 0; 4747 } 4748 4749 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) 4750 { 4751 struct dlm_lkb *lkb; 4752 struct dlm_rsb *r; 4753 int error, ret_nodeid; 4754 int do_lookup_list = 0; 4755 4756 error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb); 4757 if (error) { 4758 log_error(ls, "%s no lkid %x", __func__, 4759 le32_to_cpu(ms->m_lkid)); 4760 return; 4761 } 4762 4763 /* ms->m_result is the value returned by dlm_master_lookup on dir node 4764 FIXME: will a non-zero error ever be returned? 
*/ 4765 4766 r = lkb->lkb_resource; 4767 hold_rsb(r); 4768 lock_rsb(r); 4769 4770 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); 4771 if (error) 4772 goto out; 4773 4774 ret_nodeid = le32_to_cpu(ms->m_nodeid); 4775 4776 /* We sometimes receive a request from the dir node for this 4777 rsb before we've received the dir node's loookup_reply for it. 4778 The request from the dir node implies we're the master, so we set 4779 ourself as master in receive_request_reply, and verify here that 4780 we are indeed the master. */ 4781 4782 if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) { 4783 /* This should never happen */ 4784 log_error(ls, "receive_lookup_reply %x from %d ret %d " 4785 "master %d dir %d our %d first %x %s", 4786 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid), 4787 ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid, 4788 dlm_our_nodeid(), r->res_first_lkid, r->res_name); 4789 } 4790 4791 if (ret_nodeid == dlm_our_nodeid()) { 4792 r->res_master_nodeid = ret_nodeid; 4793 r->res_nodeid = 0; 4794 do_lookup_list = 1; 4795 r->res_first_lkid = 0; 4796 } else if (ret_nodeid == -1) { 4797 /* the remote node doesn't believe it's the dir node */ 4798 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid", 4799 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid)); 4800 r->res_master_nodeid = 0; 4801 r->res_nodeid = -1; 4802 lkb->lkb_nodeid = -1; 4803 } else { 4804 /* set_master() will set lkb_nodeid from r */ 4805 r->res_master_nodeid = ret_nodeid; 4806 r->res_nodeid = ret_nodeid; 4807 } 4808 4809 if (is_overlap(lkb)) { 4810 log_debug(ls, "receive_lookup_reply %x unlock %x", 4811 lkb->lkb_id, lkb->lkb_flags); 4812 queue_cast_overlap(r, lkb); 4813 unhold_lkb(lkb); /* undoes create_lkb() */ 4814 goto out_list; 4815 } 4816 4817 _request_lock(r, lkb); 4818 4819 out_list: 4820 if (do_lookup_list) 4821 process_lookup_list(r); 4822 out: 4823 unlock_rsb(r); 4824 put_rsb(r); 4825 dlm_put_lkb(lkb); 4826 } 4827 4828 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, 4829 uint32_t saved_seq) 4830 { 4831 int error = 0, noent = 0; 4832 4833 if (!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid))) { 4834 log_limit(ls, "receive %d from non-member %d %x %x %d", 4835 le32_to_cpu(ms->m_type), 4836 le32_to_cpu(ms->m_header.h_nodeid), 4837 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid), 4838 from_dlm_errno(le32_to_cpu(ms->m_result))); 4839 return; 4840 } 4841 4842 switch (ms->m_type) { 4843 4844 /* messages sent to a master node */ 4845 4846 case cpu_to_le32(DLM_MSG_REQUEST): 4847 error = receive_request(ls, ms); 4848 break; 4849 4850 case cpu_to_le32(DLM_MSG_CONVERT): 4851 error = receive_convert(ls, ms); 4852 break; 4853 4854 case cpu_to_le32(DLM_MSG_UNLOCK): 4855 error = receive_unlock(ls, ms); 4856 break; 4857 4858 case cpu_to_le32(DLM_MSG_CANCEL): 4859 noent = 1; 4860 error = receive_cancel(ls, ms); 4861 break; 4862 4863 /* messages sent from a master node (replies to above) */ 4864 4865 case cpu_to_le32(DLM_MSG_REQUEST_REPLY): 4866 error = receive_request_reply(ls, ms); 4867 break; 4868 4869 case cpu_to_le32(DLM_MSG_CONVERT_REPLY): 4870 error = receive_convert_reply(ls, ms); 4871 break; 4872 4873 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY): 4874 error = receive_unlock_reply(ls, ms); 4875 break; 4876 4877 case cpu_to_le32(DLM_MSG_CANCEL_REPLY): 4878 error = receive_cancel_reply(ls, ms); 4879 break; 4880 4881 /* messages sent from a master node (only two types of async msg) */ 4882 4883 case cpu_to_le32(DLM_MSG_GRANT): 4884 noent = 1; 4885 error = 
receive_grant(ls, ms); 4886 break; 4887 4888 case cpu_to_le32(DLM_MSG_BAST): 4889 noent = 1; 4890 error = receive_bast(ls, ms); 4891 break; 4892 4893 /* messages sent to a dir node */ 4894 4895 case cpu_to_le32(DLM_MSG_LOOKUP): 4896 receive_lookup(ls, ms); 4897 break; 4898 4899 case cpu_to_le32(DLM_MSG_REMOVE): 4900 receive_remove(ls, ms); 4901 break; 4902 4903 /* messages sent from a dir node (remove has no reply) */ 4904 4905 case cpu_to_le32(DLM_MSG_LOOKUP_REPLY): 4906 receive_lookup_reply(ls, ms); 4907 break; 4908 4909 /* other messages */ 4910 4911 case cpu_to_le32(DLM_MSG_PURGE): 4912 receive_purge(ls, ms); 4913 break; 4914 4915 default: 4916 log_error(ls, "unknown message type %d", 4917 le32_to_cpu(ms->m_type)); 4918 } 4919 4920 /* 4921 * When checking for ENOENT, we're checking the result of 4922 * find_lkb(m_remid): 4923 * 4924 * The lock id referenced in the message wasn't found. This may 4925 * happen in normal usage for the async messages and cancel, so 4926 * only use log_debug for them. 4927 * 4928 * Some errors are expected and normal. 4929 */ 4930 4931 if (error == -ENOENT && noent) { 4932 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u", 4933 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid), 4934 le32_to_cpu(ms->m_header.h_nodeid), 4935 le32_to_cpu(ms->m_lkid), saved_seq); 4936 } else if (error == -ENOENT) { 4937 log_error(ls, "receive %d no %x remote %d %x saved_seq %u", 4938 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid), 4939 le32_to_cpu(ms->m_header.h_nodeid), 4940 le32_to_cpu(ms->m_lkid), saved_seq); 4941 4942 if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT)) 4943 dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash)); 4944 } 4945 4946 if (error == -EINVAL) { 4947 log_error(ls, "receive %d inval from %d lkid %x remid %x " 4948 "saved_seq %u", 4949 le32_to_cpu(ms->m_type), 4950 le32_to_cpu(ms->m_header.h_nodeid), 4951 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid), 4952 saved_seq); 4953 } 4954 } 4955 4956 /* If the lockspace is in recovery mode (locking stopped), then normal 4957 messages are saved on the requestqueue for processing after recovery is 4958 done. When not in recovery mode, we wait for dlm_recoverd to drain saved 4959 messages off the requestqueue before we process new ones. This occurs right 4960 after recovery completes when we transition from saving all messages on 4961 requestqueue, to processing all the saved messages, to processing new 4962 messages as they arrive. */ 4963 4964 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, 4965 int nodeid) 4966 { 4967 if (dlm_locking_stopped(ls)) { 4968 /* If we were a member of this lockspace, left, and rejoined, 4969 other nodes may still be sending us messages from the 4970 lockspace generation before we left. */ 4971 if (!ls->ls_generation) { 4972 log_limit(ls, "receive %d from %d ignore old gen", 4973 le32_to_cpu(ms->m_type), nodeid); 4974 return; 4975 } 4976 4977 dlm_add_requestqueue(ls, nodeid, ms); 4978 } else { 4979 dlm_wait_requestqueue(ls); 4980 _receive_message(ls, ms, 0); 4981 } 4982 } 4983 4984 /* This is called by dlm_recoverd to process messages that were saved on 4985 the requestqueue. */ 4986 4987 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, 4988 uint32_t saved_seq) 4989 { 4990 _receive_message(ls, ms, saved_seq); 4991 } 4992 4993 /* This is called by the midcomms layer when something is received for 4994 the lockspace. 
It could be either a MSG (normal message sent as part of 4995 standard locking activity) or an RCOM (recovery message sent as part of 4996 lockspace recovery). */ 4997 4998 void dlm_receive_buffer(union dlm_packet *p, int nodeid) 4999 { 5000 struct dlm_header *hd = &p->header; 5001 struct dlm_ls *ls; 5002 int type = 0; 5003 5004 switch (hd->h_cmd) { 5005 case DLM_MSG: 5006 type = le32_to_cpu(p->message.m_type); 5007 break; 5008 case DLM_RCOM: 5009 type = le32_to_cpu(p->rcom.rc_type); 5010 break; 5011 default: 5012 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid); 5013 return; 5014 } 5015 5016 if (le32_to_cpu(hd->h_nodeid) != nodeid) { 5017 log_print("invalid h_nodeid %d from %d lockspace %x", 5018 le32_to_cpu(hd->h_nodeid), nodeid, 5019 le32_to_cpu(hd->u.h_lockspace)); 5020 return; 5021 } 5022 5023 ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace)); 5024 if (!ls) { 5025 if (dlm_config.ci_log_debug) { 5026 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace " 5027 "%u from %d cmd %d type %d\n", 5028 le32_to_cpu(hd->u.h_lockspace), nodeid, 5029 hd->h_cmd, type); 5030 } 5031 5032 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) 5033 dlm_send_ls_not_ready(nodeid, &p->rcom); 5034 return; 5035 } 5036 5037 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to 5038 be inactive (in this ls) before transitioning to recovery mode */ 5039 5040 down_read(&ls->ls_recv_active); 5041 if (hd->h_cmd == DLM_MSG) 5042 dlm_receive_message(ls, &p->message, nodeid); 5043 else if (hd->h_cmd == DLM_RCOM) 5044 dlm_receive_rcom(ls, &p->rcom, nodeid); 5045 else 5046 log_error(ls, "invalid h_cmd %d from %d lockspace %x", 5047 hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace)); 5048 up_read(&ls->ls_recv_active); 5049 5050 dlm_put_lockspace(ls); 5051 } 5052 5053 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb, 5054 struct dlm_message *ms_stub) 5055 { 5056 if (middle_conversion(lkb)) { 5057 hold_lkb(lkb); 5058 memset(ms_stub, 0, sizeof(struct dlm_message)); 5059 ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS); 5060 ms_stub->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY); 5061 ms_stub->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS)); 5062 ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); 5063 _receive_convert_reply(lkb, ms_stub); 5064 5065 /* Same special case as in receive_rcom_lock_args() */ 5066 lkb->lkb_grmode = DLM_LOCK_IV; 5067 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT); 5068 unhold_lkb(lkb); 5069 5070 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) { 5071 lkb->lkb_flags |= DLM_IFL_RESEND; 5072 } 5073 5074 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down 5075 conversions are async; there's no reply from the remote master */ 5076 } 5077 5078 /* A waiting lkb needs recovery if the master node has failed, or 5079 the master node is changing (only when no directory is used) */ 5080 5081 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb, 5082 int dir_nodeid) 5083 { 5084 if (dlm_no_directory(ls)) 5085 return 1; 5086 5087 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid)) 5088 return 1; 5089 5090 return 0; 5091 } 5092 5093 /* Recovery for locks that are waiting for replies from nodes that are now 5094 gone. We can just complete unlocks and cancels by faking a reply from the 5095 dead node. Requests and up-conversions we flag to be resent after 5096 recovery. Down-conversions can just be completed with a fake reply like 5097 unlocks. Conversions between PR and CW need special attention. 
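   The real granted mode of a lock converting between PR and CW cannot be
   determined until all locks on the rsb have been rebuilt, so
   recover_convert_waiter() reverts such lkbs with a stub -EINPROGRESS reply
   and flags the rsb (RSB_RECOVER_CONVERT) so the granted mode can be fixed
   up later.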
*/ 5098 5099 void dlm_recover_waiters_pre(struct dlm_ls *ls) 5100 { 5101 struct dlm_lkb *lkb, *safe; 5102 struct dlm_message *ms_stub; 5103 int wait_type, stub_unlock_result, stub_cancel_result; 5104 int dir_nodeid; 5105 5106 ms_stub = kmalloc(sizeof(*ms_stub), GFP_KERNEL); 5107 if (!ms_stub) 5108 return; 5109 5110 mutex_lock(&ls->ls_waiters_mutex); 5111 5112 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) { 5113 5114 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource); 5115 5116 /* exclude debug messages about unlocks because there can be so 5117 many and they aren't very interesting */ 5118 5119 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) { 5120 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d " 5121 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d", 5122 lkb->lkb_id, 5123 lkb->lkb_remid, 5124 lkb->lkb_wait_type, 5125 lkb->lkb_resource->res_nodeid, 5126 lkb->lkb_nodeid, 5127 lkb->lkb_wait_nodeid, 5128 dir_nodeid); 5129 } 5130 5131 /* all outstanding lookups, regardless of destination will be 5132 resent after recovery is done */ 5133 5134 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) { 5135 lkb->lkb_flags |= DLM_IFL_RESEND; 5136 continue; 5137 } 5138 5139 if (!waiter_needs_recovery(ls, lkb, dir_nodeid)) 5140 continue; 5141 5142 wait_type = lkb->lkb_wait_type; 5143 stub_unlock_result = -DLM_EUNLOCK; 5144 stub_cancel_result = -DLM_ECANCEL; 5145 5146 /* Main reply may have been received leaving a zero wait_type, 5147 but a reply for the overlapping op may not have been 5148 received. In that case we need to fake the appropriate 5149 reply for the overlap op. */ 5150 5151 if (!wait_type) { 5152 if (is_overlap_cancel(lkb)) { 5153 wait_type = DLM_MSG_CANCEL; 5154 if (lkb->lkb_grmode == DLM_LOCK_IV) 5155 stub_cancel_result = 0; 5156 } 5157 if (is_overlap_unlock(lkb)) { 5158 wait_type = DLM_MSG_UNLOCK; 5159 if (lkb->lkb_grmode == DLM_LOCK_IV) 5160 stub_unlock_result = -ENOENT; 5161 } 5162 5163 log_debug(ls, "rwpre overlap %x %x %d %d %d", 5164 lkb->lkb_id, lkb->lkb_flags, wait_type, 5165 stub_cancel_result, stub_unlock_result); 5166 } 5167 5168 switch (wait_type) { 5169 5170 case DLM_MSG_REQUEST: 5171 lkb->lkb_flags |= DLM_IFL_RESEND; 5172 break; 5173 5174 case DLM_MSG_CONVERT: 5175 recover_convert_waiter(ls, lkb, ms_stub); 5176 break; 5177 5178 case DLM_MSG_UNLOCK: 5179 hold_lkb(lkb); 5180 memset(ms_stub, 0, sizeof(struct dlm_message)); 5181 ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS); 5182 ms_stub->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY); 5183 ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_unlock_result)); 5184 ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); 5185 _receive_unlock_reply(lkb, ms_stub); 5186 dlm_put_lkb(lkb); 5187 break; 5188 5189 case DLM_MSG_CANCEL: 5190 hold_lkb(lkb); 5191 memset(ms_stub, 0, sizeof(struct dlm_message)); 5192 ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS); 5193 ms_stub->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY); 5194 ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_cancel_result)); 5195 ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); 5196 _receive_cancel_reply(lkb, ms_stub); 5197 dlm_put_lkb(lkb); 5198 break; 5199 5200 default: 5201 log_error(ls, "invalid lkb wait_type %d %d", 5202 lkb->lkb_wait_type, wait_type); 5203 } 5204 schedule(); 5205 } 5206 mutex_unlock(&ls->ls_waiters_mutex); 5207 kfree(ms_stub); 5208 } 5209 5210 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) 5211 { 5212 struct dlm_lkb *lkb = NULL, *iter; 5213 5214 mutex_lock(&ls->ls_waiters_mutex); 5215 list_for_each_entry(iter, 
&ls->ls_waiters, lkb_wait_reply) { 5216 if (iter->lkb_flags & DLM_IFL_RESEND) { 5217 hold_lkb(iter); 5218 lkb = iter; 5219 break; 5220 } 5221 } 5222 mutex_unlock(&ls->ls_waiters_mutex); 5223 5224 return lkb; 5225 } 5226 5227 /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the 5228 master or dir-node for r. Processing the lkb may result in it being placed 5229 back on waiters. */ 5230 5231 /* We do this after normal locking has been enabled and any saved messages 5232 (in requestqueue) have been processed. We should be confident that at 5233 this point we won't get or process a reply to any of these waiting 5234 operations. But, new ops may be coming in on the rsbs/locks here from 5235 userspace or remotely. */ 5236 5237 /* there may have been an overlap unlock/cancel prior to recovery or after 5238 recovery. if before, the lkb may still have a pos wait_count; if after, the 5239 overlap flag would just have been set and nothing new sent. we can be 5240 confident here than any replies to either the initial op or overlap ops 5241 prior to recovery have been received. */ 5242 5243 int dlm_recover_waiters_post(struct dlm_ls *ls) 5244 { 5245 struct dlm_lkb *lkb; 5246 struct dlm_rsb *r; 5247 int error = 0, mstype, err, oc, ou; 5248 5249 while (1) { 5250 if (dlm_locking_stopped(ls)) { 5251 log_debug(ls, "recover_waiters_post aborted"); 5252 error = -EINTR; 5253 break; 5254 } 5255 5256 lkb = find_resend_waiter(ls); 5257 if (!lkb) 5258 break; 5259 5260 r = lkb->lkb_resource; 5261 hold_rsb(r); 5262 lock_rsb(r); 5263 5264 mstype = lkb->lkb_wait_type; 5265 oc = is_overlap_cancel(lkb); 5266 ou = is_overlap_unlock(lkb); 5267 err = 0; 5268 5269 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d " 5270 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d " 5271 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype, 5272 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid, 5273 dlm_dir_nodeid(r), oc, ou); 5274 5275 /* At this point we assume that we won't get a reply to any 5276 previous op or overlap op on this lock. First, do a big 5277 remove_from_waiters() for all previous ops. */ 5278 5279 lkb->lkb_flags &= ~DLM_IFL_RESEND; 5280 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 5281 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 5282 lkb->lkb_wait_type = 0; 5283 /* drop all wait_count references we still 5284 * hold a reference for this iteration. 5285 */ 5286 while (lkb->lkb_wait_count) { 5287 lkb->lkb_wait_count--; 5288 unhold_lkb(lkb); 5289 } 5290 mutex_lock(&ls->ls_waiters_mutex); 5291 list_del_init(&lkb->lkb_wait_reply); 5292 mutex_unlock(&ls->ls_waiters_mutex); 5293 5294 if (oc || ou) { 5295 /* do an unlock or cancel instead of resending */ 5296 switch (mstype) { 5297 case DLM_MSG_LOOKUP: 5298 case DLM_MSG_REQUEST: 5299 queue_cast(r, lkb, ou ? 
-DLM_EUNLOCK : 5300 -DLM_ECANCEL); 5301 unhold_lkb(lkb); /* undoes create_lkb() */ 5302 break; 5303 case DLM_MSG_CONVERT: 5304 if (oc) { 5305 queue_cast(r, lkb, -DLM_ECANCEL); 5306 } else { 5307 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK; 5308 _unlock_lock(r, lkb); 5309 } 5310 break; 5311 default: 5312 err = 1; 5313 } 5314 } else { 5315 switch (mstype) { 5316 case DLM_MSG_LOOKUP: 5317 case DLM_MSG_REQUEST: 5318 _request_lock(r, lkb); 5319 if (is_master(r)) 5320 confirm_master(r, 0); 5321 break; 5322 case DLM_MSG_CONVERT: 5323 _convert_lock(r, lkb); 5324 break; 5325 default: 5326 err = 1; 5327 } 5328 } 5329 5330 if (err) { 5331 log_error(ls, "waiter %x msg %d r_nodeid %d " 5332 "dir_nodeid %d overlap %d %d", 5333 lkb->lkb_id, mstype, r->res_nodeid, 5334 dlm_dir_nodeid(r), oc, ou); 5335 } 5336 unlock_rsb(r); 5337 put_rsb(r); 5338 dlm_put_lkb(lkb); 5339 } 5340 5341 return error; 5342 } 5343 5344 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r, 5345 struct list_head *list) 5346 { 5347 struct dlm_lkb *lkb, *safe; 5348 5349 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) { 5350 if (!is_master_copy(lkb)) 5351 continue; 5352 5353 /* don't purge lkbs we've added in recover_master_copy for 5354 the current recovery seq */ 5355 5356 if (lkb->lkb_recover_seq == ls->ls_recover_seq) 5357 continue; 5358 5359 del_lkb(r, lkb); 5360 5361 /* this put should free the lkb */ 5362 if (!dlm_put_lkb(lkb)) 5363 log_error(ls, "purged mstcpy lkb not released"); 5364 } 5365 } 5366 5367 void dlm_purge_mstcpy_locks(struct dlm_rsb *r) 5368 { 5369 struct dlm_ls *ls = r->res_ls; 5370 5371 purge_mstcpy_list(ls, r, &r->res_grantqueue); 5372 purge_mstcpy_list(ls, r, &r->res_convertqueue); 5373 purge_mstcpy_list(ls, r, &r->res_waitqueue); 5374 } 5375 5376 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r, 5377 struct list_head *list, 5378 int nodeid_gone, unsigned int *count) 5379 { 5380 struct dlm_lkb *lkb, *safe; 5381 5382 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) { 5383 if (!is_master_copy(lkb)) 5384 continue; 5385 5386 if ((lkb->lkb_nodeid == nodeid_gone) || 5387 dlm_is_removed(ls, lkb->lkb_nodeid)) { 5388 5389 /* tell recover_lvb to invalidate the lvb 5390 because a node holding EX/PW failed */ 5391 if ((lkb->lkb_exflags & DLM_LKF_VALBLK) && 5392 (lkb->lkb_grmode >= DLM_LOCK_PW)) { 5393 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL); 5394 } 5395 5396 del_lkb(r, lkb); 5397 5398 /* this put should free the lkb */ 5399 if (!dlm_put_lkb(lkb)) 5400 log_error(ls, "purged dead lkb not released"); 5401 5402 rsb_set_flag(r, RSB_RECOVER_GRANT); 5403 5404 (*count)++; 5405 } 5406 } 5407 } 5408 5409 /* Get rid of locks held by nodes that are gone. 
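   Only master-copy lkbs on rsbs we master are purged.  When a purged lock
   held EX or PW with an lvb, the rsb is flagged so recover_lvb() will
   invalidate the lvb, and RECOVER_GRANT is set so locks blocked behind the
   purged ones can be granted afterwards.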
*/ 5410 5411 void dlm_recover_purge(struct dlm_ls *ls) 5412 { 5413 struct dlm_rsb *r; 5414 struct dlm_member *memb; 5415 int nodes_count = 0; 5416 int nodeid_gone = 0; 5417 unsigned int lkb_count = 0; 5418 5419 /* cache one removed nodeid to optimize the common 5420 case of a single node removed */ 5421 5422 list_for_each_entry(memb, &ls->ls_nodes_gone, list) { 5423 nodes_count++; 5424 nodeid_gone = memb->nodeid; 5425 } 5426 5427 if (!nodes_count) 5428 return; 5429 5430 down_write(&ls->ls_root_sem); 5431 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 5432 hold_rsb(r); 5433 lock_rsb(r); 5434 if (is_master(r)) { 5435 purge_dead_list(ls, r, &r->res_grantqueue, 5436 nodeid_gone, &lkb_count); 5437 purge_dead_list(ls, r, &r->res_convertqueue, 5438 nodeid_gone, &lkb_count); 5439 purge_dead_list(ls, r, &r->res_waitqueue, 5440 nodeid_gone, &lkb_count); 5441 } 5442 unlock_rsb(r); 5443 unhold_rsb(r); 5444 cond_resched(); 5445 } 5446 up_write(&ls->ls_root_sem); 5447 5448 if (lkb_count) 5449 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes", 5450 lkb_count, nodes_count); 5451 } 5452 5453 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket) 5454 { 5455 struct rb_node *n; 5456 struct dlm_rsb *r; 5457 5458 spin_lock(&ls->ls_rsbtbl[bucket].lock); 5459 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) { 5460 r = rb_entry(n, struct dlm_rsb, res_hashnode); 5461 5462 if (!rsb_flag(r, RSB_RECOVER_GRANT)) 5463 continue; 5464 if (!is_master(r)) { 5465 rsb_clear_flag(r, RSB_RECOVER_GRANT); 5466 continue; 5467 } 5468 hold_rsb(r); 5469 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 5470 return r; 5471 } 5472 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 5473 return NULL; 5474 } 5475 5476 /* 5477 * Attempt to grant locks on resources that we are the master of. 5478 * Locks may have become grantable during recovery because locks 5479 * from departed nodes have been purged (or not rebuilt), allowing 5480 * previously blocked locks to now be granted. The subset of rsb's 5481 * we are interested in are those with lkb's on either the convert or 5482 * waiting queues. 5483 * 5484 * Simplest would be to go through each master rsb and check for non-empty 5485 * convert or waiting queues, and attempt to grant on those rsbs. 5486 * Checking the queues requires lock_rsb, though, for which we'd need 5487 * to release the rsbtbl lock. This would make iterating through all 5488 * rsb's very inefficient. So, we rely on earlier recovery routines 5489 * to set RECOVER_GRANT on any rsb's that we should attempt to grant 5490 * locks for. 
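 * RECOVER_GRANT is set, for example, by purge_dead_list() above and by
 * dlm_recover_master_copy() below when a rebuilt rsb still has locks on
 * its convert or waiting queues.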
5491 */ 5492 5493 void dlm_recover_grant(struct dlm_ls *ls) 5494 { 5495 struct dlm_rsb *r; 5496 int bucket = 0; 5497 unsigned int count = 0; 5498 unsigned int rsb_count = 0; 5499 unsigned int lkb_count = 0; 5500 5501 while (1) { 5502 r = find_grant_rsb(ls, bucket); 5503 if (!r) { 5504 if (bucket == ls->ls_rsbtbl_size - 1) 5505 break; 5506 bucket++; 5507 continue; 5508 } 5509 rsb_count++; 5510 count = 0; 5511 lock_rsb(r); 5512 /* the RECOVER_GRANT flag is checked in the grant path */ 5513 grant_pending_locks(r, &count); 5514 rsb_clear_flag(r, RSB_RECOVER_GRANT); 5515 lkb_count += count; 5516 confirm_master(r, 0); 5517 unlock_rsb(r); 5518 put_rsb(r); 5519 cond_resched(); 5520 } 5521 5522 if (lkb_count) 5523 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources", 5524 lkb_count, rsb_count); 5525 } 5526 5527 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid, 5528 uint32_t remid) 5529 { 5530 struct dlm_lkb *lkb; 5531 5532 list_for_each_entry(lkb, head, lkb_statequeue) { 5533 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid) 5534 return lkb; 5535 } 5536 return NULL; 5537 } 5538 5539 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid, 5540 uint32_t remid) 5541 { 5542 struct dlm_lkb *lkb; 5543 5544 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid); 5545 if (lkb) 5546 return lkb; 5547 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid); 5548 if (lkb) 5549 return lkb; 5550 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid); 5551 if (lkb) 5552 return lkb; 5553 return NULL; 5554 } 5555 5556 /* needs at least dlm_rcom + rcom_lock */ 5557 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 5558 struct dlm_rsb *r, struct dlm_rcom *rc) 5559 { 5560 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 5561 5562 lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid); 5563 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid); 5564 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid); 5565 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags); 5566 lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF; 5567 lkb->lkb_flags |= DLM_IFL_MSTCPY; 5568 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq); 5569 lkb->lkb_rqmode = rl->rl_rqmode; 5570 lkb->lkb_grmode = rl->rl_grmode; 5571 /* don't set lkb_status because add_lkb wants to itself */ 5572 5573 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL; 5574 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL; 5575 5576 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 5577 int lvblen = le16_to_cpu(rc->rc_header.h_length) - 5578 sizeof(struct dlm_rcom) - sizeof(struct rcom_lock); 5579 if (lvblen > ls->ls_lvblen) 5580 return -EINVAL; 5581 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 5582 if (!lkb->lkb_lvbptr) 5583 return -ENOMEM; 5584 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen); 5585 } 5586 5587 /* Conversions between PR and CW (middle modes) need special handling. 5588 The real granted mode of these converting locks cannot be determined 5589 until all locks have been rebuilt on the rsb (recover_conversion) */ 5590 5591 if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) && 5592 middle_conversion(lkb)) { 5593 rl->rl_status = DLM_LKSTS_CONVERT; 5594 lkb->lkb_grmode = DLM_LOCK_IV; 5595 rsb_set_flag(r, RSB_RECOVER_CONVERT); 5596 } 5597 5598 return 0; 5599 } 5600 5601 /* This lkb may have been recovered in a previous aborted recovery so we need 5602 to check if the rsb already has an lkb with the given remote nodeid/lkid. 5603 If so we just send back a standard reply. 
If not, we create a new lkb with 5604 the given values and send back our lkid. We send back our lkid by sending 5605 back the rcom_lock struct we got but with the remid field filled in. */ 5606 5607 /* needs at least dlm_rcom + rcom_lock */ 5608 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) 5609 { 5610 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 5611 struct dlm_rsb *r; 5612 struct dlm_lkb *lkb; 5613 uint32_t remid = 0; 5614 int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid); 5615 int error; 5616 5617 if (rl->rl_parent_lkid) { 5618 error = -EOPNOTSUPP; 5619 goto out; 5620 } 5621 5622 remid = le32_to_cpu(rl->rl_lkid); 5623 5624 /* In general we expect the rsb returned to be R_MASTER, but we don't 5625 have to require it. Recovery of masters on one node can overlap 5626 recovery of locks on another node, so one node can send us MSTCPY 5627 locks before we've made ourselves master of this rsb. We can still 5628 add new MSTCPY locks that we receive here without any harm; when 5629 we make ourselves master, dlm_recover_masters() won't touch the 5630 MSTCPY locks we've received early. */ 5631 5632 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 5633 from_nodeid, R_RECEIVE_RECOVER, &r); 5634 if (error) 5635 goto out; 5636 5637 lock_rsb(r); 5638 5639 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) { 5640 log_error(ls, "dlm_recover_master_copy remote %d %x not dir", 5641 from_nodeid, remid); 5642 error = -EBADR; 5643 goto out_unlock; 5644 } 5645 5646 lkb = search_remid(r, from_nodeid, remid); 5647 if (lkb) { 5648 error = -EEXIST; 5649 goto out_remid; 5650 } 5651 5652 error = create_lkb(ls, &lkb); 5653 if (error) 5654 goto out_unlock; 5655 5656 error = receive_rcom_lock_args(ls, lkb, r, rc); 5657 if (error) { 5658 __put_lkb(ls, lkb); 5659 goto out_unlock; 5660 } 5661 5662 attach_lkb(r, lkb); 5663 add_lkb(r, lkb, rl->rl_status); 5664 ls->ls_recover_locks_in++; 5665 5666 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue)) 5667 rsb_set_flag(r, RSB_RECOVER_GRANT); 5668 5669 out_remid: 5670 /* this is the new value returned to the lock holder for 5671 saving in its process-copy lkb */ 5672 rl->rl_remid = cpu_to_le32(lkb->lkb_id); 5673 5674 lkb->lkb_recover_seq = ls->ls_recover_seq; 5675 5676 out_unlock: 5677 unlock_rsb(r); 5678 put_rsb(r); 5679 out: 5680 if (error && error != -EEXIST) 5681 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d", 5682 from_nodeid, remid, error); 5683 rl->rl_result = cpu_to_le32(error); 5684 return error; 5685 } 5686 5687 /* needs at least dlm_rcom + rcom_lock */ 5688 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) 5689 { 5690 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 5691 struct dlm_rsb *r; 5692 struct dlm_lkb *lkb; 5693 uint32_t lkid, remid; 5694 int error, result; 5695 5696 lkid = le32_to_cpu(rl->rl_lkid); 5697 remid = le32_to_cpu(rl->rl_remid); 5698 result = le32_to_cpu(rl->rl_result); 5699 5700 error = find_lkb(ls, lkid, &lkb); 5701 if (error) { 5702 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d", 5703 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, 5704 result); 5705 return error; 5706 } 5707 5708 r = lkb->lkb_resource; 5709 hold_rsb(r); 5710 lock_rsb(r); 5711 5712 if (!is_process_copy(lkb)) { 5713 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d", 5714 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, 5715 result); 5716 dlm_dump_rsb(r); 5717 unlock_rsb(r); 5718 put_rsb(r); 5719 dlm_put_lkb(lkb); 5720 
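		/* the rsb and lkb references taken above have been dropped;
		   a reply for an lkb that is not a process copy is ignored */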
return -EINVAL; 5721 } 5722 5723 switch (result) { 5724 case -EBADR: 5725 /* There's a chance the new master received our lock before 5726 dlm_recover_master_reply(), this wouldn't happen if we did 5727 a barrier between recover_masters and recover_locks. */ 5728 5729 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d", 5730 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, 5731 result); 5732 5733 dlm_send_rcom_lock(r, lkb); 5734 goto out; 5735 case -EEXIST: 5736 case 0: 5737 lkb->lkb_remid = remid; 5738 break; 5739 default: 5740 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk", 5741 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, 5742 result); 5743 } 5744 5745 /* an ack for dlm_recover_locks() which waits for replies from 5746 all the locks it sends to new masters */ 5747 dlm_recovered_lock(r); 5748 out: 5749 unlock_rsb(r); 5750 put_rsb(r); 5751 dlm_put_lkb(lkb); 5752 5753 return 0; 5754 } 5755 5756 #ifdef CONFIG_DLM_DEPRECATED_API 5757 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, 5758 int mode, uint32_t flags, void *name, unsigned int namelen, 5759 unsigned long timeout_cs) 5760 #else 5761 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, 5762 int mode, uint32_t flags, void *name, unsigned int namelen) 5763 #endif 5764 { 5765 struct dlm_lkb *lkb; 5766 struct dlm_args args; 5767 bool do_put = true; 5768 int error; 5769 5770 dlm_lock_recovery(ls); 5771 5772 error = create_lkb(ls, &lkb); 5773 if (error) { 5774 kfree(ua); 5775 goto out; 5776 } 5777 5778 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags); 5779 5780 if (flags & DLM_LKF_VALBLK) { 5781 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS); 5782 if (!ua->lksb.sb_lvbptr) { 5783 kfree(ua); 5784 error = -ENOMEM; 5785 goto out_put; 5786 } 5787 } 5788 #ifdef CONFIG_DLM_DEPRECATED_API 5789 error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs, 5790 fake_astfn, ua, fake_bastfn, &args); 5791 #else 5792 error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua, 5793 fake_bastfn, &args); 5794 #endif 5795 if (error) { 5796 kfree(ua->lksb.sb_lvbptr); 5797 ua->lksb.sb_lvbptr = NULL; 5798 kfree(ua); 5799 goto out_put; 5800 } 5801 5802 /* After ua is attached to lkb it will be freed by dlm_free_lkb(). 5803 When DLM_IFL_USER is set, the dlm knows that this is a userspace 5804 lock and that lkb_astparam is the dlm_user_args structure. 
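	   Callbacks for a user lock are not run in the kernel; they are
	   queued for delivery to the owning process (see dlm_user_add_cb()),
	   which is why the fake_astfn/fake_bastfn placeholders passed to
	   set_lock_args() above are never expected to run.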
*/ 5805 lkb->lkb_flags |= DLM_IFL_USER; 5806 error = request_lock(ls, lkb, name, namelen, &args); 5807 5808 switch (error) { 5809 case 0: 5810 break; 5811 case -EINPROGRESS: 5812 error = 0; 5813 break; 5814 case -EAGAIN: 5815 error = 0; 5816 fallthrough; 5817 default: 5818 goto out_put; 5819 } 5820 5821 /* add this new lkb to the per-process list of locks */ 5822 spin_lock(&ua->proc->locks_spin); 5823 hold_lkb(lkb); 5824 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); 5825 spin_unlock(&ua->proc->locks_spin); 5826 do_put = false; 5827 out_put: 5828 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false); 5829 if (do_put) 5830 __put_lkb(ls, lkb); 5831 out: 5832 dlm_unlock_recovery(ls); 5833 return error; 5834 } 5835 5836 #ifdef CONFIG_DLM_DEPRECATED_API 5837 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 5838 int mode, uint32_t flags, uint32_t lkid, char *lvb_in, 5839 unsigned long timeout_cs) 5840 #else 5841 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 5842 int mode, uint32_t flags, uint32_t lkid, char *lvb_in) 5843 #endif 5844 { 5845 struct dlm_lkb *lkb; 5846 struct dlm_args args; 5847 struct dlm_user_args *ua; 5848 int error; 5849 5850 dlm_lock_recovery(ls); 5851 5852 error = find_lkb(ls, lkid, &lkb); 5853 if (error) 5854 goto out; 5855 5856 trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags); 5857 5858 /* user can change the params on its lock when it converts it, or 5859 add an lvb that didn't exist before */ 5860 5861 ua = lkb->lkb_ua; 5862 5863 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) { 5864 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS); 5865 if (!ua->lksb.sb_lvbptr) { 5866 error = -ENOMEM; 5867 goto out_put; 5868 } 5869 } 5870 if (lvb_in && ua->lksb.sb_lvbptr) 5871 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); 5872 5873 ua->xid = ua_tmp->xid; 5874 ua->castparam = ua_tmp->castparam; 5875 ua->castaddr = ua_tmp->castaddr; 5876 ua->bastparam = ua_tmp->bastparam; 5877 ua->bastaddr = ua_tmp->bastaddr; 5878 ua->user_lksb = ua_tmp->user_lksb; 5879 5880 #ifdef CONFIG_DLM_DEPRECATED_API 5881 error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs, 5882 fake_astfn, ua, fake_bastfn, &args); 5883 #else 5884 error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua, 5885 fake_bastfn, &args); 5886 #endif 5887 if (error) 5888 goto out_put; 5889 5890 error = convert_lock(ls, lkb, &args); 5891 5892 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK) 5893 error = 0; 5894 out_put: 5895 trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false); 5896 dlm_put_lkb(lkb); 5897 out: 5898 dlm_unlock_recovery(ls); 5899 kfree(ua_tmp); 5900 return error; 5901 } 5902 5903 /* 5904 * The caller asks for an orphan lock on a given resource with a given mode. 5905 * If a matching lock exists, it's moved to the owner's list of locks and 5906 * the lkid is returned. 
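 * If an orphan exists for the resource but only with a different granted
 * mode, -EAGAIN is returned; if no matching orphan exists at all, -ENOENT
 * is returned.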

/*
 * The caller asks for an orphan lock on a given resource with a given mode.
 * If a matching lock exists, it's moved to the owner's list of locks and
 * the lkid is returned.
 */

int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
			  int mode, uint32_t flags, void *name, unsigned int namelen,
			  uint32_t *lkid)
{
	struct dlm_lkb *lkb = NULL, *iter;
	struct dlm_user_args *ua;
	int found_other_mode = 0;
	int rv = 0;

	mutex_lock(&ls->ls_orphans_mutex);
	list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
		if (iter->lkb_resource->res_length != namelen)
			continue;
		if (memcmp(iter->lkb_resource->res_name, name, namelen))
			continue;
		if (iter->lkb_grmode != mode) {
			found_other_mode = 1;
			continue;
		}

		lkb = iter;
		list_del_init(&iter->lkb_ownqueue);
		iter->lkb_flags &= ~DLM_IFL_ORPHAN;
		*lkid = iter->lkb_id;
		break;
	}
	mutex_unlock(&ls->ls_orphans_mutex);

	if (!lkb && found_other_mode) {
		rv = -EAGAIN;
		goto out;
	}

	if (!lkb) {
		rv = -ENOENT;
		goto out;
	}

	lkb->lkb_exflags = flags;
	lkb->lkb_ownpid = (int) current->pid;

	ua = lkb->lkb_ua;

	ua->proc = ua_tmp->proc;
	ua->xid = ua_tmp->xid;
	ua->castparam = ua_tmp->castparam;
	ua->castaddr = ua_tmp->castaddr;
	ua->bastparam = ua_tmp->bastparam;
	ua->bastaddr = ua_tmp->bastaddr;
	ua->user_lksb = ua_tmp->user_lksb;

	/*
	 * The lkb reference from the ls_orphans list was not
	 * removed above, and is now considered the reference
	 * for the proc locks list.
	 */

	spin_lock(&ua->proc->locks_spin);
	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
	spin_unlock(&ua->proc->locks_spin);
 out:
	kfree(ua_tmp);
	return rv;
}

int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	trace_dlm_unlock_start(ls, lkb, flags);

	ua = lkb->lkb_ua;

	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
	if (ua_tmp->castparam)
		ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
		error = 0;
	if (error)
		goto out_put;

	spin_lock(&ua->proc->locks_spin);
	/* dlm_user_add_cb() may have already taken lkb off the proc list */
	if (!list_empty(&lkb->lkb_ownqueue))
		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
	spin_unlock(&ua->proc->locks_spin);
 out_put:
	trace_dlm_unlock_end(ls, lkb, flags, error);
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
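
/*
 * Illustrative sketch only, not part of this module: dlm_user_cancel()
 * below is the userspace counterpart of cancelling a pending request or
 * conversion through the public kernel interface, i.e. dlm_unlock() with
 * DLM_LKF_CANCEL.  "ls", "lksb" and "astarg" are assumed to come from the
 * earlier dlm_lock() call.
 *
 *	error = dlm_unlock(ls, lksb->sb_lkid, DLM_LKF_CANCEL, lksb, astarg);
 *	// If the cancel wins the race with a grant, the blocked request
 *	// completes through its ast with lksb->sb_status == -DLM_ECANCEL;
 *	// otherwise the request completes normally and the cancel is a noop.
 */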

int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	trace_dlm_unlock_start(ls, lkb, flags);

	ua = lkb->lkb_ua;
	if (ua_tmp->castparam)
		ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = cancel_lock(ls, lkb, &args);

	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	trace_dlm_unlock_end(ls, lkb, flags, error);
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}

int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	struct dlm_rsb *r;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	trace_dlm_unlock_start(ls, lkb, flags);

	ua = lkb->lkb_ua;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, &args);
	if (error)
		goto out_r;
	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;

	error = _cancel_lock(r, lkb);
 out_r:
	unlock_rsb(r);
	put_rsb(r);

	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	trace_dlm_unlock_end(ls, lkb, flags, error);
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	return error;
}

/* lkb's that are removed from the waiters list by revert are just left on the
   orphans list with the granted orphan locks, to be freed by purge */

static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_args args;
	int error;

	hold_lkb(lkb); /* reference for the ls_orphans list */
	mutex_lock(&ls->ls_orphans_mutex);
	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
	mutex_unlock(&ls->ls_orphans_mutex);

	set_unlock_args(0, lkb->lkb_ua, &args);

	error = cancel_lock(ls, lkb, &args);
	if (error == -DLM_ECANCEL)
		error = 0;
	return error;
}

/* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
   granted. Regardless of what rsb queue the lock is on, it's removed and
   freed. The IVVALBLK flag causes the lvb on the resource to be invalidated
   if our lock is PW/EX (it's ignored if our granted mode is smaller.) */

static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_args args;
	int error;

	set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
			lkb->lkb_ua, &args);

	error = unlock_lock(ls, lkb, &args);
	if (error == -DLM_EUNLOCK)
		error = 0;
	return error;
}

/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
   (which does lock_rsb) due to deadlock with receiving a message that does
   lock_rsb followed by dlm_user_add_cb() */

static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
				     struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb = NULL;

	spin_lock(&ls->ls_clear_proc_locks);
	if (list_empty(&proc->locks))
		goto out;

	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
	list_del_init(&lkb->lkb_ownqueue);

	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
		lkb->lkb_flags |= DLM_IFL_ORPHAN;
	else
		lkb->lkb_flags |= DLM_IFL_DEAD;
 out:
	spin_unlock(&ls->ls_clear_proc_locks);
	return lkb;
}

/* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */

/* proc CLOSING flag is set so no more device_reads should look at proc->asts
   list, and no more device_writes should add lkb's to proc->locks list; so we
   shouldn't need to take asts_spin or locks_spin here. this assumes that
   device reads/writes/closes are serialized -- FIXME: we may need to serialize
   them ourselves. */

void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	dlm_lock_recovery(ls);

	while (1) {
		lkb = del_proc_lock(ls, proc);
		if (!lkb)
			break;
		del_timeout(lkb);
		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
			orphan_proc_lock(ls, lkb);
		else
			unlock_proc_lock(ls, lkb);

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request, it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	spin_lock(&ls->ls_clear_proc_locks);

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}

	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
		dlm_purge_lkb_callbacks(lkb);
		list_del_init(&lkb->lkb_cb_list);
		dlm_put_lkb(lkb);
	}

	spin_unlock(&ls->ls_clear_proc_locks);
	dlm_unlock_recovery(ls);
}
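
/*
 * Illustrative sketch only, not part of this module: the forced unlock that
 * unlock_proc_lock() issues for an exiting process corresponds to what an
 * in-kernel caller can request explicitly through the public interface.
 * "ls", "lksb" and "astarg" are assumed to come from the earlier dlm_lock()
 * call.
 *
 *	error = dlm_unlock(ls, lksb->sb_lkid,
 *			   DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
 *			   lksb, astarg);
 *	// FORCEUNLOCK removes the lock regardless of which rsb queue it is
 *	// on; IVVALBLK additionally invalidates the resource LVB when this
 *	// lock was held in PW or EX mode.
 */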

static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	while (1) {
		lkb = NULL;
		spin_lock(&proc->locks_spin);
		if (!list_empty(&proc->locks)) {
			lkb = list_entry(proc->locks.next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);
		}
		spin_unlock(&proc->locks_spin);

		if (!lkb)
			break;

		lkb->lkb_flags |= DLM_IFL_DEAD;
		unlock_proc_lock(ls, lkb);
		dlm_put_lkb(lkb); /* ref from proc->locks list */
	}

	spin_lock(&proc->locks_spin);
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->locks_spin);

	spin_lock(&proc->asts_spin);
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
		dlm_purge_lkb_callbacks(lkb);
		list_del_init(&lkb->lkb_cb_list);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->asts_spin);
}

/* pid of 0 means purge all orphans */

static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_lkb *lkb, *safe;

	mutex_lock(&ls->ls_orphans_mutex);
	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
		if (pid && lkb->lkb_ownpid != pid)
			continue;
		unlock_proc_lock(ls, lkb);
		list_del_init(&lkb->lkb_ownqueue);
		dlm_put_lkb(lkb);
	}
	mutex_unlock(&ls->ls_orphans_mutex);
}

static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error;

	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
				DLM_MSG_PURGE, &ms, &mh);
	if (error)
		return error;
	ms->m_nodeid = cpu_to_le32(nodeid);
	ms->m_pid = cpu_to_le32(pid);

	return send_message(mh, ms, NULL, 0);
}

int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
		   int nodeid, int pid)
{
	int error = 0;

	if (nodeid && (nodeid != dlm_our_nodeid())) {
		error = send_purge(ls, nodeid, pid);
	} else {
		dlm_lock_recovery(ls);
		if (pid == current->pid)
			purge_proc_locks(ls, proc);
		else
			do_purge(ls, nodeid, pid);
		dlm_unlock_recovery(ls);
	}
	return error;
}

/* debug functionality */
int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
		      int lkb_nodeid, unsigned int lkb_flags, int lkb_status)
{
	struct dlm_lksb *lksb;
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	/* we currently can't set a valid user lock */
	if (lkb_flags & DLM_IFL_USER)
		return -EOPNOTSUPP;

	lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
	if (!lksb)
		return -ENOMEM;

	error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
	if (error) {
		kfree(lksb);
		return error;
	}

	lkb->lkb_flags = lkb_flags;
	lkb->lkb_nodeid = lkb_nodeid;
	lkb->lkb_lksb = lksb;
	/* user specific pointer, just don't have it NULL for kernel locks */
	if (~lkb_flags & DLM_IFL_USER)
		lkb->lkb_astparam = (void *)0xDEADBEEF;

	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
	if (error) {
		kfree(lksb);
		__put_lkb(ls, lkb);
		return error;
	}

	lock_rsb(r);
	attach_lkb(r, lkb);
	add_lkb(r, lkb, lkb_status);
	unlock_rsb(r);
	put_rsb(r);

	return 0;
}

int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
				 int mstype, int to_nodeid)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, lkb_id, &lkb);
	if (error)
		return error;

	error = add_to_waiters(lkb, mstype, to_nodeid);
	dlm_put_lkb(lkb);
	return error;
}