1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* -*- mode: c; c-basic-offset: 8; -*- 3 * vim: noexpandtab sw=8 ts=8 sts=0: 4 * 5 * dlmlock.c 6 * 7 * underlying calls for lock creation 8 * 9 * Copyright (C) 2004 Oracle. All rights reserved. 10 */ 11 12 13 #include <linux/module.h> 14 #include <linux/fs.h> 15 #include <linux/types.h> 16 #include <linux/slab.h> 17 #include <linux/highmem.h> 18 #include <linux/init.h> 19 #include <linux/sysctl.h> 20 #include <linux/random.h> 21 #include <linux/blkdev.h> 22 #include <linux/socket.h> 23 #include <linux/inet.h> 24 #include <linux/spinlock.h> 25 #include <linux/delay.h> 26 27 28 #include "cluster/heartbeat.h" 29 #include "cluster/nodemanager.h" 30 #include "cluster/tcp.h" 31 32 #include "dlmapi.h" 33 #include "dlmcommon.h" 34 35 #include "dlmconvert.h" 36 37 #define MLOG_MASK_PREFIX ML_DLM 38 #include "cluster/masklog.h" 39 40 static struct kmem_cache *dlm_lock_cache; 41 42 static DEFINE_SPINLOCK(dlm_cookie_lock); 43 static u64 dlm_next_cookie = 1; 44 45 static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm, 46 struct dlm_lock_resource *res, 47 struct dlm_lock *lock, int flags); 48 static void dlm_init_lock(struct dlm_lock *newlock, int type, 49 u8 node, u64 cookie); 50 static void dlm_lock_release(struct kref *kref); 51 static void dlm_lock_detach_lockres(struct dlm_lock *lock); 52 53 int dlm_init_lock_cache(void) 54 { 55 dlm_lock_cache = kmem_cache_create("o2dlm_lock", 56 sizeof(struct dlm_lock), 57 0, SLAB_HWCACHE_ALIGN, NULL); 58 if (dlm_lock_cache == NULL) 59 return -ENOMEM; 60 return 0; 61 } 62 63 void dlm_destroy_lock_cache(void) 64 { 65 kmem_cache_destroy(dlm_lock_cache); 66 } 67 68 /* Tell us whether we can grant a new lock request. 69 * locking: 70 * caller needs: res->spinlock 71 * taken: none 72 * held on exit: none 73 * returns: 1 if the lock can be granted, 0 otherwise. 74 */ 75 static int dlm_can_grant_new_lock(struct dlm_lock_resource *res, 76 struct dlm_lock *lock) 77 { 78 struct dlm_lock *tmplock; 79 80 list_for_each_entry(tmplock, &res->granted, list) { 81 if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type)) 82 return 0; 83 } 84 85 list_for_each_entry(tmplock, &res->converting, list) { 86 if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type)) 87 return 0; 88 if (!dlm_lock_compatible(tmplock->ml.convert_type, 89 lock->ml.type)) 90 return 0; 91 } 92 93 return 1; 94 } 95 96 /* performs lock creation at the lockres master site 97 * locking: 98 * caller needs: none 99 * taken: takes and drops res->spinlock 100 * held on exit: none 101 * returns: DLM_NORMAL, DLM_NOTQUEUED 102 */ 103 static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm, 104 struct dlm_lock_resource *res, 105 struct dlm_lock *lock, int flags) 106 { 107 int call_ast = 0, kick_thread = 0; 108 enum dlm_status status = DLM_NORMAL; 109 110 mlog(0, "type=%d\n", lock->ml.type); 111 112 spin_lock(&res->spinlock); 113 /* if called from dlm_create_lock_handler, need to 114 * ensure it will not sleep in dlm_wait_on_lockres */ 115 status = __dlm_lockres_state_to_status(res); 116 if (status != DLM_NORMAL && 117 lock->ml.node != dlm->node_num) { 118 /* erf. state changed after lock was dropped. */ 119 spin_unlock(&res->spinlock); 120 dlm_error(status); 121 return status; 122 } 123 __dlm_wait_on_lockres(res); 124 __dlm_lockres_reserve_ast(res); 125 126 if (dlm_can_grant_new_lock(res, lock)) { 127 mlog(0, "I can grant this lock right away\n"); 128 /* got it right away */ 129 lock->lksb->status = DLM_NORMAL; 130 status = DLM_NORMAL; 131 dlm_lock_get(lock); 132 list_add_tail(&lock->list, &res->granted); 133 134 /* for the recovery lock, we can't allow the ast 135 * to be queued since the dlmthread is already 136 * frozen. but the recovery lock is always locked 137 * with LKM_NOQUEUE so we do not need the ast in 138 * this special case */ 139 if (!dlm_is_recovery_lock(res->lockname.name, 140 res->lockname.len)) { 141 kick_thread = 1; 142 call_ast = 1; 143 } else { 144 mlog(0, "%s: returning DLM_NORMAL to " 145 "node %u for reco lock\n", dlm->name, 146 lock->ml.node); 147 } 148 } else { 149 /* for NOQUEUE request, unless we get the 150 * lock right away, return DLM_NOTQUEUED */ 151 if (flags & LKM_NOQUEUE) { 152 status = DLM_NOTQUEUED; 153 if (dlm_is_recovery_lock(res->lockname.name, 154 res->lockname.len)) { 155 mlog(0, "%s: returning NOTQUEUED to " 156 "node %u for reco lock\n", dlm->name, 157 lock->ml.node); 158 } 159 } else { 160 status = DLM_NORMAL; 161 dlm_lock_get(lock); 162 list_add_tail(&lock->list, &res->blocked); 163 kick_thread = 1; 164 } 165 } 166 167 spin_unlock(&res->spinlock); 168 wake_up(&res->wq); 169 170 /* either queue the ast or release it */ 171 if (call_ast) 172 dlm_queue_ast(dlm, lock); 173 else 174 dlm_lockres_release_ast(dlm, res); 175 176 dlm_lockres_calc_usage(dlm, res); 177 if (kick_thread) 178 dlm_kick_thread(dlm, res); 179 180 return status; 181 } 182 183 void dlm_revert_pending_lock(struct dlm_lock_resource *res, 184 struct dlm_lock *lock) 185 { 186 /* remove from local queue if it failed */ 187 list_del_init(&lock->list); 188 lock->lksb->flags &= ~DLM_LKSB_GET_LVB; 189 } 190 191 192 /* 193 * locking: 194 * caller needs: none 195 * taken: takes and drops res->spinlock 196 * held on exit: none 197 * returns: DLM_DENIED, DLM_RECOVERING, or net status 198 */ 199 static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm, 200 struct dlm_lock_resource *res, 201 struct dlm_lock *lock, int flags) 202 { 203 enum dlm_status status = DLM_DENIED; 204 int lockres_changed = 1; 205 206 mlog(0, "type=%d, lockres %.*s, flags = 0x%x\n", 207 lock->ml.type, res->lockname.len, 208 res->lockname.name, flags); 209 210 /* 211 * Wait if resource is getting recovered, remastered, etc. 212 * If the resource was remastered and new owner is self, then exit. 213 */ 214 spin_lock(&res->spinlock); 215 __dlm_wait_on_lockres(res); 216 if (res->owner == dlm->node_num) { 217 spin_unlock(&res->spinlock); 218 return DLM_RECOVERING; 219 } 220 res->state |= DLM_LOCK_RES_IN_PROGRESS; 221 222 /* add lock to local (secondary) queue */ 223 dlm_lock_get(lock); 224 list_add_tail(&lock->list, &res->blocked); 225 lock->lock_pending = 1; 226 spin_unlock(&res->spinlock); 227 228 /* spec seems to say that you will get DLM_NORMAL when the lock 229 * has been queued, meaning we need to wait for a reply here. */ 230 status = dlm_send_remote_lock_request(dlm, res, lock, flags); 231 232 spin_lock(&res->spinlock); 233 res->state &= ~DLM_LOCK_RES_IN_PROGRESS; 234 lock->lock_pending = 0; 235 if (status != DLM_NORMAL) { 236 if (status == DLM_RECOVERING && 237 dlm_is_recovery_lock(res->lockname.name, 238 res->lockname.len)) { 239 /* recovery lock was mastered by dead node. 240 * we need to have calc_usage shoot down this 241 * lockres and completely remaster it. */ 242 mlog(0, "%s: recovery lock was owned by " 243 "dead node %u, remaster it now.\n", 244 dlm->name, res->owner); 245 } else if (status != DLM_NOTQUEUED) { 246 /* 247 * DO NOT call calc_usage, as this would unhash 248 * the remote lockres before we ever get to use 249 * it. treat as if we never made any change to 250 * the lockres. 251 */ 252 lockres_changed = 0; 253 dlm_error(status); 254 } 255 dlm_revert_pending_lock(res, lock); 256 dlm_lock_put(lock); 257 } else if (dlm_is_recovery_lock(res->lockname.name, 258 res->lockname.len)) { 259 /* special case for the $RECOVERY lock. 260 * there will never be an AST delivered to put 261 * this lock on the proper secondary queue 262 * (granted), so do it manually. */ 263 mlog(0, "%s: $RECOVERY lock for this node (%u) is " 264 "mastered by %u; got lock, manually granting (no ast)\n", 265 dlm->name, dlm->node_num, res->owner); 266 list_move_tail(&lock->list, &res->granted); 267 } 268 spin_unlock(&res->spinlock); 269 270 if (lockres_changed) 271 dlm_lockres_calc_usage(dlm, res); 272 273 wake_up(&res->wq); 274 return status; 275 } 276 277 278 /* for remote lock creation. 279 * locking: 280 * caller needs: none, but need res->state & DLM_LOCK_RES_IN_PROGRESS 281 * taken: none 282 * held on exit: none 283 * returns: DLM_NOLOCKMGR, or net status 284 */ 285 static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm, 286 struct dlm_lock_resource *res, 287 struct dlm_lock *lock, int flags) 288 { 289 struct dlm_create_lock create; 290 int tmpret, status = 0; 291 enum dlm_status ret; 292 293 memset(&create, 0, sizeof(create)); 294 create.node_idx = dlm->node_num; 295 create.requested_type = lock->ml.type; 296 create.cookie = lock->ml.cookie; 297 create.namelen = res->lockname.len; 298 create.flags = cpu_to_be32(flags); 299 memcpy(create.name, res->lockname.name, create.namelen); 300 301 tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create, 302 sizeof(create), res->owner, &status); 303 if (tmpret >= 0) { 304 ret = status; 305 if (ret == DLM_REJECTED) { 306 mlog(ML_ERROR, "%s: res %.*s, Stale lockres no longer " 307 "owned by node %u. That node is coming back up " 308 "currently.\n", dlm->name, create.namelen, 309 create.name, res->owner); 310 dlm_print_one_lock_resource(res); 311 BUG(); 312 } 313 } else { 314 mlog(ML_ERROR, "%s: res %.*s, Error %d send CREATE LOCK to " 315 "node %u\n", dlm->name, create.namelen, create.name, 316 tmpret, res->owner); 317 if (dlm_is_host_down(tmpret)) 318 ret = DLM_RECOVERING; 319 else 320 ret = dlm_err_to_dlm_status(tmpret); 321 } 322 323 return ret; 324 } 325 326 void dlm_lock_get(struct dlm_lock *lock) 327 { 328 kref_get(&lock->lock_refs); 329 } 330 331 void dlm_lock_put(struct dlm_lock *lock) 332 { 333 kref_put(&lock->lock_refs, dlm_lock_release); 334 } 335 336 static void dlm_lock_release(struct kref *kref) 337 { 338 struct dlm_lock *lock; 339 340 lock = container_of(kref, struct dlm_lock, lock_refs); 341 342 BUG_ON(!list_empty(&lock->list)); 343 BUG_ON(!list_empty(&lock->ast_list)); 344 BUG_ON(!list_empty(&lock->bast_list)); 345 BUG_ON(lock->ast_pending); 346 BUG_ON(lock->bast_pending); 347 348 dlm_lock_detach_lockres(lock); 349 350 if (lock->lksb_kernel_allocated) { 351 mlog(0, "freeing kernel-allocated lksb\n"); 352 kfree(lock->lksb); 353 } 354 kmem_cache_free(dlm_lock_cache, lock); 355 } 356 357 /* associate a lock with it's lockres, getting a ref on the lockres */ 358 void dlm_lock_attach_lockres(struct dlm_lock *lock, 359 struct dlm_lock_resource *res) 360 { 361 dlm_lockres_get(res); 362 lock->lockres = res; 363 } 364 365 /* drop ref on lockres, if there is still one associated with lock */ 366 static void dlm_lock_detach_lockres(struct dlm_lock *lock) 367 { 368 struct dlm_lock_resource *res; 369 370 res = lock->lockres; 371 if (res) { 372 lock->lockres = NULL; 373 mlog(0, "removing lock's lockres reference\n"); 374 dlm_lockres_put(res); 375 } 376 } 377 378 static void dlm_init_lock(struct dlm_lock *newlock, int type, 379 u8 node, u64 cookie) 380 { 381 INIT_LIST_HEAD(&newlock->list); 382 INIT_LIST_HEAD(&newlock->ast_list); 383 INIT_LIST_HEAD(&newlock->bast_list); 384 spin_lock_init(&newlock->spinlock); 385 newlock->ml.type = type; 386 newlock->ml.convert_type = LKM_IVMODE; 387 newlock->ml.highest_blocked = LKM_IVMODE; 388 newlock->ml.node = node; 389 newlock->ml.pad1 = 0; 390 newlock->ml.list = 0; 391 newlock->ml.flags = 0; 392 newlock->ast = NULL; 393 newlock->bast = NULL; 394 newlock->astdata = NULL; 395 newlock->ml.cookie = cpu_to_be64(cookie); 396 newlock->ast_pending = 0; 397 newlock->bast_pending = 0; 398 newlock->convert_pending = 0; 399 newlock->lock_pending = 0; 400 newlock->unlock_pending = 0; 401 newlock->cancel_pending = 0; 402 newlock->lksb_kernel_allocated = 0; 403 404 kref_init(&newlock->lock_refs); 405 } 406 407 struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie, 408 struct dlm_lockstatus *lksb) 409 { 410 struct dlm_lock *lock; 411 int kernel_allocated = 0; 412 413 lock = kmem_cache_zalloc(dlm_lock_cache, GFP_NOFS); 414 if (!lock) 415 return NULL; 416 417 if (!lksb) { 418 /* zero memory only if kernel-allocated */ 419 lksb = kzalloc(sizeof(*lksb), GFP_NOFS); 420 if (!lksb) { 421 kmem_cache_free(dlm_lock_cache, lock); 422 return NULL; 423 } 424 kernel_allocated = 1; 425 } 426 427 dlm_init_lock(lock, type, node, cookie); 428 if (kernel_allocated) 429 lock->lksb_kernel_allocated = 1; 430 lock->lksb = lksb; 431 lksb->lockid = lock; 432 return lock; 433 } 434 435 /* handler for lock creation net message 436 * locking: 437 * caller needs: none 438 * taken: takes and drops res->spinlock 439 * held on exit: none 440 * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED 441 */ 442 int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data, 443 void **ret_data) 444 { 445 struct dlm_ctxt *dlm = data; 446 struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf; 447 struct dlm_lock_resource *res = NULL; 448 struct dlm_lock *newlock = NULL; 449 struct dlm_lockstatus *lksb = NULL; 450 enum dlm_status status = DLM_NORMAL; 451 char *name; 452 unsigned int namelen; 453 454 BUG_ON(!dlm); 455 456 if (!dlm_grab(dlm)) 457 return DLM_REJECTED; 458 459 name = create->name; 460 namelen = create->namelen; 461 status = DLM_REJECTED; 462 if (!dlm_domain_fully_joined(dlm)) { 463 mlog(ML_ERROR, "Domain %s not fully joined, but node %u is " 464 "sending a create_lock message for lock %.*s!\n", 465 dlm->name, create->node_idx, namelen, name); 466 dlm_error(status); 467 goto leave; 468 } 469 470 status = DLM_IVBUFLEN; 471 if (namelen > DLM_LOCKID_NAME_MAX) { 472 dlm_error(status); 473 goto leave; 474 } 475 476 status = DLM_SYSERR; 477 newlock = dlm_new_lock(create->requested_type, 478 create->node_idx, 479 be64_to_cpu(create->cookie), NULL); 480 if (!newlock) { 481 dlm_error(status); 482 goto leave; 483 } 484 485 lksb = newlock->lksb; 486 487 if (be32_to_cpu(create->flags) & LKM_GET_LVB) { 488 lksb->flags |= DLM_LKSB_GET_LVB; 489 mlog(0, "set DLM_LKSB_GET_LVB flag\n"); 490 } 491 492 status = DLM_IVLOCKID; 493 res = dlm_lookup_lockres(dlm, name, namelen); 494 if (!res) { 495 dlm_error(status); 496 goto leave; 497 } 498 499 spin_lock(&res->spinlock); 500 status = __dlm_lockres_state_to_status(res); 501 spin_unlock(&res->spinlock); 502 503 if (status != DLM_NORMAL) { 504 mlog(0, "lockres recovering/migrating/in-progress\n"); 505 goto leave; 506 } 507 508 dlm_lock_attach_lockres(newlock, res); 509 510 status = dlmlock_master(dlm, res, newlock, be32_to_cpu(create->flags)); 511 leave: 512 if (status != DLM_NORMAL) 513 if (newlock) 514 dlm_lock_put(newlock); 515 516 if (res) 517 dlm_lockres_put(res); 518 519 dlm_put(dlm); 520 521 return status; 522 } 523 524 525 /* fetch next node-local (u8 nodenum + u56 cookie) into u64 */ 526 static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie) 527 { 528 u64 tmpnode = node_num; 529 530 /* shift single byte of node num into top 8 bits */ 531 tmpnode <<= 56; 532 533 spin_lock(&dlm_cookie_lock); 534 *cookie = (dlm_next_cookie | tmpnode); 535 if (++dlm_next_cookie & 0xff00000000000000ull) { 536 mlog(0, "This node's cookie will now wrap!\n"); 537 dlm_next_cookie = 1; 538 } 539 spin_unlock(&dlm_cookie_lock); 540 } 541 542 enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode, 543 struct dlm_lockstatus *lksb, int flags, 544 const char *name, int namelen, dlm_astlockfunc_t *ast, 545 void *data, dlm_bastlockfunc_t *bast) 546 { 547 enum dlm_status status; 548 struct dlm_lock_resource *res = NULL; 549 struct dlm_lock *lock = NULL; 550 int convert = 0, recovery = 0; 551 552 /* yes this function is a mess. 553 * TODO: clean this up. lots of common code in the 554 * lock and convert paths, especially in the retry blocks */ 555 if (!lksb) { 556 dlm_error(DLM_BADARGS); 557 return DLM_BADARGS; 558 } 559 560 status = DLM_BADPARAM; 561 if (mode != LKM_EXMODE && mode != LKM_PRMODE && mode != LKM_NLMODE) { 562 dlm_error(status); 563 goto error; 564 } 565 566 if (flags & ~LKM_VALID_FLAGS) { 567 dlm_error(status); 568 goto error; 569 } 570 571 convert = (flags & LKM_CONVERT); 572 recovery = (flags & LKM_RECOVERY); 573 574 if (recovery && 575 (!dlm_is_recovery_lock(name, namelen) || convert) ) { 576 dlm_error(status); 577 goto error; 578 } 579 if (convert && (flags & LKM_LOCAL)) { 580 mlog(ML_ERROR, "strange LOCAL convert request!\n"); 581 goto error; 582 } 583 584 if (convert) { 585 /* CONVERT request */ 586 587 /* if converting, must pass in a valid dlm_lock */ 588 lock = lksb->lockid; 589 if (!lock) { 590 mlog(ML_ERROR, "NULL lock pointer in convert " 591 "request\n"); 592 goto error; 593 } 594 595 res = lock->lockres; 596 if (!res) { 597 mlog(ML_ERROR, "NULL lockres pointer in convert " 598 "request\n"); 599 goto error; 600 } 601 dlm_lockres_get(res); 602 603 /* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are 604 * static after the original lock call. convert requests will 605 * ensure that everything is the same, or return DLM_BADARGS. 606 * this means that DLM_DENIED_NOASTS will never be returned. 607 */ 608 if (lock->lksb != lksb || lock->ast != ast || 609 lock->bast != bast || lock->astdata != data) { 610 status = DLM_BADARGS; 611 mlog(ML_ERROR, "new args: lksb=%p, ast=%p, bast=%p, " 612 "astdata=%p\n", lksb, ast, bast, data); 613 mlog(ML_ERROR, "orig args: lksb=%p, ast=%p, bast=%p, " 614 "astdata=%p\n", lock->lksb, lock->ast, 615 lock->bast, lock->astdata); 616 goto error; 617 } 618 retry_convert: 619 dlm_wait_for_recovery(dlm); 620 621 if (res->owner == dlm->node_num) 622 status = dlmconvert_master(dlm, res, lock, flags, mode); 623 else 624 status = dlmconvert_remote(dlm, res, lock, flags, mode); 625 if (status == DLM_RECOVERING || status == DLM_MIGRATING || 626 status == DLM_FORWARD) { 627 /* for now, see how this works without sleeping 628 * and just retry right away. I suspect the reco 629 * or migration will complete fast enough that 630 * no waiting will be necessary */ 631 mlog(0, "retrying convert with migration/recovery/" 632 "in-progress\n"); 633 msleep(100); 634 goto retry_convert; 635 } 636 } else { 637 u64 tmpcookie; 638 639 /* LOCK request */ 640 status = DLM_BADARGS; 641 if (!name) { 642 dlm_error(status); 643 goto error; 644 } 645 646 status = DLM_IVBUFLEN; 647 if (namelen > DLM_LOCKID_NAME_MAX || namelen < 1) { 648 dlm_error(status); 649 goto error; 650 } 651 652 dlm_get_next_cookie(dlm->node_num, &tmpcookie); 653 lock = dlm_new_lock(mode, dlm->node_num, tmpcookie, lksb); 654 if (!lock) { 655 dlm_error(status); 656 goto error; 657 } 658 659 if (!recovery) 660 dlm_wait_for_recovery(dlm); 661 662 /* find or create the lock resource */ 663 res = dlm_get_lock_resource(dlm, name, namelen, flags); 664 if (!res) { 665 status = DLM_IVLOCKID; 666 dlm_error(status); 667 goto error; 668 } 669 670 mlog(0, "type=%d, flags = 0x%x\n", mode, flags); 671 mlog(0, "creating lock: lock=%p res=%p\n", lock, res); 672 673 dlm_lock_attach_lockres(lock, res); 674 lock->ast = ast; 675 lock->bast = bast; 676 lock->astdata = data; 677 678 retry_lock: 679 if (flags & LKM_VALBLK) { 680 mlog(0, "LKM_VALBLK passed by caller\n"); 681 682 /* LVB requests for non PR, PW or EX locks are 683 * ignored. */ 684 if (mode < LKM_PRMODE) 685 flags &= ~LKM_VALBLK; 686 else { 687 flags |= LKM_GET_LVB; 688 lock->lksb->flags |= DLM_LKSB_GET_LVB; 689 } 690 } 691 692 if (res->owner == dlm->node_num) 693 status = dlmlock_master(dlm, res, lock, flags); 694 else 695 status = dlmlock_remote(dlm, res, lock, flags); 696 697 if (status == DLM_RECOVERING || status == DLM_MIGRATING || 698 status == DLM_FORWARD) { 699 msleep(100); 700 if (recovery) { 701 if (status != DLM_RECOVERING) 702 goto retry_lock; 703 /* wait to see the node go down, then 704 * drop down and allow the lockres to 705 * get cleaned up. need to remaster. */ 706 dlm_wait_for_node_death(dlm, res->owner, 707 DLM_NODE_DEATH_WAIT_MAX); 708 } else { 709 dlm_wait_for_recovery(dlm); 710 goto retry_lock; 711 } 712 } 713 714 /* Inflight taken in dlm_get_lock_resource() is dropped here */ 715 spin_lock(&res->spinlock); 716 dlm_lockres_drop_inflight_ref(dlm, res); 717 spin_unlock(&res->spinlock); 718 719 dlm_lockres_calc_usage(dlm, res); 720 dlm_kick_thread(dlm, res); 721 722 if (status != DLM_NORMAL) { 723 lock->lksb->flags &= ~DLM_LKSB_GET_LVB; 724 if (status != DLM_NOTQUEUED) 725 dlm_error(status); 726 goto error; 727 } 728 } 729 730 error: 731 if (status != DLM_NORMAL) { 732 if (lock && !convert) 733 dlm_lock_put(lock); 734 // this is kind of unnecessary 735 lksb->status = status; 736 } 737 738 /* put lockres ref from the convert path 739 * or from dlm_get_lock_resource */ 740 if (res) 741 dlm_lockres_put(res); 742 743 return status; 744 } 745 EXPORT_SYMBOL_GPL(dlmlock); 746