/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmthread.c
 *
 * standalone DLM module
 *
 * Copyright (C) 2004 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 */


#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/highmem.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/timer.h>
#include <linux/kthread.h>
#include <linux/delay.h>


#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdomain.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD)
#include "cluster/masklog.h"

static int dlm_thread(void *data);
static void dlm_flush_asts(struct dlm_ctxt *dlm);

#define dlm_lock_is_remote(dlm, lock)	((lock)->ml.node != (dlm)->node_num)

/* will exit holding res->spinlock, but may drop in function */
/* waits until flags are cleared on res->state */
void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags)
{
	DECLARE_WAITQUEUE(wait, current);

	assert_spin_locked(&res->spinlock);

	add_wait_queue(&res->wq, &wait);
repeat:
	set_current_state(TASK_UNINTERRUPTIBLE);
	if (res->state & flags) {
		spin_unlock(&res->spinlock);
		schedule();
		spin_lock(&res->spinlock);
		goto repeat;
	}
	remove_wait_queue(&res->wq, &wait);
	__set_current_state(TASK_RUNNING);
}

int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
{
	if (list_empty(&res->granted) &&
	    list_empty(&res->converting) &&
	    list_empty(&res->blocked))
		return 0;
	return 1;
}

/* "unused": the lockres has no locks, is not on the dirty list,
 * has no inflight locks (in the gap between mastery and acquiring
 * the first lock), and has no bits in its refmap.
 * truly ready to be freed. */
int __dlm_lockres_unused(struct dlm_lock_resource *res)
{
	int bit;

	assert_spin_locked(&res->spinlock);

	if (__dlm_lockres_has_locks(res))
		return 0;

	/* Locks are in the process of being created */
	if (res->inflight_locks)
		return 0;

	if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY)
		return 0;

	if (res->state & DLM_LOCK_RES_RECOVERING)
		return 0;

	/* Another node has this resource with this node as the master */
	bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
	if (bit < O2NM_MAX_NODES)
		return 0;

	return 1;
}

/* Call whenever you may have added or deleted something from one of
 * the lockres queues.  This will figure out whether it belongs on the
 * unused list or not and does the appropriate thing. */
void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
			      struct dlm_lock_resource *res)
{
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	if (__dlm_lockres_unused(res)) {
		if (list_empty(&res->purge)) {
			mlog(0, "%s: Adding res %.*s to purge list\n",
			     dlm->name, res->lockname.len, res->lockname.name);

			res->last_used = jiffies;
			dlm_lockres_get(res);
			list_add_tail(&res->purge, &dlm->purge_list);
			dlm->purge_count++;
		}
	} else if (!list_empty(&res->purge)) {
		mlog(0, "%s: Removing res %.*s from purge list\n",
		     dlm->name, res->lockname.len, res->lockname.name);

		list_del_init(&res->purge);
		dlm_lockres_put(res);
		dlm->purge_count--;
	}
}

void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
			    struct dlm_lock_resource *res)
{
	spin_lock(&dlm->spinlock);
	spin_lock(&res->spinlock);

	__dlm_lockres_calc_usage(dlm, res);

	spin_unlock(&res->spinlock);
	spin_unlock(&dlm->spinlock);
}

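/* Drop an unused lockres for good.  If this node is not the master,
 * ask the master to clear our bit in its refmap (waiting first for any
 * in-progress SETREF so the clear is sent after the set), then remove
 * the lockres from the purge list and unhash it.  Called with
 * dlm->spinlock and res->spinlock held; both may be dropped and
 * retaken internally. */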
static void dlm_purge_lockres(struct dlm_ctxt *dlm,
			      struct dlm_lock_resource *res)
{
	int master;
	int ret = 0;

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	master = (res->owner == dlm->node_num);

	mlog(0, "%s: Purging res %.*s, master %d\n", dlm->name,
	     res->lockname.len, res->lockname.name, master);

	if (!master) {
		res->state |= DLM_LOCK_RES_DROPPING_REF;
		/* drop spinlock...  retake below */
		spin_unlock(&res->spinlock);
		spin_unlock(&dlm->spinlock);

		spin_lock(&res->spinlock);
		/* This ensures that clear refmap is sent after the set */
		__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
		spin_unlock(&res->spinlock);

		/* clear our bit from the master's refmap, ignore errors */
		ret = dlm_drop_lockres_ref(dlm, res);
		if (ret < 0) {
			if (!dlm_is_host_down(ret))
				BUG();
		}
		spin_lock(&dlm->spinlock);
		spin_lock(&res->spinlock);
	}

	if (!list_empty(&res->purge)) {
		mlog(0, "%s: Removing res %.*s from purgelist, master %d\n",
		     dlm->name, res->lockname.len, res->lockname.name, master);
		list_del_init(&res->purge);
		dlm_lockres_put(res);
		dlm->purge_count--;
	}

	if (!__dlm_lockres_unused(res)) {
		mlog(ML_ERROR, "%s: res %.*s in use after deref\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		__dlm_print_one_lock_resource(res);
		BUG();
	}

	__dlm_unhash_lockres(dlm, res);

	/* lockres is not in the hash now.  drop the flag and wake up
	 * any processes waiting in dlm_get_lock_resource. */
	if (!master) {
		res->state &= ~DLM_LOCK_RES_DROPPING_REF;
		spin_unlock(&res->spinlock);
		wake_up(&res->wq);
	} else
		spin_unlock(&res->spinlock);
}

static void dlm_run_purge_list(struct dlm_ctxt *dlm,
			       int purge_now)
{
	unsigned int run_max, unused;
	unsigned long purge_jiffies;
	struct dlm_lock_resource *lockres;

	spin_lock(&dlm->spinlock);
	run_max = dlm->purge_count;

	while (run_max && !list_empty(&dlm->purge_list)) {
		run_max--;

		lockres = list_entry(dlm->purge_list.next,
				     struct dlm_lock_resource, purge);

		spin_lock(&lockres->spinlock);

		purge_jiffies = lockres->last_used +
			msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);

		/* Make sure that we want to be processing this guy at
		 * this time. */
		if (!purge_now && time_after(purge_jiffies, jiffies)) {
			/* Since resources are added to the purge list
			 * in tail order, we can stop at the first
			 * unpurgeable resource -- anyone added after
			 * it will have a greater last_used value */
			spin_unlock(&lockres->spinlock);
			break;
		}

		/* Status of the lockres *might* change so double
		 * check.  If the lockres is unused, holding the dlm
		 * spinlock will prevent people from getting any more
		 * refs on it. */
		unused = __dlm_lockres_unused(lockres);
		if (!unused ||
		    (lockres->state & DLM_LOCK_RES_MIGRATING)) {
			mlog(0, "%s: res %.*s is in use or being remastered, "
			     "used %d, state %d\n", dlm->name,
			     lockres->lockname.len, lockres->lockname.name,
			     !unused, lockres->state);
			list_move_tail(&lockres->purge, &dlm->purge_list);
			spin_unlock(&lockres->spinlock);
			continue;
		}

		dlm_lockres_get(lockres);

		dlm_purge_lockres(dlm, lockres);

		dlm_lockres_put(lockres);

		/* Avoid adding any scheduling latencies */
		cond_resched_lock(&dlm->spinlock);
	}

	spin_unlock(&dlm->spinlock);
}

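/* Walk the converting queue and then the blocked queue, granting the
 * head lock of each whenever no granted or converting lock holds an
 * incompatible mode.  A grant moves the lock to the granted queue and
 * queues an AST for it; each incompatible holder instead gets a BAST
 * queued and its highest_blocked raised.  Caller holds dlm->ast_lock
 * and res->spinlock. */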
static void dlm_shuffle_lists(struct dlm_ctxt *dlm,
			      struct dlm_lock_resource *res)
{
	struct dlm_lock *lock, *target;
	struct list_head *iter;
	struct list_head *head;
	int can_grant = 1;

	/*
	 * Because this function is called with the lockres
	 * spinlock, and because we know that it is not migrating/
	 * recovering/in-progress, it is fine to reserve asts and
	 * basts right before queueing them all throughout
	 */
	assert_spin_locked(&dlm->ast_lock);
	assert_spin_locked(&res->spinlock);
	BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING|
			      DLM_LOCK_RES_RECOVERING|
			      DLM_LOCK_RES_IN_PROGRESS)));

converting:
	if (list_empty(&res->converting))
		goto blocked;
	mlog(0, "%s: res %.*s has locks on the convert queue\n", dlm->name,
	     res->lockname.len, res->lockname.name);

	target = list_entry(res->converting.next, struct dlm_lock, list);
	if (target->ml.convert_type == LKM_IVMODE) {
		mlog(ML_ERROR, "%s: res %.*s converting lock to invalid mode\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		BUG();
	}
	head = &res->granted;
	list_for_each(iter, head) {
		lock = list_entry(iter, struct dlm_lock, list);
		if (lock == target)
			continue;
		if (!dlm_lock_compatible(lock->ml.type,
					 target->ml.convert_type)) {
			can_grant = 0;
			/* queue the BAST if not already */
			if (lock->ml.highest_blocked == LKM_IVMODE) {
				__dlm_lockres_reserve_ast(res);
				__dlm_queue_bast(dlm, lock);
			}
			/* update the highest_blocked if needed */
			if (lock->ml.highest_blocked < target->ml.convert_type)
				lock->ml.highest_blocked =
					target->ml.convert_type;
		}
	}
	head = &res->converting;
	list_for_each(iter, head) {
		lock = list_entry(iter, struct dlm_lock, list);
		if (lock == target)
			continue;
		if (!dlm_lock_compatible(lock->ml.type,
					 target->ml.convert_type)) {
			can_grant = 0;
			if (lock->ml.highest_blocked == LKM_IVMODE) {
				__dlm_lockres_reserve_ast(res);
				__dlm_queue_bast(dlm, lock);
			}
			if (lock->ml.highest_blocked < target->ml.convert_type)
				lock->ml.highest_blocked =
					target->ml.convert_type;
		}
	}

	/* we can convert the lock */
	if (can_grant) {
		spin_lock(&target->spinlock);
		BUG_ON(target->ml.highest_blocked != LKM_IVMODE);

		mlog(0, "%s: res %.*s, AST for Converting lock %u:%llu, type "
		     "%d => %d, node %u\n", dlm->name, res->lockname.len,
		     res->lockname.name,
		     dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)),
		     target->ml.type,
		     target->ml.convert_type, target->ml.node);

		target->ml.type = target->ml.convert_type;
		target->ml.convert_type = LKM_IVMODE;
		list_move_tail(&target->list, &res->granted);

		BUG_ON(!target->lksb);
		target->lksb->status = DLM_NORMAL;

		spin_unlock(&target->spinlock);

		__dlm_lockres_reserve_ast(res);
		__dlm_queue_ast(dlm, target);
		/* go back and check for more */
		goto converting;
	}

blocked:
	if (list_empty(&res->blocked))
		goto leave;
	target = list_entry(res->blocked.next, struct dlm_lock, list);

	head = &res->granted;
	list_for_each(iter, head) {
		lock = list_entry(iter, struct dlm_lock, list);
		if (lock == target)
			continue;
		if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
			can_grant = 0;
			if (lock->ml.highest_blocked == LKM_IVMODE) {
				__dlm_lockres_reserve_ast(res);
				__dlm_queue_bast(dlm, lock);
			}
			if (lock->ml.highest_blocked < target->ml.type)
				lock->ml.highest_blocked = target->ml.type;
		}
	}

	head = &res->converting;
	list_for_each(iter, head) {
		lock = list_entry(iter, struct dlm_lock, list);
		if (lock == target)
			continue;
		if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
			can_grant = 0;
			if (lock->ml.highest_blocked == LKM_IVMODE) {
				__dlm_lockres_reserve_ast(res);
				__dlm_queue_bast(dlm, lock);
			}
			if (lock->ml.highest_blocked < target->ml.type)
				lock->ml.highest_blocked = target->ml.type;
		}
	}

	/* we can grant the blocked lock (only
	 * possible if converting list empty) */
	if (can_grant) {
		spin_lock(&target->spinlock);
		BUG_ON(target->ml.highest_blocked != LKM_IVMODE);

		mlog(0, "%s: res %.*s, AST for Blocked lock %u:%llu, type %d, "
		     "node %u\n", dlm->name, res->lockname.len,
		     res->lockname.name,
		     dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)),
		     target->ml.type, target->ml.node);

		/* target->ml.type is already correct */
		list_move_tail(&target->list, &res->granted);

		BUG_ON(!target->lksb);
		target->lksb->status = DLM_NORMAL;

		spin_unlock(&target->spinlock);

		__dlm_lockres_reserve_ast(res);
		__dlm_queue_ast(dlm, target);
		/* go back and check for more */
		goto converting;
	}

leave:
	return;
}

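/* Nudge the dlm thread.  If a lockres is passed in, mark it dirty
 * first so its queues get shuffled on the next pass. */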
/* must have NO locks when calling this with res != NULL */
void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
	if (res) {
		spin_lock(&dlm->spinlock);
		spin_lock(&res->spinlock);
		__dlm_dirty_lockres(dlm, res);
		spin_unlock(&res->spinlock);
		spin_unlock(&dlm->spinlock);
	}
	wake_up(&dlm->dlm_thread_wq);
}

void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	/* don't shuffle secondary queues */
	if (res->owner == dlm->node_num) {
		if (res->state & (DLM_LOCK_RES_MIGRATING |
				  DLM_LOCK_RES_BLOCK_DIRTY))
			return;

		if (list_empty(&res->dirty)) {
			/* ref for dirty_list */
			dlm_lockres_get(res);
			list_add_tail(&res->dirty, &dlm->dirty_list);
			res->state |= DLM_LOCK_RES_DIRTY;
		}
	}

	mlog(0, "%s: res %.*s\n", dlm->name, res->lockname.len,
	     res->lockname.name);
}


/* Launch the dlm thread for this domain */
int dlm_launch_thread(struct dlm_ctxt *dlm)
{
	mlog(0, "Starting dlm_thread...\n");

	dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm_thread");
	if (IS_ERR(dlm->dlm_thread_task)) {
		mlog_errno(PTR_ERR(dlm->dlm_thread_task));
		dlm->dlm_thread_task = NULL;
		return -EINVAL;
	}

	return 0;
}

void dlm_complete_thread(struct dlm_ctxt *dlm)
{
	if (dlm->dlm_thread_task) {
		mlog(ML_KTHREAD, "Waiting for dlm thread to exit\n");
		kthread_stop(dlm->dlm_thread_task);
		dlm->dlm_thread_task = NULL;
	}
}

static int dlm_dirty_list_empty(struct dlm_ctxt *dlm)
{
	int empty;

	spin_lock(&dlm->spinlock);
	empty = list_empty(&dlm->dirty_list);
	spin_unlock(&dlm->spinlock);

	return empty;
}

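/* Deliver all pending ASTs and BASTs.  Each lock is pulled off the
 * pending list under dlm->ast_lock, which is dropped around the actual
 * delivery (a local callback or a message to the lock's node) and then
 * retaken.  An extra lock ref is held across delivery in case the
 * callback drops the last reference. */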
static void dlm_flush_asts(struct dlm_ctxt *dlm)
{
	int ret;
	struct dlm_lock *lock;
	struct dlm_lock_resource *res;
	u8 hi;

	spin_lock(&dlm->ast_lock);
	while (!list_empty(&dlm->pending_asts)) {
		lock = list_entry(dlm->pending_asts.next,
				  struct dlm_lock, ast_list);
		/* get an extra ref on lock */
		dlm_lock_get(lock);
		res = lock->lockres;
		mlog(0, "%s: res %.*s, Flush AST for lock %u:%llu, type %d, "
		     "node %u\n", dlm->name, res->lockname.len,
		     res->lockname.name,
		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
		     lock->ml.type, lock->ml.node);

		BUG_ON(!lock->ast_pending);

		/* remove from list (including ref) */
		list_del_init(&lock->ast_list);
		dlm_lock_put(lock);
		spin_unlock(&dlm->ast_lock);

		if (lock->ml.node != dlm->node_num) {
			ret = dlm_do_remote_ast(dlm, res, lock);
			if (ret < 0)
				mlog_errno(ret);
		} else
			dlm_do_local_ast(dlm, res, lock);

		spin_lock(&dlm->ast_lock);

		/* possible that another ast was queued while
		 * we were delivering the last one */
		if (!list_empty(&lock->ast_list)) {
			mlog(0, "%s: res %.*s, AST queued while flushing last "
			     "one\n", dlm->name, res->lockname.len,
			     res->lockname.name);
		} else
			lock->ast_pending = 0;

		/* drop the extra ref.
		 * this may drop it completely. */
		dlm_lock_put(lock);
		dlm_lockres_release_ast(dlm, res);
	}

	while (!list_empty(&dlm->pending_basts)) {
		lock = list_entry(dlm->pending_basts.next,
				  struct dlm_lock, bast_list);
		/* get an extra ref on lock */
		dlm_lock_get(lock);
		res = lock->lockres;

		BUG_ON(!lock->bast_pending);

		/* get the highest blocked lock, and reset */
		spin_lock(&lock->spinlock);
		BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE);
		hi = lock->ml.highest_blocked;
		lock->ml.highest_blocked = LKM_IVMODE;
		spin_unlock(&lock->spinlock);

		/* remove from list (including ref) */
		list_del_init(&lock->bast_list);
		dlm_lock_put(lock);
		spin_unlock(&dlm->ast_lock);

		mlog(0, "%s: res %.*s, Flush BAST for lock %u:%llu, "
		     "blocked %d, node %u\n",
		     dlm->name, res->lockname.len, res->lockname.name,
		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
		     hi, lock->ml.node);

		if (lock->ml.node != dlm->node_num) {
			ret = dlm_send_proxy_bast(dlm, res, lock, hi);
			if (ret < 0)
				mlog_errno(ret);
		} else
			dlm_do_local_bast(dlm, res, lock, hi);

		spin_lock(&dlm->ast_lock);

		/* possible that another bast was queued while
		 * we were delivering the last one */
		if (!list_empty(&lock->bast_list)) {
			mlog(0, "%s: res %.*s, BAST queued while flushing last "
			     "one\n", dlm->name, res->lockname.len,
			     res->lockname.name);
		} else
			lock->bast_pending = 0;

		/* drop the extra ref.
		 * this may drop it completely. */
		dlm_lock_put(lock);
		dlm_lockres_release_ast(dlm, res);
	}
	wake_up(&dlm->ast_wq);
	spin_unlock(&dlm->ast_lock);
}


#define DLM_THREAD_TIMEOUT_MS (4 * 1000)
#define DLM_THREAD_MAX_DIRTY  100
#define DLM_THREAD_MAX_ASTS   10

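/* Main worker loop: purge unused lockres, shuffle the queues of every
 * dirty lockres (throttled to DLM_THREAD_MAX_DIRTY per pass), flush the
 * resulting ASTs/BASTs, then sleep until kicked or the timeout expires. */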
static int dlm_thread(void *data)
{
	struct dlm_lock_resource *res;
	struct dlm_ctxt *dlm = data;
	unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS);

	mlog(0, "dlm thread running for %s...\n", dlm->name);

	while (!kthread_should_stop()) {
		int n = DLM_THREAD_MAX_DIRTY;

		/* dlm_shutting_down is very point-in-time, but that
		 * doesn't matter as we'll just loop back around if we
		 * get false on the leading edge of a state
		 * transition. */
		dlm_run_purge_list(dlm, dlm_shutting_down(dlm));

		/* We really don't want to hold dlm->spinlock while
		 * calling dlm_shuffle_lists on each lockres that
		 * needs to have its queues adjusted and AST/BASTs
		 * run.  So let's pull each entry off the dirty_list
		 * and drop dlm->spinlock ASAP.  Once off the list,
		 * res->spinlock needs to be taken again to protect
		 * the queues while calling dlm_shuffle_lists. */
		spin_lock(&dlm->spinlock);
		while (!list_empty(&dlm->dirty_list)) {
			int delay = 0;
			res = list_entry(dlm->dirty_list.next,
					 struct dlm_lock_resource, dirty);

			/* peel a lockres off, remove it from the list,
			 * unset the dirty flag and drop the dlm lock */
			BUG_ON(!res);
			dlm_lockres_get(res);

			spin_lock(&res->spinlock);
			/* We clear the DLM_LOCK_RES_DIRTY state once we
			 * shuffle lists below */
			list_del_init(&res->dirty);
			spin_unlock(&res->spinlock);
			spin_unlock(&dlm->spinlock);
			/* Drop dirty_list ref */
			dlm_lockres_put(res);

			/* lockres can be re-dirtied/re-added to the
			 * dirty_list in this gap, but that is ok */

			spin_lock(&dlm->ast_lock);
			spin_lock(&res->spinlock);
			if (res->owner != dlm->node_num) {
				__dlm_print_one_lock_resource(res);
				mlog(ML_ERROR, "%s: inprog %d, mig %d, reco %d,"
				     " dirty %d\n", dlm->name,
				     !!(res->state & DLM_LOCK_RES_IN_PROGRESS),
				     !!(res->state & DLM_LOCK_RES_MIGRATING),
				     !!(res->state & DLM_LOCK_RES_RECOVERING),
				     !!(res->state & DLM_LOCK_RES_DIRTY));
			}
			BUG_ON(res->owner != dlm->node_num);

			/* it is now ok to move lockreses in these states
			 * to the dirty list, assuming that they will only be
			 * dirty for a short while. */
			BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
			if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
					  DLM_LOCK_RES_RECOVERING)) {
				/* move it to the tail and keep going */
				res->state &= ~DLM_LOCK_RES_DIRTY;
				spin_unlock(&res->spinlock);
				spin_unlock(&dlm->ast_lock);
				mlog(0, "%s: res %.*s, inprogress, delay list "
				     "shuffle, state %d\n", dlm->name,
				     res->lockname.len, res->lockname.name,
				     res->state);
				delay = 1;
				goto in_progress;
			}

			/* at this point the lockres is not migrating/
			 * recovering/in-progress.  we have the lockres
			 * spinlock and do NOT have the dlm lock.
			 * safe to reserve/queue asts and run the lists. */

			/* called while holding lockres lock */
			dlm_shuffle_lists(dlm, res);
			res->state &= ~DLM_LOCK_RES_DIRTY;
			spin_unlock(&res->spinlock);
			spin_unlock(&dlm->ast_lock);

			dlm_lockres_calc_usage(dlm, res);

in_progress:

			spin_lock(&dlm->spinlock);
			/* if the lock was in-progress, stick
			 * it on the back of the list */
			if (delay) {
				spin_lock(&res->spinlock);
				__dlm_dirty_lockres(dlm, res);
				spin_unlock(&res->spinlock);
			}
			dlm_lockres_put(res);

			/* unlikely, but we may need to give time to
			 * other tasks */
			if (!--n) {
				mlog(0, "%s: Throttling dlm thread\n",
				     dlm->name);
				break;
			}
		}

		spin_unlock(&dlm->spinlock);
		dlm_flush_asts(dlm);

		/* yield and continue right away if there is more work to do */
		if (!n) {
			cond_resched();
			continue;
		}

		wait_event_interruptible_timeout(dlm->dlm_thread_wq,
						 !dlm_dirty_list_empty(dlm) ||
						 kthread_should_stop(),
						 timeout);
	}

	mlog(0, "quitting DLM thread\n");
	return 0;
}