/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmlock.c
 *
 * underlying calls for lock creation
 *
 * Copyright (C) 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 */


#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/utsname.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/spinlock.h>
#include <linux/delay.h>


#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"

#include "dlmconvert.h"

#define MLOG_MASK_PREFIX ML_DLM
#include "cluster/masklog.h"

static spinlock_t dlm_cookie_lock = SPIN_LOCK_UNLOCKED;
static u64 dlm_next_cookie = 1;

static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
                                                    struct dlm_lock_resource *res,
                                                    struct dlm_lock *lock, int flags);
static void dlm_init_lock(struct dlm_lock *newlock, int type,
                          u8 node, u64 cookie);
static void dlm_lock_release(struct kref *kref);
static void dlm_lock_detach_lockres(struct dlm_lock *lock);

/* Tell us whether we can grant a new lock request.
 * locking:
 *   caller needs:  res->spinlock
 *   taken:         none
 *   held on exit:  none
 * returns: 1 if the lock can be granted, 0 otherwise.
 */
static int dlm_can_grant_new_lock(struct dlm_lock_resource *res,
                                  struct dlm_lock *lock)
{
        struct list_head *iter;
        struct dlm_lock *tmplock;

        list_for_each(iter, &res->granted) {
                tmplock = list_entry(iter, struct dlm_lock, list);

                if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
                        return 0;
        }

        list_for_each(iter, &res->converting) {
                tmplock = list_entry(iter, struct dlm_lock, list);

                if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
                        return 0;
        }

        return 1;
}
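/* Note on the check above: a new request is grantable only if it is
 * compatible with every lock already granted *and* every lock
 * currently converting; the blocked queue is not consulted.  It is
 * assumed here that dlm_lock_compatible() encodes the usual NL/PR/EX
 * compatibility matrix for the three modes dlmlock() accepts. */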
/* performs lock creation at the lockres master site
 * locking:
 *   caller needs:  none
 *   taken:         takes and drops res->spinlock
 *   held on exit:  none
 * returns: DLM_NORMAL, DLM_NOTQUEUED
 */
static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
                                      struct dlm_lock_resource *res,
                                      struct dlm_lock *lock, int flags)
{
        int call_ast = 0, kick_thread = 0;
        enum dlm_status status = DLM_NORMAL;

        mlog_entry("type=%d\n", lock->ml.type);

        spin_lock(&res->spinlock);
        /* if called from dlm_create_lock_handler, need to
         * ensure it will not sleep in dlm_wait_on_lockres */
        status = __dlm_lockres_state_to_status(res);
        if (status != DLM_NORMAL &&
            lock->ml.node != dlm->node_num) {
                /* erf.  state changed after lock was dropped. */
                spin_unlock(&res->spinlock);
                dlm_error(status);
                return status;
        }
        __dlm_wait_on_lockres(res);
        __dlm_lockres_reserve_ast(res);

        if (dlm_can_grant_new_lock(res, lock)) {
                mlog(0, "I can grant this lock right away\n");
                /* got it right away */
                lock->lksb->status = DLM_NORMAL;
                status = DLM_NORMAL;
                dlm_lock_get(lock);
                list_add_tail(&lock->list, &res->granted);

                /* for the recovery lock, we can't allow the ast
                 * to be queued since the dlmthread is already
                 * frozen.  but the recovery lock is always locked
                 * with LKM_NOQUEUE so we do not need the ast in
                 * this special case */
                if (!dlm_is_recovery_lock(res->lockname.name,
                                          res->lockname.len)) {
                        kick_thread = 1;
                        call_ast = 1;
                }
        } else {
                /* for NOQUEUE request, unless we get the
                 * lock right away, return DLM_NOTQUEUED */
                if (flags & LKM_NOQUEUE)
                        status = DLM_NOTQUEUED;
                else {
                        dlm_lock_get(lock);
                        list_add_tail(&lock->list, &res->blocked);
                        kick_thread = 1;
                }
        }

        spin_unlock(&res->spinlock);
        wake_up(&res->wq);

        /* either queue the ast or release it */
        if (call_ast)
                dlm_queue_ast(dlm, lock);
        else
                dlm_lockres_release_ast(dlm, res);

        dlm_lockres_calc_usage(dlm, res);
        if (kick_thread)
                dlm_kick_thread(dlm, res);

        return status;
}
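/* Undo the optimistic queueing done by dlmlock_remote(): drop the
 * lock from the local blocked queue and clear any pending LVB
 * request.  Called with res->spinlock held by dlmlock_remote(). */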
void dlm_revert_pending_lock(struct dlm_lock_resource *res,
                             struct dlm_lock *lock)
{
        /* remove from local queue if it failed */
        list_del_init(&lock->list);
        lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
}


/*
 * locking:
 *   caller needs:  none
 *   taken:         takes and drops res->spinlock
 *   held on exit:  none
 * returns: DLM_DENIED, DLM_RECOVERING, or net status
 */
static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
                                      struct dlm_lock_resource *res,
                                      struct dlm_lock *lock, int flags)
{
        enum dlm_status status = DLM_DENIED;

        mlog_entry("type=%d\n", lock->ml.type);
        mlog(0, "lockres %.*s, flags = 0x%x\n", res->lockname.len,
             res->lockname.name, flags);

        spin_lock(&res->spinlock);

        /* will exit this call with spinlock held */
        __dlm_wait_on_lockres(res);
        res->state |= DLM_LOCK_RES_IN_PROGRESS;

        /* add lock to local (secondary) queue */
        dlm_lock_get(lock);
        list_add_tail(&lock->list, &res->blocked);
        lock->lock_pending = 1;
        spin_unlock(&res->spinlock);

        /* spec seems to say that you will get DLM_NORMAL when the lock
         * has been queued, meaning we need to wait for a reply here. */
        status = dlm_send_remote_lock_request(dlm, res, lock, flags);

        spin_lock(&res->spinlock);
        res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
        lock->lock_pending = 0;
        if (status != DLM_NORMAL) {
                if (status != DLM_NOTQUEUED)
                        dlm_error(status);
                dlm_revert_pending_lock(res, lock);
                dlm_lock_put(lock);
        }
        spin_unlock(&res->spinlock);

        dlm_lockres_calc_usage(dlm, res);

        wake_up(&res->wq);
        return status;
}
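/* A note on lock_pending, assuming its use elsewhere in the dlm:
 * while the network request above is in flight, the lock sits on
 * res->blocked with lock->lock_pending set, which lets other paths
 * (recovery in particular) recognize a request that has not yet
 * received a reply. */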
/* for remote lock creation.
 * locking:
 *   caller needs:  none, but need res->state & DLM_LOCK_RES_IN_PROGRESS
 *   taken:         none
 *   held on exit:  none
 * returns: DLM_NOLOCKMGR, or net status
 */
static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
                                                    struct dlm_lock_resource *res,
                                                    struct dlm_lock *lock, int flags)
{
        struct dlm_create_lock create;
        int tmpret, status = 0;
        enum dlm_status ret;

        mlog_entry_void();

        memset(&create, 0, sizeof(create));
        create.node_idx = dlm->node_num;
        create.requested_type = lock->ml.type;
        create.cookie = lock->ml.cookie;
        create.namelen = res->lockname.len;
        create.flags = cpu_to_be32(flags);
        memcpy(create.name, res->lockname.name, create.namelen);

        tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create,
                                    sizeof(create), res->owner, &status);
        if (tmpret >= 0) {
                // successfully sent and received
                ret = status;  // this is already a dlm_status
        } else {
                mlog_errno(tmpret);
                if (dlm_is_host_down(tmpret)) {
                        ret = DLM_RECOVERING;
                        mlog(0, "node %u died so returning DLM_RECOVERING "
                             "from lock message!\n", res->owner);
                } else {
                        ret = dlm_err_to_dlm_status(tmpret);
                }
        }

        return ret;
}

void dlm_lock_get(struct dlm_lock *lock)
{
        kref_get(&lock->lock_refs);
}

void dlm_lock_put(struct dlm_lock *lock)
{
        kref_put(&lock->lock_refs, dlm_lock_release);
}

static void dlm_lock_release(struct kref *kref)
{
        struct dlm_lock *lock;

        lock = container_of(kref, struct dlm_lock, lock_refs);

        BUG_ON(!list_empty(&lock->list));
        BUG_ON(!list_empty(&lock->ast_list));
        BUG_ON(!list_empty(&lock->bast_list));
        BUG_ON(lock->ast_pending);
        BUG_ON(lock->bast_pending);

        dlm_lock_detach_lockres(lock);

        if (lock->lksb_kernel_allocated) {
                mlog(0, "freeing kernel-allocated lksb\n");
                kfree(lock->lksb);
        }
        kfree(lock);
}

/* associate a lock with its lockres, getting a ref on the lockres */
void dlm_lock_attach_lockres(struct dlm_lock *lock,
                             struct dlm_lock_resource *res)
{
        dlm_lockres_get(res);
        lock->lockres = res;
}

/* drop ref on lockres, if there is still one associated with lock */
static void dlm_lock_detach_lockres(struct dlm_lock *lock)
{
        struct dlm_lock_resource *res;

        res = lock->lockres;
        if (res) {
                lock->lockres = NULL;
                mlog(0, "removing lock's lockres reference\n");
                dlm_lockres_put(res);
        }
}

static void dlm_init_lock(struct dlm_lock *newlock, int type,
                          u8 node, u64 cookie)
{
        INIT_LIST_HEAD(&newlock->list);
        INIT_LIST_HEAD(&newlock->ast_list);
        INIT_LIST_HEAD(&newlock->bast_list);
        spin_lock_init(&newlock->spinlock);
        newlock->ml.type = type;
        newlock->ml.convert_type = LKM_IVMODE;
        newlock->ml.highest_blocked = LKM_IVMODE;
        newlock->ml.node = node;
        newlock->ml.pad1 = 0;
        newlock->ml.list = 0;
        newlock->ml.flags = 0;
        newlock->ast = NULL;
        newlock->bast = NULL;
        newlock->astdata = NULL;
        newlock->ml.cookie = cpu_to_be64(cookie);
        newlock->ast_pending = 0;
        newlock->bast_pending = 0;
        newlock->convert_pending = 0;
        newlock->lock_pending = 0;
        newlock->unlock_pending = 0;
        newlock->cancel_pending = 0;
        newlock->lksb_kernel_allocated = 0;

        kref_init(&newlock->lock_refs);
}
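/* Allocate and initialize a new dlm_lock.  If the caller does not
 * supply an lksb, a zeroed one is allocated here and flagged so that
 * dlm_lock_release() knows to free it along with the lock. */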
struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
                               struct dlm_lockstatus *lksb)
{
        struct dlm_lock *lock;
        int kernel_allocated = 0;

        lock = kcalloc(1, sizeof(*lock), GFP_KERNEL);
        if (!lock)
                return NULL;

        if (!lksb) {
                /* zero memory only if kernel-allocated */
                lksb = kcalloc(1, sizeof(*lksb), GFP_KERNEL);
                if (!lksb) {
                        kfree(lock);
                        return NULL;
                }
                kernel_allocated = 1;
        }

        dlm_init_lock(lock, type, node, cookie);
        if (kernel_allocated)
                lock->lksb_kernel_allocated = 1;
        lock->lksb = lksb;
        lksb->lockid = lock;
        return lock;
}

/* handler for lock creation net message
 * locking:
 *   caller needs:  none
 *   taken:         takes and drops res->spinlock
 *   held on exit:  none
 * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED
 */
int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data)
{
        struct dlm_ctxt *dlm = data;
        struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf;
        struct dlm_lock_resource *res = NULL;
        struct dlm_lock *newlock = NULL;
        struct dlm_lockstatus *lksb = NULL;
        enum dlm_status status = DLM_NORMAL;
        char *name;
        unsigned int namelen;

        BUG_ON(!dlm);

        mlog_entry_void();

        if (!dlm_grab(dlm))
                return DLM_REJECTED;

        mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
                        "Domain %s not fully joined!\n", dlm->name);

        name = create->name;
        namelen = create->namelen;

        status = DLM_IVBUFLEN;
        if (namelen > DLM_LOCKID_NAME_MAX) {
                dlm_error(status);
                goto leave;
        }

        status = DLM_SYSERR;
        newlock = dlm_new_lock(create->requested_type,
                               create->node_idx,
                               be64_to_cpu(create->cookie), NULL);
        if (!newlock) {
                dlm_error(status);
                goto leave;
        }

        lksb = newlock->lksb;

        if (be32_to_cpu(create->flags) & LKM_GET_LVB) {
                lksb->flags |= DLM_LKSB_GET_LVB;
                mlog(0, "set DLM_LKSB_GET_LVB flag\n");
        }

        status = DLM_IVLOCKID;
        res = dlm_lookup_lockres(dlm, name, namelen);
        if (!res) {
                dlm_error(status);
                goto leave;
        }

        spin_lock(&res->spinlock);
        status = __dlm_lockres_state_to_status(res);
        spin_unlock(&res->spinlock);

        if (status != DLM_NORMAL) {
                mlog(0, "lockres recovering/migrating/in-progress\n");
                goto leave;
        }

        dlm_lock_attach_lockres(newlock, res);

        status = dlmlock_master(dlm, res, newlock, be32_to_cpu(create->flags));
leave:
        if (status != DLM_NORMAL)
                if (newlock)
                        dlm_lock_put(newlock);

        if (res)
                dlm_lockres_put(res);

        dlm_put(dlm);

        return status;
}


/* fetch next node-local (u8 nodenum + u56 cookie) into u64 */
static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie)
{
        u64 tmpnode = node_num;

        /* shift single byte of node num into top 8 bits */
        tmpnode <<= 56;

        spin_lock(&dlm_cookie_lock);
        *cookie = (dlm_next_cookie | tmpnode);
        if (++dlm_next_cookie & 0xff00000000000000ull) {
                mlog(0, "This node's cookie will now wrap!\n");
                dlm_next_cookie = 1;
        }
        spin_unlock(&dlm_cookie_lock);
}
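/* Cookie layout as built above; a worked example: node 3, counter 1
 * yields 0x0300000000000001.
 *
 *    63        56 55                                            0
 *   +------------+-----------------------------------------------+
 *   |  node num  |           per-node 56-bit counter             |
 *   +------------+-----------------------------------------------+
 *
 * The counter restarts at 1 as soon as an increment would spill
 * into the node byte. */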
enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode,
                        struct dlm_lockstatus *lksb, int flags,
                        const char *name, dlm_astlockfunc_t *ast, void *data,
                        dlm_bastlockfunc_t *bast)
{
        enum dlm_status status;
        struct dlm_lock_resource *res = NULL;
        struct dlm_lock *lock = NULL;
        int convert = 0, recovery = 0;

        /* yes this function is a mess.
         * TODO: clean this up.  lots of common code in the
         * lock and convert paths, especially in the retry blocks */
        if (!lksb) {
                dlm_error(DLM_BADARGS);
                return DLM_BADARGS;
        }

        status = DLM_BADPARAM;
        if (mode != LKM_EXMODE && mode != LKM_PRMODE && mode != LKM_NLMODE) {
                dlm_error(status);
                goto error;
        }

        if (flags & ~LKM_VALID_FLAGS) {
                dlm_error(status);
                goto error;
        }

        convert = (flags & LKM_CONVERT);
        recovery = (flags & LKM_RECOVERY);

        if (recovery &&
            (!dlm_is_recovery_lock(name, strlen(name)) || convert)) {
                dlm_error(status);
                goto error;
        }
        if (convert && (flags & LKM_LOCAL)) {
                mlog(ML_ERROR, "strange LOCAL convert request!\n");
                goto error;
        }

        if (convert) {
                /* CONVERT request */

                /* if converting, must pass in a valid dlm_lock */
                lock = lksb->lockid;
                if (!lock) {
                        mlog(ML_ERROR, "NULL lock pointer in convert "
                             "request\n");
                        goto error;
                }

                res = lock->lockres;
                if (!res) {
                        mlog(ML_ERROR, "NULL lockres pointer in convert "
                             "request\n");
                        goto error;
                }
                dlm_lockres_get(res);

                /* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are
                 * static after the original lock call.  convert requests will
                 * ensure that everything is the same, or return DLM_BADARGS.
                 * this means that DLM_DENIED_NOASTS will never be returned.
                 */
                if (lock->lksb != lksb || lock->ast != ast ||
                    lock->bast != bast || lock->astdata != data) {
                        status = DLM_BADARGS;
                        mlog(ML_ERROR, "new args: lksb=%p, ast=%p, bast=%p, "
                             "astdata=%p\n", lksb, ast, bast, data);
                        mlog(ML_ERROR, "orig args: lksb=%p, ast=%p, bast=%p, "
                             "astdata=%p\n", lock->lksb, lock->ast,
                             lock->bast, lock->astdata);
                        goto error;
                }
retry_convert:
                dlm_wait_for_recovery(dlm);

                if (res->owner == dlm->node_num)
                        status = dlmconvert_master(dlm, res, lock, flags, mode);
                else
                        status = dlmconvert_remote(dlm, res, lock, flags, mode);
                if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
                    status == DLM_FORWARD) {
                        /* back off briefly and retry; the reco or
                         * migration should complete quickly enough
                         * that a short sleep is all that is needed */
                        mlog(0, "retrying convert with migration/recovery/"
                             "in-progress\n");
                        msleep(100);
                        goto retry_convert;
                }
        } else {
                u64 tmpcookie;

                /* LOCK request */
                status = DLM_BADARGS;
                if (!name) {
                        dlm_error(status);
                        goto error;
                }

                status = DLM_IVBUFLEN;
                if (strlen(name) > DLM_LOCKID_NAME_MAX || strlen(name) < 1) {
                        dlm_error(status);
                        goto error;
                }

                dlm_get_next_cookie(dlm->node_num, &tmpcookie);
                lock = dlm_new_lock(mode, dlm->node_num, tmpcookie, lksb);
                if (!lock) {
                        dlm_error(status);
                        goto error;
                }

                if (!recovery)
                        dlm_wait_for_recovery(dlm);

                /* find or create the lock resource */
                res = dlm_get_lock_resource(dlm, name, flags);
                if (!res) {
                        status = DLM_IVLOCKID;
                        dlm_error(status);
                        goto error;
                }

                mlog(0, "type=%d, flags = 0x%x\n", mode, flags);
                mlog(0, "creating lock: lock=%p res=%p\n", lock, res);

                dlm_lock_attach_lockres(lock, res);
                lock->ast = ast;
                lock->bast = bast;
                lock->astdata = data;
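                /* Each pass through retry_lock below re-checks LKM_VALBLK;
                 * it is assumed that lock modes are ordered NL < PR < EX,
                 * so "mode < LKM_PRMODE" can only match an NL request
                 * given the mode check at the top of this function. */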
retry_lock:
                if (flags & LKM_VALBLK) {
                        mlog(0, "LKM_VALBLK passed by caller\n");

                        /* LVB requests for non PR, PW or EX locks are
                         * ignored. */
                        if (mode < LKM_PRMODE)
                                flags &= ~LKM_VALBLK;
                        else {
                                flags |= LKM_GET_LVB;
                                lock->lksb->flags |= DLM_LKSB_GET_LVB;
                        }
                }

                if (res->owner == dlm->node_num)
                        status = dlmlock_master(dlm, res, lock, flags);
                else
                        status = dlmlock_remote(dlm, res, lock, flags);

                if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
                    status == DLM_FORWARD) {
                        mlog(0, "retrying lock with migration/"
                             "recovery/in progress\n");
                        msleep(100);
                        dlm_wait_for_recovery(dlm);
                        goto retry_lock;
                }

                if (status != DLM_NORMAL) {
                        lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
                        if (status != DLM_NOTQUEUED)
                                dlm_error(status);
                        goto error;
                }
        }

error:
        if (status != DLM_NORMAL) {
                if (lock && !convert)
                        dlm_lock_put(lock);
                // this is kind of unnecessary
                lksb->status = status;
        }

        /* put lockres ref from the convert path
         * or from dlm_get_lock_resource */
        if (res)
                dlm_lockres_put(res);

        return status;
}
EXPORT_SYMBOL_GPL(dlmlock);
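
/*
 * Usage sketch (hypothetical caller; the dlm context, the callbacks
 * and the lock name below are illustrative, not defined in this file):
 *
 *      struct dlm_lockstatus lksb;
 *      enum dlm_status st;
 *
 *      memset(&lksb, 0, sizeof(lksb));
 *      st = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_VALBLK, "my_lock_name",
 *                   my_ast, my_astdata, my_bast);
 *
 * Assuming the usual DLM calling convention, DLM_NORMAL means the
 * request was granted or queued and completion is signalled through
 * the ast callback; any other status means the request did not take
 * effect (e.g. DLM_NOTQUEUED for a failed LKM_NOQUEUE attempt).
 */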