/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

   dlm_lock          = request_lock
   dlm_lock+CONVERT  = convert_lock
   dlm_unlock        = unlock_lock
   dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
#include <linux/types.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{1, 1, 1, 1, 1, 1, 1, 0},	/* UN */
	{1, 1, 1, 1, 1, 1, 1, 0},	/* NL */
	{1, 1, 1, 1, 1, 1, 0, 0},	/* CR */
	{1, 1, 1, 1, 0, 0, 0, 0},	/* CW */
	{1, 1, 1, 0, 1, 0, 0, 0},	/* PR */
	{1, 1, 1, 0, 0, 0, 0, 0},	/* PW */
	{1, 1, 0, 0, 0, 0, 0, 0},	/* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}	/* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
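
/* Illustrative only, not part of the original source: reading the
   compatibility matrix above, two PR locks are compatible but PR and
   EX are not:

	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR);	returns 1
	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX);	returns 0

   DLM_LOCK_IV (-1) indexes the UN row/column, which is why both
   indexes are offset by +1. */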

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{0, 0, 0, 0, 0, 0, 0, 0},	/* UN */
	{0, 0, 1, 1, 1, 1, 1, 0},	/* NL */
	{0, 0, 0, 1, 1, 1, 1, 0},	/* CR */
	{0, 0, 0, 0, 1, 1, 1, 0},	/* CW */
	{0, 0, 0, 1, 0, 1, 1, 0},	/* PR */
	{0, 0, 0, 0, 0, 0, 1, 0},	/* PW */
	{0, 0, 0, 0, 0, 0, 0, 0},	/* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}	/* PD */
};

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}

void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

static inline void unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

static inline int lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}
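
/* Illustrative note, not in the original source: PR and CW are
   incomparable modes, so PR->CW and CW->PR are "middle" conversions,
   neither up nor down.  For example:

	grmode=EX, rqmode=PR  ->  down_conversion() returns 1
	grmode=PR, rqmode=CW  ->  middle_conversion() returns 1
	grmode=NL, rqmode=EX  ->  both return 0 (an up-conversion) */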

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP);
}

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb))
		send_bast(r, lkb, rqmode);
	else {
		lkb->lkb_bastmode = rqmode;
		dlm_add_ast(lkb, AST_BAST);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}

static int search_rsb_list(struct list_head *head, char *name, int len,
			   unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error = 0;

	list_for_each_entry(r, head, res_hashchain) {
		if (len == r->res_length && !memcmp(name, r->res_name, len))
			goto found;
	}
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}

static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	write_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	write_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}
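
/* Illustrative summary, not in the original source: search_rsb()
   returns 0 with a referenced rsb in *r_ret, -EBADR when no rsb with
   that name exists in the bucket, or -ENOTBLK when R_MASTER was
   requested but the rsb found is mastered on another node. */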

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r, *tmp;
	uint32_t hash, bucket;
	int error = 0;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		write_unlock(&ls->ls_rsbtbl[bucket].lock);
		free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}

int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	return find_rsb(ls, name, namelen, flags, r_ret);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}
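
/* Illustrative summary, not in the original source, of the rsb
   lifecycle implied by the functions above:

	find_rsb()	ref 0 -> 1 (created) or ref n -> n+1 (found)
	put_rsb()	last ref dropped -> toss_rsb() moves the rsb to
			the toss list and stamps res_toss_time
	shrink_bucket()	frees tossed rsb's once they have aged past
			dlm_config.ci_toss_secs (see below) */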

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);

	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}

static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}
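
/* Illustrative note, not in the original source: a lock id encodes
   its lkbtbl bucket in the upper 16 bits and the per-bucket counter
   in the lower 16 bits, e.g. bucket 0x0007 with counter 0x0001 gives
   lkid 0x00070001; find_lkb() recovers the bucket with (lkid >> 16). */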

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = (lkb->lkb_id >> 16);

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			free_lvb(lkb->lkb_lvbptr);
		free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	if (!lkb)
		list_add_tail(new, head);
	else
		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}

static int msg_reply_type(int mstype)
{
	switch (mstype) {
	case DLM_MSG_REQUEST:
		return DLM_MSG_REQUEST_REPLY;
	case DLM_MSG_CONVERT:
		return DLM_MSG_CONVERT_REPLY;
	case DLM_MSG_UNLOCK:
		return DLM_MSG_UNLOCK_REPLY;
	case DLM_MSG_CANCEL:
		return DLM_MSG_CANCEL_REPLY;
	case DLM_MSG_LOOKUP:
		return DLM_MSG_LOOKUP_REPLY;
	}
	return -1;
}
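
/* Illustrative note, not in the original source: move_lkb() above
   wraps del_lkb()/add_lkb() in a hold_lkb()/unhold_lkb() pair so the
   reference that del_lkb() drops can never be the last one while the
   lkb is briefly off any status queue. */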

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
		  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remove_from_waiters %x reply %d give up on %d",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}

static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms != &ls->ls_stub_ms)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type);
	if (ms != &ls->ls_stub_ms)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}
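
/* Illustrative scenario, not in the original source: if a request is
   on the waiters list (lkb_wait_type == DLM_MSG_REQUEST) and the app
   then unlocks with FORCEUNLOCK, add_to_waiters() records the second
   op as DLM_IFL_OVERLAP_UNLOCK instead of failing; the later
   UNLOCK_REPLY clears that flag again in _remove_from_waiters(). */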

/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		write_lock(&ls->ls_rsbtbl[b].lock);
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			write_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			free_rsb(r);
			count++;
		} else {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	if (dlm_locking_stopped(ls))
		return;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		cond_resched();
	}
}

/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}

/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			    struct dlm_message *ms)
{
	int b;

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
	if (b == 1) {
		int len = receive_extralen(ms);
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
		lkb->lkb_lvbseq = ms->m_lvbseq;
	}
}
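
/* Illustrative readings, not in the original source, of the
   dlm_lvb_operations table used above: granting NL->EX gives 1, so
   the resource LVB is copied out to the caller; converting PW->NL
   gives 0, so the caller's LVB is written into the resource; most
   down-conversions from modes below PW give -1 and leave the LVB
   untouched. */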

/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}

/* returns: 0 did nothing
	    1 moved lock to granted
	   -1 removed lock */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = 0;

	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		rv = 1;
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		rv = -1;
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
	return rv;
}

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}

/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}

/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
   change the granted/requested modes.  We're munging things accordingly in
   the process copy.
   CONVDEADLK: our grmode may have been forced down to NL to resolve a
   conversion deadlock
   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
   compatible with other granted locks */

static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
		log_print("munge_demoted %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
		log_print("munge_demoted %x invalid modes gr %d rq %d",
			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
		return;
	}

	lkb->lkb_grmode = DLM_LOCK_NL;
}

static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
	    ms->m_type != DLM_MSG_GRANT) {
		log_print("munge_altmode %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
		lkb->lkb_rqmode = DLM_LOCK_PR;
	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
		lkb->lkb_rqmode = DLM_LOCK_CW;
	else {
		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
		dlm_print_lkb(lkb);
	}
}

static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
					   lkb_statequeue);
	if (lkb->lkb_id == first->lkb_id)
		return 1;

	return 0;
}

/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this;

	list_for_each_entry(this, head, lkb_statequeue) {
		if (this == lkb)
			continue;
		if (!modes_compat(this, lkb))
			return 1;
	}
	return 0;
}

/*
 * "A conversion deadlock arises with a pair of lock requests in the converting
 * queue for one resource.  The granted mode of each lock blocks the requested
 * mode of the other lock."
 *
 * Part 2: if the granted mode of lkb is preventing the first lkb in the
 * convert queue from being granted, then demote lkb (set grmode to NL).
 * This second form requires that we check for conv-deadlk even when
 * now == 0 in _can_be_granted().
 *
 * Example:
 * Granted Queue: empty
 * Convert Queue: NL->EX (first lock)
 *                PR->EX (second lock)
 *
 * The first lock can't be granted because of the granted mode of the second
 * lock and the second lock can't be granted because it's not first in the
 * list.  We demote the granted mode of the second lock (the lkb passed to
 * this function).
 *
 * After the resolution, the "grant pending" function needs to go back and
 * try to grant locks on the convert queue again since the first lock can
 * now be granted.
 */

static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this, *first = NULL, *self = NULL;

	list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
		if (!first)
			first = this;
		if (this == lkb) {
			self = lkb;
			continue;
		}

		if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
			return 1;
	}

	/* if lkb is on the convert queue and is preventing the first
	   from being granted, then there's deadlock and we demote lkb.
	   multiple converting locks may need to do this before the first
	   converting lock can be granted. */

	if (self && self != first) {
		if (!modes_compat(lkb, first) &&
		    !queue_conflict(&rsb->res_grantqueue, first))
			return 1;
	}

	return 0;
}

/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;

 out:
	/*
	 * The following, enabled by CONVDEADLK, departs from VMS.
	 */

	if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
	    conversion_deadlock_detect(r, lkb)) {
		lkb->lkb_grmode = DLM_LOCK_NL;
		lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
	}

	return 0;
}
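
/* Illustrative decisions, not in the original source, following the
   rules above (assume an empty convert queue unless noted):

	new NL request with EXPEDITE: granted even while EX is granted
	new PR request while EX is granted: blocked (grant queue
	    conflict)
	NL->PR convert without QUECVT, no conflicts, now==1: granted
	    "in place" ahead of queued requests */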
The "now" variable is necessary to distinguish converts 1402 * being received and processed for the first time now, because once a 1403 * convert is moved to the conversion queue the condition below applies 1404 * requiring fifo granting. 1405 */ 1406 1407 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT)) 1408 return 1; 1409 1410 /* 1411 * The NOORDER flag is set to avoid the standard vms rules on grant 1412 * order. 1413 */ 1414 1415 if (lkb->lkb_exflags & DLM_LKF_NOORDER) 1416 return 1; 1417 1418 /* 1419 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be 1420 * granted until all other conversion requests ahead of it are granted 1421 * and/or canceled. 1422 */ 1423 1424 if (!now && conv && first_in_list(lkb, &r->res_convertqueue)) 1425 return 1; 1426 1427 /* 1428 * 6-4: By default, a new request is immediately granted only if all 1429 * three of the following conditions are satisfied when the request is 1430 * issued: 1431 * - The queue of ungranted conversion requests for the resource is 1432 * empty. 1433 * - The queue of ungranted new requests for the resource is empty. 1434 * - The mode of the new request is compatible with the most 1435 * restrictive mode of all granted locks on the resource. 1436 */ 1437 1438 if (now && !conv && list_empty(&r->res_convertqueue) && 1439 list_empty(&r->res_waitqueue)) 1440 return 1; 1441 1442 /* 1443 * 6-4: Once a lock request is in the queue of ungranted new requests, 1444 * it cannot be granted until the queue of ungranted conversion 1445 * requests is empty, all ungranted new requests ahead of it are 1446 * granted and/or canceled, and it is compatible with the granted mode 1447 * of the most restrictive lock granted on the resource. 1448 */ 1449 1450 if (!now && !conv && list_empty(&r->res_convertqueue) && 1451 first_in_list(lkb, &r->res_waitqueue)) 1452 return 1; 1453 1454 out: 1455 /* 1456 * The following, enabled by CONVDEADLK, departs from VMS. 1457 */ 1458 1459 if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) && 1460 conversion_deadlock_detect(r, lkb)) { 1461 lkb->lkb_grmode = DLM_LOCK_NL; 1462 lkb->lkb_sbflags |= DLM_SBF_DEMOTED; 1463 } 1464 1465 return 0; 1466 } 1467 1468 /* 1469 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a 1470 * simple way to provide a big optimization to applications that can use them. 

static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;

	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

	high = grant_pending_convert(r, high);
	high = grant_pending_wait(r, high);

	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above.  FIXME: highbast < high comparison not valid for PR/CW.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
		    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
			queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}

static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
			    struct dlm_lkb *lkb)
{
	struct dlm_lkb *gr;

	list_for_each_entry(gr, head, lkb_statequeue) {
		if (gr->lkb_bastaddr &&
		    gr->lkb_highbast < lkb->lkb_rqmode &&
		    !modes_compat(gr, lkb)) {
			queue_bast(r, gr, lkb->lkb_rqmode);
			gr->lkb_highbast = lkb->lkb_rqmode;
		}
	}
}

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}
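
/* Illustrative flow, not in the original source: a lock holds EX and
   a new PR request gets queued.  send_blocking_asts() finds the EX
   holder incompatible with PR and calls queue_bast(r, gr, PR), so the
   holder's blocking AST fires with mode PR and it can release or
   down-convert to let the request through. */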

/* set_master(r, lkb) -- set the master nodeid of a resource

   The purpose of this function is to set the nodeid field in the given
   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
   known, it can just be copied to the lkb and the function will return
   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
   before it can be copied to the lkb.

   When the rsb nodeid is being looked up remotely, the initial lkb
   causing the lookup is kept on the ls_waiters list waiting for the
   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
   on the rsb's res_lookup list until the master is verified.

   Return values:
     0: nodeid is set in rsb/lkb and the caller should go ahead and use it
     1: the rsb master is not available and the lkb has been placed on
        a wait queue
*/

static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (;;) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}
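
/* Illustrative note, not in the original source, on the nodeid
   convention used above: res_nodeid == 0 means we are the master,
   res_nodeid > 0 names the remote master node, and res_nodeid == -1
   means the master is unknown and must be looked up through the
   directory node. */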
*/ 1666 1667 error = dlm_dir_lookup(ls, our_nodeid, r->res_name, 1668 r->res_length, &ret_nodeid); 1669 if (!error) 1670 break; 1671 log_debug(ls, "dir_lookup error %d %s", error, r->res_name); 1672 schedule(); 1673 } 1674 1675 if (ret_nodeid == our_nodeid) { 1676 r->res_first_lkid = 0; 1677 r->res_nodeid = 0; 1678 lkb->lkb_nodeid = 0; 1679 } else { 1680 r->res_first_lkid = lkb->lkb_id; 1681 r->res_nodeid = ret_nodeid; 1682 lkb->lkb_nodeid = ret_nodeid; 1683 } 1684 return 0; 1685 } 1686 1687 static void process_lookup_list(struct dlm_rsb *r) 1688 { 1689 struct dlm_lkb *lkb, *safe; 1690 1691 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) { 1692 list_del_init(&lkb->lkb_rsb_lookup); 1693 _request_lock(r, lkb); 1694 schedule(); 1695 } 1696 } 1697 1698 /* confirm_master -- confirm (or deny) an rsb's master nodeid */ 1699 1700 static void confirm_master(struct dlm_rsb *r, int error) 1701 { 1702 struct dlm_lkb *lkb; 1703 1704 if (!r->res_first_lkid) 1705 return; 1706 1707 switch (error) { 1708 case 0: 1709 case -EINPROGRESS: 1710 r->res_first_lkid = 0; 1711 process_lookup_list(r); 1712 break; 1713 1714 case -EAGAIN: 1715 /* the remote master didn't queue our NOQUEUE request; 1716 make a waiting lkb the first_lkid */ 1717 1718 r->res_first_lkid = 0; 1719 1720 if (!list_empty(&r->res_lookup)) { 1721 lkb = list_entry(r->res_lookup.next, struct dlm_lkb, 1722 lkb_rsb_lookup); 1723 list_del_init(&lkb->lkb_rsb_lookup); 1724 r->res_first_lkid = lkb->lkb_id; 1725 _request_lock(r, lkb); 1726 } else 1727 r->res_nodeid = -1; 1728 break; 1729 1730 default: 1731 log_error(r->res_ls, "confirm_master unknown error %d", error); 1732 } 1733 } 1734 1735 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, 1736 int namelen, uint32_t parent_lkid, void *ast, 1737 void *astarg, void *bast, struct dlm_args *args) 1738 { 1739 int rv = -EINVAL; 1740 1741 /* check for invalid arg usage */ 1742 1743 if (mode < 0 || mode > DLM_LOCK_EX) 1744 goto out; 1745 1746 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN)) 1747 goto out; 1748 1749 if (flags & DLM_LKF_CANCEL) 1750 goto out; 1751 1752 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT)) 1753 goto out; 1754 1755 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT)) 1756 goto out; 1757 1758 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE) 1759 goto out; 1760 1761 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT) 1762 goto out; 1763 1764 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT) 1765 goto out; 1766 1767 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE) 1768 goto out; 1769 1770 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL) 1771 goto out; 1772 1773 if (!ast || !lksb) 1774 goto out; 1775 1776 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr) 1777 goto out; 1778 1779 /* parent/child locks not yet supported */ 1780 if (parent_lkid) 1781 goto out; 1782 1783 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid) 1784 goto out; 1785 1786 /* these args will be copied to the lkb in validate_lock_args, 1787 it cannot be done now because when converting locks, fields in 1788 an active lkb cannot be modified before locking the rsb */ 1789 1790 args->flags = flags; 1791 args->astaddr = ast; 1792 args->astparam = (long) astarg; 1793 args->bastaddr = bast; 1794 args->mode = mode; 1795 args->lksb = lksb; 1796 rv = 0; 1797 out: 1798 return rv; 1799 } 1800 1801 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args) 1802 { 1803 if (flags & ~(DLM_LKF_CANCEL | 

static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
{
	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
		      DLM_LKF_FORCEUNLOCK))
		return -EINVAL;

	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
		return -EINVAL;

	args->flags = flags;
	args->astparam = (long) astarg;
	return 0;
}

static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		rv = -EBUSY;
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		if (lkb->lkb_wait_type)
			goto out;

		if (is_overlap(lkb))
			goto out;
	}

	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astaddr = args->astaddr;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastaddr = args->bastaddr;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	rv = 0;
 out:
	return rv;
}

/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
   for success */

/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
   because there may be a lookup in progress and it's valid to do
   cancel/unlockf on it */

static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int rv = -EINVAL;

	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
		dlm_print_lkb(lkb);
		goto out;
	}

	/* an lkb may still exist even though the lock is EOL'ed due to a
	   cancel, unlock or failed noqueue request; an app can't use these
	   locks; return same error as if the lkid had not been found at
	   all */

	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
		rv = -ENOENT;
		goto out;
	}

	/* an lkb may be waiting for an rsb lookup to complete where the
	   lookup was initiated by another lock */

	if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
		if (!list_empty(&lkb->lkb_rsb_lookup)) {
			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
			list_del_init(&lkb->lkb_rsb_lookup);
			queue_cast(lkb->lkb_resource, lkb,
				   args->flags & DLM_LKF_CANCEL ?
				   -DLM_ECANCEL : -DLM_EUNLOCK);
			unhold_lkb(lkb); /* undoes create_lkb() */
			rv = -EBUSY;
			goto out;
		}
	}

	/* cancel not allowed with another cancel/unlock in progress */

	if (args->flags & DLM_LKF_CANCEL) {
		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
			goto out;

		if (is_overlap(lkb))
			goto out;

		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
		case DLM_MSG_CANCEL:
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_CANCEL */
		goto out_ok;
	}

	/* do we need to allow a force-unlock if there's a normal unlock
	   already in progress?  in what conditions could the normal unlock
	   fail such that we'd want to send a force-unlock to be sure? */

	if (args->flags & DLM_LKF_FORCEUNLOCK) {
		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
			goto out;

		if (is_overlap_unlock(lkb))
			goto out;

		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_UNLOCK */
		goto out_ok;
	}

	/* normal unlock not allowed if there's any op in progress */
	rv = -EBUSY;
	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
		goto out;

 out_ok:
	/* an overlapping op shouldn't blow away exflags from other op */
	lkb->lkb_exflags |= args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astparam = args->astparam;
	rv = 0;
 out:
	if (rv)
		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
			  args->flags, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
	return rv;
}

/*
 * Four stage 4 varieties:
 * do_request(), do_convert(), do_unlock(), do_cancel()
 * These are called on the master node for the given lock and
 * from the central locking logic.
 */

static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;

	if (can_be_granted(r, lkb, 1)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		goto out;
	}

	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		add_lkb(r, lkb, DLM_LKSTS_WAITING);
		send_blocking_asts(r, lkb);
		goto out;
	}

	error = -EAGAIN;
	if (force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
	queue_cast(r, lkb, -EAGAIN);
 out:
	return error;
}
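
/* Illustrative outcomes, not in the original source, of do_request():
   0 when the lock is granted immediately, -EINPROGRESS when it is
   placed on the waitqueue (the app sees the grant later through its
   completion AST), and -EAGAIN when NOQUEUE was set and the lock
   could not be granted (the completion AST carries -EAGAIN). */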
static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;

	/* changing an existing lock may allow others to be granted */

	if (can_be_granted(r, lkb, 1)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		grant_pending_locks(r);
		goto out;
	}

	/* is_demoted() means the can_be_granted() above set the grmode
	   to NL, and left us on the granted queue.  This auto-demotion
	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
	   now grantable.  We have to try to grant other converting locks
	   before we try again to grant this one. */

	if (is_demoted(lkb)) {
		grant_pending_convert(r, DLM_LOCK_IV);
		if (_can_be_granted(r, lkb, 1)) {
			grant_lock(r, lkb);
			queue_cast(r, lkb, 0);
			grant_pending_locks(r);
			goto out;
		}
		/* else fall through and move to convert queue */
	}

	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		send_blocking_asts(r, lkb);
		goto out;
	}

	error = -EAGAIN;
	if (force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
	queue_cast(r, lkb, -EAGAIN);

 out:
	return error;
}

static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	remove_lock(r, lkb);
	queue_cast(r, lkb, -DLM_EUNLOCK);
	grant_pending_locks(r);
	return -DLM_EUNLOCK;
}

/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */

static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	error = revert_lock(r, lkb);
	if (error) {
		queue_cast(r, lkb, -DLM_ECANCEL);
		grant_pending_locks(r);
		return -DLM_ECANCEL;
	}
	return 0;
}

/*
 * Four stage 3 varieties:
 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
 */

/* add a new lkb to a possibly new rsb, called by requesting process */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* set_master: sets lkb nodeid from r */

	error = set_master(r, lkb);
	if (error < 0)
		goto out;
	if (error) {
		error = 0;
		goto out;
	}

	if (is_remote(r))
		/* receive_request() calls do_request() on remote node */
		error = send_request(r, lkb);
	else
		error = do_request(r, lkb);
 out:
	return error;
}
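/* Note on _request_lock() (a clarification inferred from code later in this
   file, not stated here): a positive return from set_master() means the
   master is not yet known and a directory lookup has been started, so the
   request parks and 0 is returned; receive_lookup_reply() calls
   _request_lock() again once the master is resolved. */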
/* change some property of an existing lkb, e.g. mode */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r))
		/* receive_convert() calls do_convert() on remote node */
		error = send_convert(r, lkb);
	else
		error = do_convert(r, lkb);

	return error;
}

/* remove an existing lkb from the granted queue */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r))
		/* receive_unlock() calls do_unlock() on remote node */
		error = send_unlock(r, lkb);
	else
		error = do_unlock(r, lkb);

	return error;
}

/* remove an existing lkb from the convert or wait queue */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r))
		/* receive_cancel() calls do_cancel() on remote node */
		error = send_cancel(r, lkb);
	else
		error = do_cancel(r, lkb);

	return error;
}

/*
 * Four stage 2 varieties:
 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
 */

static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
			int len, struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = find_rsb(ls, name, len, R_CREATE, &r);
	if (error)
		goto out;

	lock_rsb(r);

	attach_lkb(r, lkb);
	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;

	error = _request_lock(r, lkb);

	unlock_rsb(r);
	put_rsb(r);

 out:
	return error;
}

static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
			struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = _convert_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}

static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _unlock_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}

static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _cancel_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}

/*
 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
 */

int dlm_lock(dlm_lockspace_t *lockspace,
	     int mode,
	     struct dlm_lksb *lksb,
	     uint32_t flags,
	     void *name,
	     unsigned int namelen,
	     uint32_t parent_lkid,
	     void (*ast) (void *astarg),
	     void *astarg,
	     void (*bast) (void *astarg, int mode))
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error, convert = flags & DLM_LKF_CONVERT;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	lock_recovery(ls);

	if (convert)
		error = find_lkb(ls, lksb->sb_lkid, &lkb);
	else
		error = create_lkb(ls, &lkb);

	if (error)
		goto out;

	error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
			      astarg, bast, &args);
	if (error)
		goto out_put;

	if (convert)
		error = convert_lock(ls, lkb, &args);
	else
		error = request_lock(ls, lkb, name, namelen, &args);

	if (error == -EINPROGRESS)
		error = 0;
 out_put:
	if (convert || error)
		__put_lkb(ls, lkb);
	if (error == -EAGAIN)
		error = 0;
 out:
	unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}

int dlm_unlock(dlm_lockspace_t *lockspace,
	       uint32_t lkid,
	       uint32_t flags,
	       struct dlm_lksb *lksb,
	       void *astarg)
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	error = set_unlock_args(flags, astarg, &args);
	if (error)
		goto out_put;

	if (flags & DLM_LKF_CANCEL)
		error = cancel_lock(ls, lkb, &args);
	else
		error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
		error = 0;
	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
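/*
 * Example of the stage 1 interface (an illustrative sketch only; grab_res,
 * res_ast, res_bast and my_lksb are hypothetical caller code, not part of
 * the dlm):
 *
 *	static struct dlm_lksb my_lksb;
 *
 *	static void res_ast(void *astarg)
 *	{
 *		// completion: my_lksb.sb_status holds 0, -EAGAIN, etc.
 *	}
 *
 *	static void res_bast(void *astarg, int mode)
 *	{
 *		// another node wants a lock that our mode blocks
 *	}
 *
 *	static int grab_res(dlm_lockspace_t *ls)
 *	{
 *		return dlm_lock(ls, DLM_LOCK_EX, &my_lksb, DLM_LKF_NOQUEUE,
 *				"myres", 5, 0, res_ast, NULL, res_bast);
 *	}
 *
 * A later conversion or unlock reuses the lock id that the request saved in
 * my_lksb.sb_lkid:
 *
 *	dlm_lock(ls, DLM_LOCK_PR, &my_lksb, DLM_LKF_CONVERT, NULL, 0, 0,
 *		 res_ast, NULL, res_bast);
 *	dlm_unlock(ls, my_lksb.sb_lkid, 0, &my_lksb, NULL);
 */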
/*
 * send/receive routines for remote operations and replies
 *
 * send_args
 * send_common
 * send_request			receive_request
 * send_convert			receive_convert
 * send_unlock			receive_unlock
 * send_cancel			receive_cancel
 * send_grant			receive_grant
 * send_bast			receive_bast
 * send_lookup			receive_lookup
 * send_remove			receive_remove
 *
 * send_common_reply
 * receive_request_reply	send_request_reply
 * receive_convert_reply	send_convert_reply
 * receive_unlock_reply		send_unlock_reply
 * receive_cancel_reply		send_cancel_reply
 * receive_lookup_reply		send_lookup_reply
 */

static int _create_message(struct dlm_ls *ls, int mb_len,
			   int to_nodeid, int mstype,
			   struct dlm_message **ms_ret,
			   struct dlm_mhandle **mh_ret)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	char *mb;

	/* get_buffer gives us a message handle (mh) that we need to
	   pass into lowcomms_commit and a message buffer (mb) that we
	   write our data into */

	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
	if (!mh)
		return -ENOBUFS;

	memset(mb, 0, mb_len);

	ms = (struct dlm_message *) mb;

	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
	ms->m_header.h_lockspace = ls->ls_global_id;
	ms->m_header.h_nodeid = dlm_our_nodeid();
	ms->m_header.h_length = mb_len;
	ms->m_header.h_cmd = DLM_MSG;

	ms->m_type = mstype;

	*mh_ret = mh;
	*ms_ret = ms;
	return 0;
}

static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  int to_nodeid, int mstype,
			  struct dlm_message **ms_ret,
			  struct dlm_mhandle **mh_ret)
{
	int mb_len = sizeof(struct dlm_message);

	switch (mstype) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
	case DLM_MSG_REMOVE:
		mb_len += r->res_length;
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (lkb && lkb->lkb_lvbptr)
			mb_len += r->res_ls->ls_lvblen;
		break;
	}

	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
			       ms_ret, mh_ret);
}
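/* Worked sizing example for create_message(): a DLM_MSG_REQUEST for a
   resource whose name is 8 bytes is allocated as
   sizeof(struct dlm_message) + 8, the name travelling in m_extra (see
   send_args() below); a DLM_MSG_CONVERT on a lock with an lvb adds
   ls_lvblen instead.  Types outside the two switch groups, e.g.
   DLM_MSG_CANCEL, stay at the bare struct size. */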
/* further lowcomms enhancements or alternate implementations may make
   the return value from this function useful at some point */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);
	return 0;
}

static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
		      struct dlm_message *ms)
{
	ms->m_nodeid = lkb->lkb_nodeid;
	ms->m_pid = lkb->lkb_ownpid;
	ms->m_lkid = lkb->lkb_id;
	ms->m_remid = lkb->lkb_remid;
	ms->m_exflags = lkb->lkb_exflags;
	ms->m_sbflags = lkb->lkb_sbflags;
	ms->m_flags = lkb->lkb_flags;
	ms->m_lvbseq = lkb->lkb_lvbseq;
	ms->m_status = lkb->lkb_status;
	ms->m_grmode = lkb->lkb_grmode;
	ms->m_rqmode = lkb->lkb_rqmode;
	ms->m_hash = r->res_hash;

	/* m_result and m_bastmode are set from function args,
	   not from lkb fields */

	if (lkb->lkb_bastaddr)
		ms->m_asts |= AST_BAST;
	if (lkb->lkb_astaddr)
		ms->m_asts |= AST_COMP;

	/* compare with switch in create_message; send_remove() doesn't
	   use send_args() */

	switch (ms->m_type) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
		memcpy(ms->m_extra, r->res_name, r->res_length);
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (!lkb->lkb_lvbptr)
			break;
		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
		break;
	}
}

static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	error = add_to_waiters(lkb, mstype);
	if (error)
		return error;

	to_nodeid = r->res_nodeid;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	remove_from_waiters(lkb, msg_reply_type(mstype));
	return error;
}

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_REQUEST);
}

static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	error = send_common(r, lkb, DLM_MSG_CONVERT);

	/* down conversions go without a reply from the master */
	if (!error && down_conversion(lkb)) {
		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		r->res_ls->ls_stub_ms.m_result = 0;
		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
	}

	return error;
}

/* FIXME: if this lkb is the only lock we hold on the rsb, then set
   MASTER_UNCERTAIN to force the next request on the rsb to confirm
   that the master is still correct. */

static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_UNLOCK);
}

static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_CANCEL);
}

static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_result = 0;

	error = send_message(mh, ms);
 out:
	return error;
}

static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_bastmode = mode;

	error = send_message(mh, ms);
 out:
	return error;
}

static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
	if (error)
		return error;

	to_nodeid = dlm_dir_nodeid(r);

	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	return error;
}

static int send_remove(struct dlm_rsb *r)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = dlm_dir_nodeid(r);

	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
	if (error)
		goto out;

	memcpy(ms->m_extra, r->res_name, r->res_length);
	ms->m_hash = r->res_hash;

	error = send_message(mh, ms);
 out:
	return error;
}

static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
			     int mstype, int rv)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_result = rv;

	error = send_message(mh, ms);
 out:
	return error;
}

static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
}

static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
}

static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
}

static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
}
static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
			     int ret_nodeid, int rv)
{
	struct dlm_rsb *r = &ls->ls_stub_rsb;
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error, nodeid = ms_in->m_header.h_nodeid;

	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
	if (error)
		goto out;

	ms->m_lkid = ms_in->m_lkid;
	ms->m_result = rv;
	ms->m_nodeid = ret_nodeid;

	error = send_message(mh, ms);
 out:
	return error;
}

/* which args we save from a received message depends heavily on the type
   of message, unlike the send side where we can safely send everything about
   the lkb for any type of message */

static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	lkb->lkb_exflags = ms->m_exflags;
	lkb->lkb_sbflags = ms->m_sbflags;
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
			 (ms->m_flags & 0x0000FFFF);
}

static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	lkb->lkb_sbflags = ms->m_sbflags;
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
			 (ms->m_flags & 0x0000FFFF);
}

static int receive_extralen(struct dlm_message *ms)
{
	return (ms->m_header.h_length - sizeof(struct dlm_message));
}
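/* Worked example for the two helpers above: receive_flags() splits
   lkb_flags at bit 16 -- the low 16 bits are wire flags taken from the
   message, while the high 16 bits (e.g. DLM_IFL_MSTCPY, set locally in
   receive_request() below) are node-local state and survive the update.
   Likewise receive_extralen() recovers the variable part of a message: a
   lookup carrying a 13-byte name arrives with
   h_length == sizeof(struct dlm_message) + 13, so it returns 13. */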
static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_message *ms)
{
	int len;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		if (!lkb->lkb_lvbptr)
			lkb->lkb_lvbptr = allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		len = receive_extralen(ms);
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
	}
	return 0;
}

static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	lkb->lkb_nodeid = ms->m_header.h_nodeid;
	lkb->lkb_ownpid = ms->m_pid;
	lkb->lkb_remid = ms->m_lkid;
	lkb->lkb_grmode = DLM_LOCK_IV;
	lkb->lkb_rqmode = ms->m_rqmode;
	lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
	lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);

	DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		/* lkb was just created so there won't be an lvb yet */
		lkb->lkb_lvbptr = allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
	}

	return 0;
}

static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
		log_error(ls, "convert_args nodeid %d %d lkid %x %x",
			  lkb->lkb_nodeid, ms->m_header.h_nodeid,
			  lkb->lkb_id, lkb->lkb_remid);
		return -EINVAL;
	}

	if (!is_master_copy(lkb))
		return -EINVAL;

	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
		return -EBUSY;

	if (receive_lvb(ls, lkb, ms))
		return -ENOMEM;

	lkb->lkb_rqmode = ms->m_rqmode;
	lkb->lkb_lvbseq = ms->m_lvbseq;

	return 0;
}

static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			       struct dlm_message *ms)
{
	if (!is_master_copy(lkb))
		return -EINVAL;
	if (receive_lvb(ls, lkb, ms))
		return -ENOMEM;
	return 0;
}

/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
   uses to send a reply and that the remote end uses to process the reply. */

static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
	lkb->lkb_nodeid = ms->m_header.h_nodeid;
	lkb->lkb_remid = ms->m_lkid;
}

static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, namelen;

	error = create_lkb(ls, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	error = receive_request_args(ls, lkb, ms);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	namelen = receive_extralen(ms);

	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	lock_rsb(r);

	attach_lkb(r, lkb);
	error = do_request(r, lkb);
	send_request_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);

	if (error == -EINPROGRESS)
		error = 0;
	if (error)
		dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}

static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, reply = 1;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	receive_flags(lkb, ms);
	error = receive_convert_args(ls, lkb, ms);
	if (error)
		goto out;
	reply = !down_conversion(lkb);

	error = do_convert(r, lkb);
 out:
	if (reply)
		send_convert_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}

static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	receive_flags(lkb, ms);
	error = receive_unlock_args(ls, lkb, ms);
	if (error)
		goto out;

	error = do_unlock(r, lkb);
 out:
	send_unlock_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}

static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = do_cancel(r, lkb);
	send_cancel_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
"receive_grant no lkb"); 2970 return; 2971 } 2972 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 2973 2974 r = lkb->lkb_resource; 2975 2976 hold_rsb(r); 2977 lock_rsb(r); 2978 2979 receive_flags_reply(lkb, ms); 2980 if (is_altmode(lkb)) 2981 munge_altmode(lkb, ms); 2982 grant_lock_pc(r, lkb, ms); 2983 queue_cast(r, lkb, 0); 2984 2985 unlock_rsb(r); 2986 put_rsb(r); 2987 dlm_put_lkb(lkb); 2988 } 2989 2990 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms) 2991 { 2992 struct dlm_lkb *lkb; 2993 struct dlm_rsb *r; 2994 int error; 2995 2996 error = find_lkb(ls, ms->m_remid, &lkb); 2997 if (error) { 2998 log_error(ls, "receive_bast no lkb"); 2999 return; 3000 } 3001 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 3002 3003 r = lkb->lkb_resource; 3004 3005 hold_rsb(r); 3006 lock_rsb(r); 3007 3008 queue_bast(r, lkb, ms->m_bastmode); 3009 3010 unlock_rsb(r); 3011 put_rsb(r); 3012 dlm_put_lkb(lkb); 3013 } 3014 3015 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) 3016 { 3017 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid; 3018 3019 from_nodeid = ms->m_header.h_nodeid; 3020 our_nodeid = dlm_our_nodeid(); 3021 3022 len = receive_extralen(ms); 3023 3024 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash); 3025 if (dir_nodeid != our_nodeid) { 3026 log_error(ls, "lookup dir_nodeid %d from %d", 3027 dir_nodeid, from_nodeid); 3028 error = -EINVAL; 3029 ret_nodeid = -1; 3030 goto out; 3031 } 3032 3033 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid); 3034 3035 /* Optimization: we're master so treat lookup as a request */ 3036 if (!error && ret_nodeid == our_nodeid) { 3037 receive_request(ls, ms); 3038 return; 3039 } 3040 out: 3041 send_lookup_reply(ls, ms, ret_nodeid, error); 3042 } 3043 3044 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) 3045 { 3046 int len, dir_nodeid, from_nodeid; 3047 3048 from_nodeid = ms->m_header.h_nodeid; 3049 3050 len = receive_extralen(ms); 3051 3052 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash); 3053 if (dir_nodeid != dlm_our_nodeid()) { 3054 log_error(ls, "remove dir entry dir_nodeid %d from %d", 3055 dir_nodeid, from_nodeid); 3056 return; 3057 } 3058 3059 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len); 3060 } 3061 3062 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) 3063 { 3064 do_purge(ls, ms->m_nodeid, ms->m_pid); 3065 } 3066 3067 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) 3068 { 3069 struct dlm_lkb *lkb; 3070 struct dlm_rsb *r; 3071 int error, mstype, result; 3072 3073 error = find_lkb(ls, ms->m_remid, &lkb); 3074 if (error) { 3075 log_error(ls, "receive_request_reply no lkb"); 3076 return; 3077 } 3078 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 3079 3080 r = lkb->lkb_resource; 3081 hold_rsb(r); 3082 lock_rsb(r); 3083 3084 mstype = lkb->lkb_wait_type; 3085 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); 3086 if (error) 3087 goto out; 3088 3089 /* Optimization: the dir node was also the master, so it took our 3090 lookup as a request and sent request reply instead of lookup reply */ 3091 if (mstype == DLM_MSG_LOOKUP) { 3092 r->res_nodeid = ms->m_header.h_nodeid; 3093 lkb->lkb_nodeid = r->res_nodeid; 3094 } 3095 3096 /* this is the value returned from do_request() on the master */ 3097 result = ms->m_result; 3098 3099 switch (result) { 3100 case -EAGAIN: 3101 /* request would block (be queued) on remote master */ 3102 queue_cast(r, lkb, -EAGAIN); 3103 confirm_master(r, -EAGAIN); 3104 
static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, mstype, result;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_request_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	mstype = lkb->lkb_wait_type;
	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
	if (error)
		goto out;

	/* Optimization: the dir node was also the master, so it took our
	   lookup as a request and sent request reply instead of lookup reply */
	if (mstype == DLM_MSG_LOOKUP) {
		r->res_nodeid = ms->m_header.h_nodeid;
		lkb->lkb_nodeid = r->res_nodeid;
	}

	/* this is the value returned from do_request() on the master */
	result = ms->m_result;

	switch (result) {
	case -EAGAIN:
		/* request would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		confirm_master(r, -EAGAIN);
		unhold_lkb(lkb); /* undoes create_lkb() */
		break;

	case -EINPROGRESS:
	case 0:
		/* request was queued or granted on remote master */
		receive_flags_reply(lkb, ms);
		lkb->lkb_remid = ms->m_lkid;
		if (is_altmode(lkb))
			munge_altmode(lkb, ms);
		if (result)
			add_lkb(r, lkb, DLM_LKSTS_WAITING);
		else {
			grant_lock_pc(r, lkb, ms);
			queue_cast(r, lkb, 0);
		}
		confirm_master(r, result);
		break;

	case -EBADR:
	case -ENOTBLK:
		/* find_rsb failed to find rsb or rsb wasn't master */
		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
		r->res_nodeid = -1;
		lkb->lkb_nodeid = -1;

		if (is_overlap(lkb)) {
			/* we'll ignore error in cancel/unlock reply */
			queue_cast_overlap(r, lkb);
			unhold_lkb(lkb); /* undoes create_lkb() */
		} else
			_request_lock(r, lkb);
		break;

	default:
		log_error(ls, "receive_request_reply %x error %d",
			  lkb->lkb_id, result);
	}

	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x result %d unlock",
			  lkb->lkb_id, result);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_unlock(r, lkb);
	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_cancel(r, lkb);
	} else {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}

static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms)
{
	/* this is the value returned from do_convert() on the master */
	switch (ms->m_result) {
	case -EAGAIN:
		/* convert would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		break;

	case -EINPROGRESS:
		/* convert was queued on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb, ms);
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		break;

	case 0:
		/* convert was granted on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb, ms);
		grant_lock_pc(r, lkb, ms);
		queue_cast(r, lkb, 0);
		break;

	default:
		log_error(r->res_ls, "receive_convert_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
}

static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	__receive_convert_reply(r, lkb, ms);
 out:
	unlock_rsb(r);
	put_rsb(r);
}
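/* Note on the stub-reply pattern (a summary of code elsewhere in this
   section): the _receive_xxxx_reply() helpers are also fed manufactured
   messages -- send_convert() completes a down-conversion with ls_stub_ms
   rather than waiting for the master, and recovery (recover_convert_waiter()
   below) fakes replies from dead nodes the same way; hence the
   "stub reply can happen with waiters_mutex held" caveat above. */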
static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_convert_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	_receive_convert_reply(lkb, ms);
	dlm_put_lkb(lkb);
}

static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	/* this is the value returned from do_unlock() on the master */

	switch (ms->m_result) {
	case -DLM_EUNLOCK:
		receive_flags_reply(lkb, ms);
		remove_lock_pc(r, lkb);
		queue_cast(r, lkb, -DLM_EUNLOCK);
		break;
	case -ENOENT:
		break;
	default:
		log_error(r->res_ls, "receive_unlock_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
}

static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_unlock_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	_receive_unlock_reply(lkb, ms);
	dlm_put_lkb(lkb);
}

static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	/* this is the value returned from do_cancel() on the master */

	switch (ms->m_result) {
	case -DLM_ECANCEL:
		receive_flags_reply(lkb, ms);
		revert_lock_pc(r, lkb);
		if (ms->m_result)
			queue_cast(r, lkb, -DLM_ECANCEL);
		break;
	case 0:
		break;
	default:
		log_error(r->res_ls, "receive_cancel_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
}

static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_cancel_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	_receive_cancel_reply(lkb, ms);
	dlm_put_lkb(lkb);
}
static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, ret_nodeid;

	error = find_lkb(ls, ms->m_lkid, &lkb);
	if (error) {
		log_error(ls, "receive_lookup_reply no lkb");
		return;
	}

	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
	   FIXME: will a non-zero error ever be returned? */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	if (error)
		goto out;

	ret_nodeid = ms->m_nodeid;
	if (ret_nodeid == dlm_our_nodeid()) {
		r->res_nodeid = 0;
		ret_nodeid = 0;
		r->res_first_lkid = 0;
	} else {
		/* set_master() will copy res_nodeid to lkb_nodeid */
		r->res_nodeid = ret_nodeid;
	}

	if (is_overlap(lkb)) {
		log_debug(ls, "receive_lookup_reply %x unlock %x",
			  lkb->lkb_id, lkb->lkb_flags);
		queue_cast_overlap(r, lkb);
		unhold_lkb(lkb); /* undoes create_lkb() */
		goto out_list;
	}

	_request_lock(r, lkb);

 out_list:
	if (!ret_nodeid)
		process_lookup_list(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
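/* Example (illustrative): if node 2 looks up "foo" and the directory
   replies with ret_nodeid == 2, node 2 is the master: res_nodeid and
   res_first_lkid are cleared and the request proceeds locally, after which
   process_lookup_list() restarts any other lkbs queued on the same lookup.
   Any other ret_nodeid is stored in res_nodeid and the request is sent
   there by _request_lock(). */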
int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
{
	struct dlm_message *ms = (struct dlm_message *) hd;
	struct dlm_ls *ls;
	int error = 0;

	if (!recovery)
		dlm_message_in(ms);

	ls = dlm_find_lockspace_global(hd->h_lockspace);
	if (!ls) {
		log_print("drop message %d from %d for unknown lockspace %d",
			  ms->m_type, nodeid, hd->h_lockspace);
		return -EINVAL;
	}

	/* recovery may have just ended leaving a bunch of backed-up requests
	   in the requestqueue; wait while dlm_recoverd clears them */

	if (!recovery)
		dlm_wait_requestqueue(ls);

	/* recovery may have just started while there were a bunch of
	   in-flight requests -- save them in requestqueue to be processed
	   after recovery.  we can't let dlm_recvd block on the recovery
	   lock.  if dlm_recoverd is calling this function to clear the
	   requestqueue, it needs to be interrupted (-EINTR) if another
	   recovery operation is starting. */

	while (1) {
		if (dlm_locking_stopped(ls)) {
			if (recovery) {
				error = -EINTR;
				goto out;
			}
			error = dlm_add_requestqueue(ls, nodeid, hd);
			if (error == -EAGAIN)
				continue;
			else {
				error = -EINTR;
				goto out;
			}
		}

		if (lock_recovery_try(ls))
			break;
		schedule();
	}

	switch (ms->m_type) {

	/* messages sent to a master node */

	case DLM_MSG_REQUEST:
		receive_request(ls, ms);
		break;

	case DLM_MSG_CONVERT:
		receive_convert(ls, ms);
		break;

	case DLM_MSG_UNLOCK:
		receive_unlock(ls, ms);
		break;

	case DLM_MSG_CANCEL:
		receive_cancel(ls, ms);
		break;

	/* messages sent from a master node (replies to above) */

	case DLM_MSG_REQUEST_REPLY:
		receive_request_reply(ls, ms);
		break;

	case DLM_MSG_CONVERT_REPLY:
		receive_convert_reply(ls, ms);
		break;

	case DLM_MSG_UNLOCK_REPLY:
		receive_unlock_reply(ls, ms);
		break;

	case DLM_MSG_CANCEL_REPLY:
		receive_cancel_reply(ls, ms);
		break;

	/* messages sent from a master node (only two types of async msg) */

	case DLM_MSG_GRANT:
		receive_grant(ls, ms);
		break;

	case DLM_MSG_BAST:
		receive_bast(ls, ms);
		break;

	/* messages sent to a dir node */

	case DLM_MSG_LOOKUP:
		receive_lookup(ls, ms);
		break;

	case DLM_MSG_REMOVE:
		receive_remove(ls, ms);
		break;

	/* messages sent from a dir node (remove has no reply) */

	case DLM_MSG_LOOKUP_REPLY:
		receive_lookup_reply(ls, ms);
		break;

	/* other messages */

	case DLM_MSG_PURGE:
		receive_purge(ls, ms);
		break;

	default:
		log_error(ls, "unknown message type %d", ms->m_type);
	}

	unlock_recovery(ls);
 out:
	dlm_put_lockspace(ls);
	dlm_astd_wake();
	return error;
}


/*
 * Recovery related
 */

static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	if (middle_conversion(lkb)) {
		hold_lkb(lkb);
		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		ls->ls_stub_ms.m_result = -EINPROGRESS;
		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		_receive_convert_reply(lkb, &ls->ls_stub_ms);

		/* Same special case as in receive_rcom_lock_args() */
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
		unhold_lkb(lkb);

	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
		lkb->lkb_flags |= DLM_IFL_RESEND;
	}

	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
	   conversions are async; there's no reply from the remote master */
}

/* A waiting lkb needs recovery if the master node has failed, or
   the master node is changing (only when no directory is used) */

static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	if (dlm_is_removed(ls, lkb->lkb_nodeid))
		return 1;

	if (!dlm_no_directory(ls))
		return 0;

	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
		return 1;

	return 0;
}
/* Recovery for locks that are waiting for replies from nodes that are now
   gone.  We can just complete unlocks and cancels by faking a reply from the
   dead node.  Requests and up-conversions we flag to be resent after
   recovery.  Down-conversions can just be completed with a fake reply like
   unlocks.  Conversions between PR and CW need special attention. */

void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb, *safe;

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);

		/* all outstanding lookups, regardless of destination will be
		   resent after recovery is done */

		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
			lkb->lkb_flags |= DLM_IFL_RESEND;
			continue;
		}

		if (!waiter_needs_recovery(ls, lkb))
			continue;

		switch (lkb->lkb_wait_type) {

		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_RESEND;
			break;

		case DLM_MSG_CONVERT:
			recover_convert_waiter(ls, lkb);
			break;

		case DLM_MSG_UNLOCK:
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		case DLM_MSG_CANCEL:
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		default:
			log_error(ls, "invalid lkb wait_type %d",
				  lkb->lkb_wait_type);
		}
		schedule();
	}
	mutex_unlock(&ls->ls_waiters_mutex);
}

static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	int found = 0;

	mutex_lock(&ls->ls_waiters_mutex);
	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			hold_lkb(lkb);
			found = 1;
			break;
		}
	}
	mutex_unlock(&ls->ls_waiters_mutex);

	if (!found)
		lkb = NULL;
	return lkb;
}
/* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
   master or dir-node for r.  Processing the lkb may result in it being placed
   back on waiters. */

/* We do this after normal locking has been enabled and any saved messages
   (in requestqueue) have been processed.  We should be confident that at
   this point we won't get or process a reply to any of these waiting
   operations.  But, new ops may be coming in on the rsbs/locks here from
   userspace or remotely. */

/* there may have been an overlap unlock/cancel prior to recovery or after
   recovery.  if before, the lkb may still have a positive wait_count; if
   after, the overlap flag would just have been set and nothing new sent.
   we can be confident here that any replies to either the initial op or
   overlap ops prior to recovery have been received. */

int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype, err, oc, ou;

	while (1) {
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		lkb = find_resend_waiter(ls);
		if (!lkb)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		mstype = lkb->lkb_wait_type;
		oc = is_overlap_cancel(lkb);
		ou = is_overlap_unlock(lkb);
		err = 0;

		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);

		/* At this point we assume that we won't get a reply to any
		   previous op or overlap op on this lock.  First, do a big
		   remove_from_waiters() for all previous ops. */

		lkb->lkb_flags &= ~DLM_IFL_RESEND;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_type = 0;
		lkb->lkb_wait_count = 0;
		mutex_lock(&ls->ls_waiters_mutex);
		list_del_init(&lkb->lkb_wait_reply);
		mutex_unlock(&ls->ls_waiters_mutex);
		unhold_lkb(lkb); /* for waiters list */

		if (oc || ou) {
			/* do an unlock or cancel instead of resending */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
							-DLM_ECANCEL);
				unhold_lkb(lkb); /* undoes create_lkb() */
				break;
			case DLM_MSG_CONVERT:
				if (oc) {
					queue_cast(r, lkb, -DLM_ECANCEL);
				} else {
					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
					_unlock_lock(r, lkb);
				}
				break;
			default:
				err = 1;
			}
		} else {
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				_request_lock(r, lkb);
				if (is_master(r))
					confirm_master(r, 0);
				break;
			case DLM_MSG_CONVERT:
				_convert_lock(r, lkb);
				break;
			default:
				err = 1;
			}
		}

		if (err)
			log_error(ls, "recover_waiters_post %x %d %x %d %d",
				  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
		unlock_rsb(r);
		put_rsb(r);
		dlm_put_lkb(lkb);
	}

	return error;
}

static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
{
	struct dlm_ls *ls = r->res_ls;
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
		if (test(ls, lkb)) {
			rsb_set_flag(r, RSB_LOCKS_PURGED);
			del_lkb(r, lkb);
			/* this put should free the lkb */
			if (!dlm_put_lkb(lkb))
				log_error(ls, "purged lkb not released");
		}
	}
}

static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
}

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return is_master_copy(lkb);
}

static void purge_dead_locks(struct dlm_rsb *r)
{
	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
}
void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
{
	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
}

/* Get rid of locks held by nodes that are gone. */

int dlm_purge_locks(struct dlm_ls *ls)
{
	struct dlm_rsb *r;

	log_debug(ls, "dlm_purge_locks");

	down_write(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		hold_rsb(r);
		lock_rsb(r);
		if (is_master(r))
			purge_dead_locks(r);
		unlock_rsb(r);
		unhold_rsb(r);

		schedule();
	}
	up_write(&ls->ls_root_sem);

	return 0;
}

static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
{
	struct dlm_rsb *r, *r_ret = NULL;

	read_lock(&ls->ls_rsbtbl[bucket].lock);
	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
		if (!rsb_flag(r, RSB_LOCKS_PURGED))
			continue;
		hold_rsb(r);
		rsb_clear_flag(r, RSB_LOCKS_PURGED);
		r_ret = r;
		break;
	}
	read_unlock(&ls->ls_rsbtbl[bucket].lock);
	return r_ret;
}

void dlm_grant_after_purge(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	int bucket = 0;

	while (1) {
		r = find_purged_rsb(ls, bucket);
		if (!r) {
			if (bucket == ls->ls_rsbtbl_size - 1)
				break;
			bucket++;
			continue;
		}
		lock_rsb(r);
		if (is_master(r)) {
			grant_pending_locks(r);
			confirm_master(r, 0);
		}
		unlock_rsb(r);
		put_rsb(r);
		schedule();
	}
}

static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
					 uint32_t remid)
{
	struct dlm_lkb *lkb;

	list_for_each_entry(lkb, head, lkb_statequeue) {
		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
			return lkb;
	}
	return NULL;
}

static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
				    uint32_t remid)
{
	struct dlm_lkb *lkb;

	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
	if (lkb)
		return lkb;
	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
	if (lkb)
		return lkb;
	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
	if (lkb)
		return lkb;
	return NULL;
}

static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				  struct dlm_rsb *r, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	int lvblen;

	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
	lkb->lkb_ownpid = rl->rl_ownpid;
	lkb->lkb_remid = rl->rl_lkid;
	lkb->lkb_exflags = rl->rl_exflags;
	lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	lkb->lkb_lvbseq = rl->rl_lvbseq;
	lkb->lkb_rqmode = rl->rl_rqmode;
	lkb->lkb_grmode = rl->rl_grmode;
	/* don't set lkb_status because add_lkb wants to set it itself */

	lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
	lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		lkb->lkb_lvbptr = allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
			 sizeof(struct rcom_lock);
		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
	}

	/* Conversions between PR and CW (middle modes) need special handling.
	   The real granted mode of these converting locks cannot be determined
	   until all locks have been rebuilt on the rsb (recover_conversion) */

	if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
		rl->rl_status = DLM_LKSTS_CONVERT;
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(r, RSB_RECOVER_CONVERT);
	}

	return 0;
}

/* This lkb may have been recovered in a previous aborted recovery so we need
   to check if the rsb already has an lkb with the given remote nodeid/lkid.
   If so we just send back a standard reply.  If not, we create a new lkb with
   the given values and send back our lkid.  We send back our lkid by sending
   back the rcom_lock struct we got but with the remid field filled in. */

int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	if (rl->rl_parent_lkid) {
		error = -EOPNOTSUPP;
		goto out;
	}

	error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
	if (error)
		goto out;

	lock_rsb(r);

	lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
	if (lkb) {
		error = -EEXIST;
		goto out_remid;
	}

	error = create_lkb(ls, &lkb);
	if (error)
		goto out_unlock;

	error = receive_rcom_lock_args(ls, lkb, r, rc);
	if (error) {
		__put_lkb(ls, lkb);
		goto out_unlock;
	}

	attach_lkb(r, lkb);
	add_lkb(r, lkb, rl->rl_status);
	error = 0;

 out_remid:
	/* this is the new value returned to the lock holder for
	   saving in its process-copy lkb */
	rl->rl_remid = lkb->lkb_id;

 out_unlock:
	unlock_rsb(r);
	put_rsb(r);
 out:
	if (error)
		log_print("recover_master_copy %d %x", error, rl->rl_lkid);
	rl->rl_result = error;
	return error;
}

int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, rl->rl_lkid, &lkb);
	if (error) {
		log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
		return error;
	}

	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	error = rl->rl_result;

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	switch (error) {
	case -EBADR:
		/* There's a chance the new master received our lock before
		   dlm_recover_master_reply(), this wouldn't happen if we did
		   a barrier between recover_masters and recover_locks. */
		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
			  (unsigned long)r, r->res_name);
		dlm_send_rcom_lock(r, lkb);
		goto out;
	case -EEXIST:
		log_debug(ls, "master copy exists %x", lkb->lkb_id);
		/* fall through */
	case 0:
		lkb->lkb_remid = rl->rl_remid;
		break;
	default:
		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
			  error, lkb->lkb_id);
	}

	/* an ack for dlm_recover_locks() which waits for replies from
	   all the locks it sends to new masters */
	dlm_recovered_lock(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);

	return 0;
}

int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
		     int mode, uint32_t flags, void *name, unsigned int namelen,
		     uint32_t parent_lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	lock_recovery(ls);

	error = create_lkb(ls, &lkb);
	if (error) {
		kfree(ua);
		goto out;
	}

	if (flags & DLM_LKF_VALBLK) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
		if (!ua->lksb.sb_lvbptr) {
			kfree(ua);
			__put_lkb(ls, lkb);
			error = -ENOMEM;
			goto out;
		}
	}

	/* After ua is attached to lkb it will be freed by free_lkb().
	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
	   lock and that lkb_astparam is the dlm_user_args structure. */

	error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
	lkb->lkb_flags |= DLM_IFL_USER;
	ua->old_mode = DLM_LOCK_IV;

	if (error) {
		__put_lkb(ls, lkb);
		goto out;
	}

	error = request_lock(ls, lkb, name, namelen, &args);

	switch (error) {
	case 0:
		break;
	case -EINPROGRESS:
		error = 0;
		break;
	case -EAGAIN:
		error = 0;
		/* fall through */
	default:
		__put_lkb(ls, lkb);
		goto out;
	}

	/* add this new lkb to the per-process list of locks */
	spin_lock(&ua->proc->locks_spin);
	hold_lkb(lkb);
	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
	spin_unlock(&ua->proc->locks_spin);
 out:
	unlock_recovery(ls);
	return error;
}
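/* Note (a summary of the flow above; the ast delivery path is inferred from
   user.c, not shown here): for a userspace lock the dlm_user_args struct
   rides in lkb_astparam with DLM_IFL_USER set, the lkb is tracked on the
   owning process's proc->locks list, and completion/blocking callbacks are
   routed through the DLM_FAKE_USER_AST markers instead of kernel function
   pointers, to be read back by the process through the misc device. */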
int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	/* user can change the params on its lock when it converts it, or
	   add an lvb that didn't exist before */

	ua = (struct dlm_user_args *)lkb->lkb_astparam;

	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
		if (!ua->lksb.sb_lvbptr) {
			error = -ENOMEM;
			goto out_put;
		}
	}
	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);

	ua->castparam = ua_tmp->castparam;
	ua->castaddr = ua_tmp->castaddr;
	ua->bastparam = ua_tmp->bastparam;
	ua->bastaddr = ua_tmp->bastaddr;
	ua->user_lksb = ua_tmp->user_lksb;
	ua->old_mode = lkb->lkb_grmode;

	error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
			      ua, DLM_FAKE_USER_AST, &args);
	if (error)
		goto out_put;

	error = convert_lock(ls, lkb, &args);

	if (error == -EINPROGRESS || error == -EAGAIN)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}

int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = (struct dlm_user_args *)lkb->lkb_astparam;

	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
	ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
		error = 0;
	if (error)
		goto out_put;

	spin_lock(&ua->proc->locks_spin);
	/* dlm_user_add_ast() may have already taken lkb off the proc list */
	if (!list_empty(&lkb->lkb_ownqueue))
		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
	spin_unlock(&ua->proc->locks_spin);
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}

int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = (struct dlm_user_args *)lkb->lkb_astparam;
	ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = cancel_lock(ls, lkb, &args);

	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}

/* lkb's that are removed from the waiters list by revert are just left on the
   orphans list with the granted orphan locks, to be freed by purge */

static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
	struct dlm_args args;
	int error;

	hold_lkb(lkb);
	mutex_lock(&ls->ls_orphans_mutex);
	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
	mutex_unlock(&ls->ls_orphans_mutex);

	set_unlock_args(0, ua, &args);

	error = cancel_lock(ls, lkb, &args);
	if (error == -DLM_ECANCEL)
		error = 0;
	return error;
}

/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
   Regardless of what rsb queue the lock is on, it's removed and freed. */

static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
	struct dlm_args args;
	int error;

	set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);

	error = unlock_lock(ls, lkb, &args);
	if (error == -DLM_EUNLOCK)
		error = 0;
	return error;
}

/* We have to release the clear_proc_locks mutex before calling
   unlock_proc_lock() (which does lock_rsb) to avoid a deadlock with the
   receive path, which does lock_rsb and then calls dlm_user_add_ast(). */

static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
				     struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb = NULL;

	mutex_lock(&ls->ls_clear_proc_locks);
	if (list_empty(&proc->locks))
		goto out;

	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
	list_del_init(&lkb->lkb_ownqueue);

	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
		lkb->lkb_flags |= DLM_IFL_ORPHAN;
	else
		lkb->lkb_flags |= DLM_IFL_DEAD;
 out:
	mutex_unlock(&ls->ls_clear_proc_locks);
	return lkb;
}

/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */

/* The proc CLOSING flag is set so no more device_reads should look at the
   proc->asts list, and no more device_writes should add lkbs to the
   proc->locks list; so we shouldn't need to take asts_spin or locks_spin
   here.  This assumes that device reads/writes/closes are serialized --
   FIXME: we may need to serialize them ourselves. */

void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	lock_recovery(ls);

	while (1) {
		lkb = del_proc_lock(ls, proc);
		if (!lkb)
			break;
		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
			orphan_proc_lock(ls, lkb);
		else
			unlock_proc_lock(ls, lkb);

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request; it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	mutex_lock(&ls->ls_clear_proc_locks);

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}

	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}

	mutex_unlock(&ls->ls_clear_proc_locks);
	unlock_recovery(ls);
}
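
/* Illustrative only: the lock ordering inferred from the two comments
   above.  If dlm_clear_proc_locks() called unlock_proc_lock() while
   still holding ls_clear_proc_locks, it could deadlock against the
   message receive path:

	clear path:                      receive path:
	mutex_lock(ls_clear_proc_locks)
	                                 lock_rsb()
	lock_rsb()             <waits>
	                                 dlm_user_add_ast()
	                                   mutex_lock(ls_clear_proc_locks)
	                                                         <waits>

   Each side holds what the other needs, so del_proc_lock() drops the
   mutex before unlock_proc_lock()/orphan_proc_lock() run.
*/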

static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	while (1) {
		lkb = NULL;
		spin_lock(&proc->locks_spin);
		if (!list_empty(&proc->locks)) {
			lkb = list_entry(proc->locks.next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);
		}
		spin_unlock(&proc->locks_spin);

		if (!lkb)
			break;

		lkb->lkb_flags |= DLM_IFL_DEAD;
		unlock_proc_lock(ls, lkb);
		dlm_put_lkb(lkb); /* ref from proc->locks list */
	}

	spin_lock(&proc->locks_spin);
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->locks_spin);

	spin_lock(&proc->asts_spin);
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->asts_spin);
}

/* pid of 0 means purge all orphans */

static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_lkb *lkb, *safe;

	mutex_lock(&ls->ls_orphans_mutex);
	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
		if (pid && lkb->lkb_ownpid != pid)
			continue;
		unlock_proc_lock(ls, lkb);
		list_del_init(&lkb->lkb_ownqueue);
		dlm_put_lkb(lkb);
	}
	mutex_unlock(&ls->ls_orphans_mutex);
}

static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error;

	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
				DLM_MSG_PURGE, &ms, &mh);
	if (error)
		return error;
	ms->m_nodeid = nodeid;
	ms->m_pid = pid;

	return send_message(mh, ms);
}

int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
		   int nodeid, int pid)
{
	int error = 0;

	if (nodeid != dlm_our_nodeid()) {
		error = send_purge(ls, nodeid, pid);
	} else {
		lock_recovery(ls);
		if (pid == current->pid)
			purge_proc_locks(ls, proc);
		else
			do_purge(ls, nodeid, pid);
		unlock_recovery(ls);
	}
	return error;
}
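
/* Illustrative only: how a caller (hypothetically, a purge command
   written through the device interface in user.c) might use
   dlm_user_purge().  Purging your own pid on the local node clears the
   process's live locks via purge_proc_locks(); any other nodeid/pid
   combination frees matching orphans, sent as a DLM_MSG_PURGE message
   when the target node is remote:

	// clear the calling process's own locks on this node
	error = dlm_user_purge(ls, proc, dlm_our_nodeid(), current->pid);

	// free orphans left by pid 1234 on node nodeid
	error = dlm_user_purge(ls, proc, nodeid, 1234);

	// pid 0 frees every orphan on the target node
	error = dlm_user_purge(ls, proc, nodeid, 0);
*/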