1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmthread.c 5 * 6 * standalone DLM module 7 * 8 * Copyright (C) 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 * 25 */ 26 27 28 #include <linux/module.h> 29 #include <linux/fs.h> 30 #include <linux/types.h> 31 #include <linux/highmem.h> 32 #include <linux/init.h> 33 #include <linux/sysctl.h> 34 #include <linux/random.h> 35 #include <linux/blkdev.h> 36 #include <linux/socket.h> 37 #include <linux/inet.h> 38 #include <linux/timer.h> 39 #include <linux/kthread.h> 40 #include <linux/delay.h> 41 42 43 #include "cluster/heartbeat.h" 44 #include "cluster/nodemanager.h" 45 #include "cluster/tcp.h" 46 47 #include "dlmapi.h" 48 #include "dlmcommon.h" 49 #include "dlmdomain.h" 50 51 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD) 52 #include "cluster/masklog.h" 53 54 static int dlm_thread(void *data); 55 static void dlm_flush_asts(struct dlm_ctxt *dlm); 56 57 #define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num) 58 59 /* will exit holding res->spinlock, but may drop in function */ 60 /* waits until flags are cleared on res->state */ 61 void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags) 62 { 63 DECLARE_WAITQUEUE(wait, current); 64 65 assert_spin_locked(&res->spinlock); 66 67 add_wait_queue(&res->wq, &wait); 68 repeat: 69 set_current_state(TASK_UNINTERRUPTIBLE); 70 if (res->state & flags) { 71 spin_unlock(&res->spinlock); 72 schedule(); 73 spin_lock(&res->spinlock); 74 goto repeat; 75 } 76 remove_wait_queue(&res->wq, &wait); 77 __set_current_state(TASK_RUNNING); 78 } 79 80 int __dlm_lockres_has_locks(struct dlm_lock_resource *res) 81 { 82 if (list_empty(&res->granted) && 83 list_empty(&res->converting) && 84 list_empty(&res->blocked)) 85 return 0; 86 return 1; 87 } 88 89 /* "unused": the lockres has no locks, is not on the dirty list, 90 * has no inflight locks (in the gap between mastery and acquiring 91 * the first lock), and has no bits in its refmap. 92 * truly ready to be freed. */ 93 int __dlm_lockres_unused(struct dlm_lock_resource *res) 94 { 95 int bit; 96 97 assert_spin_locked(&res->spinlock); 98 99 if (__dlm_lockres_has_locks(res)) 100 return 0; 101 102 /* Locks are in the process of being created */ 103 if (res->inflight_locks) 104 return 0; 105 106 if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY) 107 return 0; 108 109 if (res->state & DLM_LOCK_RES_RECOVERING) 110 return 0; 111 112 /* Another node has this resource with this node as the master */ 113 bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0); 114 if (bit < O2NM_MAX_NODES) 115 return 0; 116 117 return 1; 118 } 119 120 121 /* Call whenever you may have added or deleted something from one of 122 * the lockres queue's. This will figure out whether it belongs on the 123 * unused list or not and does the appropriate thing. */ 124 void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm, 125 struct dlm_lock_resource *res) 126 { 127 assert_spin_locked(&dlm->spinlock); 128 assert_spin_locked(&res->spinlock); 129 130 if (__dlm_lockres_unused(res)){ 131 if (list_empty(&res->purge)) { 132 mlog(0, "%s: Adding res %.*s to purge list\n", 133 dlm->name, res->lockname.len, res->lockname.name); 134 135 res->last_used = jiffies; 136 dlm_lockres_get(res); 137 list_add_tail(&res->purge, &dlm->purge_list); 138 dlm->purge_count++; 139 } 140 } else if (!list_empty(&res->purge)) { 141 mlog(0, "%s: Removing res %.*s from purge list\n", 142 dlm->name, res->lockname.len, res->lockname.name); 143 144 list_del_init(&res->purge); 145 dlm_lockres_put(res); 146 dlm->purge_count--; 147 } 148 } 149 150 void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, 151 struct dlm_lock_resource *res) 152 { 153 spin_lock(&dlm->spinlock); 154 spin_lock(&res->spinlock); 155 156 __dlm_lockres_calc_usage(dlm, res); 157 158 spin_unlock(&res->spinlock); 159 spin_unlock(&dlm->spinlock); 160 } 161 162 static void dlm_purge_lockres(struct dlm_ctxt *dlm, 163 struct dlm_lock_resource *res) 164 { 165 int master; 166 int ret = 0; 167 168 assert_spin_locked(&dlm->spinlock); 169 assert_spin_locked(&res->spinlock); 170 171 master = (res->owner == dlm->node_num); 172 173 mlog(0, "%s: Purging res %.*s, master %d\n", dlm->name, 174 res->lockname.len, res->lockname.name, master); 175 176 if (!master) { 177 res->state |= DLM_LOCK_RES_DROPPING_REF; 178 /* drop spinlock... retake below */ 179 spin_unlock(&res->spinlock); 180 spin_unlock(&dlm->spinlock); 181 182 spin_lock(&res->spinlock); 183 /* This ensures that clear refmap is sent after the set */ 184 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG); 185 spin_unlock(&res->spinlock); 186 187 /* clear our bit from the master's refmap, ignore errors */ 188 ret = dlm_drop_lockres_ref(dlm, res); 189 if (ret < 0) { 190 if (!dlm_is_host_down(ret)) 191 BUG(); 192 } 193 spin_lock(&dlm->spinlock); 194 spin_lock(&res->spinlock); 195 } 196 197 if (!list_empty(&res->purge)) { 198 mlog(0, "%s: Removing res %.*s from purgelist, master %d\n", 199 dlm->name, res->lockname.len, res->lockname.name, master); 200 list_del_init(&res->purge); 201 dlm_lockres_put(res); 202 dlm->purge_count--; 203 } 204 205 if (!__dlm_lockres_unused(res)) { 206 mlog(ML_ERROR, "%s: res %.*s in use after deref\n", 207 dlm->name, res->lockname.len, res->lockname.name); 208 __dlm_print_one_lock_resource(res); 209 BUG(); 210 } 211 212 __dlm_unhash_lockres(dlm, res); 213 214 /* lockres is not in the hash now. drop the flag and wake up 215 * any processes waiting in dlm_get_lock_resource. */ 216 if (!master) { 217 res->state &= ~DLM_LOCK_RES_DROPPING_REF; 218 spin_unlock(&res->spinlock); 219 wake_up(&res->wq); 220 } else 221 spin_unlock(&res->spinlock); 222 } 223 224 static void dlm_run_purge_list(struct dlm_ctxt *dlm, 225 int purge_now) 226 { 227 unsigned int run_max, unused; 228 unsigned long purge_jiffies; 229 struct dlm_lock_resource *lockres; 230 231 spin_lock(&dlm->spinlock); 232 run_max = dlm->purge_count; 233 234 while(run_max && !list_empty(&dlm->purge_list)) { 235 run_max--; 236 237 lockres = list_entry(dlm->purge_list.next, 238 struct dlm_lock_resource, purge); 239 240 spin_lock(&lockres->spinlock); 241 242 purge_jiffies = lockres->last_used + 243 msecs_to_jiffies(DLM_PURGE_INTERVAL_MS); 244 245 /* Make sure that we want to be processing this guy at 246 * this time. */ 247 if (!purge_now && time_after(purge_jiffies, jiffies)) { 248 /* Since resources are added to the purge list 249 * in tail order, we can stop at the first 250 * unpurgable resource -- anyone added after 251 * him will have a greater last_used value */ 252 spin_unlock(&lockres->spinlock); 253 break; 254 } 255 256 /* Status of the lockres *might* change so double 257 * check. If the lockres is unused, holding the dlm 258 * spinlock will prevent people from getting and more 259 * refs on it. */ 260 unused = __dlm_lockres_unused(lockres); 261 if (!unused || 262 (lockres->state & DLM_LOCK_RES_MIGRATING) || 263 (lockres->inflight_assert_workers != 0)) { 264 mlog(0, "%s: res %.*s is in use or being remastered, " 265 "used %d, state %d, assert master workers %u\n", 266 dlm->name, lockres->lockname.len, 267 lockres->lockname.name, 268 !unused, lockres->state, 269 lockres->inflight_assert_workers); 270 list_move_tail(&lockres->purge, &dlm->purge_list); 271 spin_unlock(&lockres->spinlock); 272 continue; 273 } 274 275 dlm_lockres_get(lockres); 276 277 dlm_purge_lockres(dlm, lockres); 278 279 dlm_lockres_put(lockres); 280 281 /* Avoid adding any scheduling latencies */ 282 cond_resched_lock(&dlm->spinlock); 283 } 284 285 spin_unlock(&dlm->spinlock); 286 } 287 288 static void dlm_shuffle_lists(struct dlm_ctxt *dlm, 289 struct dlm_lock_resource *res) 290 { 291 struct dlm_lock *lock, *target; 292 int can_grant = 1; 293 294 /* 295 * Because this function is called with the lockres 296 * spinlock, and because we know that it is not migrating/ 297 * recovering/in-progress, it is fine to reserve asts and 298 * basts right before queueing them all throughout 299 */ 300 assert_spin_locked(&dlm->ast_lock); 301 assert_spin_locked(&res->spinlock); 302 BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING| 303 DLM_LOCK_RES_RECOVERING| 304 DLM_LOCK_RES_IN_PROGRESS))); 305 306 converting: 307 if (list_empty(&res->converting)) 308 goto blocked; 309 mlog(0, "%s: res %.*s has locks on the convert queue\n", dlm->name, 310 res->lockname.len, res->lockname.name); 311 312 target = list_entry(res->converting.next, struct dlm_lock, list); 313 if (target->ml.convert_type == LKM_IVMODE) { 314 mlog(ML_ERROR, "%s: res %.*s converting lock to invalid mode\n", 315 dlm->name, res->lockname.len, res->lockname.name); 316 BUG(); 317 } 318 list_for_each_entry(lock, &res->granted, list) { 319 if (lock==target) 320 continue; 321 if (!dlm_lock_compatible(lock->ml.type, 322 target->ml.convert_type)) { 323 can_grant = 0; 324 /* queue the BAST if not already */ 325 if (lock->ml.highest_blocked == LKM_IVMODE) { 326 __dlm_lockres_reserve_ast(res); 327 __dlm_queue_bast(dlm, lock); 328 } 329 /* update the highest_blocked if needed */ 330 if (lock->ml.highest_blocked < target->ml.convert_type) 331 lock->ml.highest_blocked = 332 target->ml.convert_type; 333 } 334 } 335 336 list_for_each_entry(lock, &res->converting, list) { 337 if (lock==target) 338 continue; 339 if (!dlm_lock_compatible(lock->ml.type, 340 target->ml.convert_type)) { 341 can_grant = 0; 342 if (lock->ml.highest_blocked == LKM_IVMODE) { 343 __dlm_lockres_reserve_ast(res); 344 __dlm_queue_bast(dlm, lock); 345 } 346 if (lock->ml.highest_blocked < target->ml.convert_type) 347 lock->ml.highest_blocked = 348 target->ml.convert_type; 349 } 350 } 351 352 /* we can convert the lock */ 353 if (can_grant) { 354 spin_lock(&target->spinlock); 355 BUG_ON(target->ml.highest_blocked != LKM_IVMODE); 356 357 mlog(0, "%s: res %.*s, AST for Converting lock %u:%llu, type " 358 "%d => %d, node %u\n", dlm->name, res->lockname.len, 359 res->lockname.name, 360 dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)), 361 dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)), 362 target->ml.type, 363 target->ml.convert_type, target->ml.node); 364 365 target->ml.type = target->ml.convert_type; 366 target->ml.convert_type = LKM_IVMODE; 367 list_move_tail(&target->list, &res->granted); 368 369 BUG_ON(!target->lksb); 370 target->lksb->status = DLM_NORMAL; 371 372 spin_unlock(&target->spinlock); 373 374 __dlm_lockres_reserve_ast(res); 375 __dlm_queue_ast(dlm, target); 376 /* go back and check for more */ 377 goto converting; 378 } 379 380 blocked: 381 if (list_empty(&res->blocked)) 382 goto leave; 383 target = list_entry(res->blocked.next, struct dlm_lock, list); 384 385 list_for_each_entry(lock, &res->granted, list) { 386 if (lock==target) 387 continue; 388 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) { 389 can_grant = 0; 390 if (lock->ml.highest_blocked == LKM_IVMODE) { 391 __dlm_lockres_reserve_ast(res); 392 __dlm_queue_bast(dlm, lock); 393 } 394 if (lock->ml.highest_blocked < target->ml.type) 395 lock->ml.highest_blocked = target->ml.type; 396 } 397 } 398 399 list_for_each_entry(lock, &res->converting, list) { 400 if (lock==target) 401 continue; 402 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) { 403 can_grant = 0; 404 if (lock->ml.highest_blocked == LKM_IVMODE) { 405 __dlm_lockres_reserve_ast(res); 406 __dlm_queue_bast(dlm, lock); 407 } 408 if (lock->ml.highest_blocked < target->ml.type) 409 lock->ml.highest_blocked = target->ml.type; 410 } 411 } 412 413 /* we can grant the blocked lock (only 414 * possible if converting list empty) */ 415 if (can_grant) { 416 spin_lock(&target->spinlock); 417 BUG_ON(target->ml.highest_blocked != LKM_IVMODE); 418 419 mlog(0, "%s: res %.*s, AST for Blocked lock %u:%llu, type %d, " 420 "node %u\n", dlm->name, res->lockname.len, 421 res->lockname.name, 422 dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)), 423 dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)), 424 target->ml.type, target->ml.node); 425 426 /* target->ml.type is already correct */ 427 list_move_tail(&target->list, &res->granted); 428 429 BUG_ON(!target->lksb); 430 target->lksb->status = DLM_NORMAL; 431 432 spin_unlock(&target->spinlock); 433 434 __dlm_lockres_reserve_ast(res); 435 __dlm_queue_ast(dlm, target); 436 /* go back and check for more */ 437 goto converting; 438 } 439 440 leave: 441 return; 442 } 443 444 /* must have NO locks when calling this with res !=NULL * */ 445 void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) 446 { 447 if (res) { 448 spin_lock(&dlm->spinlock); 449 spin_lock(&res->spinlock); 450 __dlm_dirty_lockres(dlm, res); 451 spin_unlock(&res->spinlock); 452 spin_unlock(&dlm->spinlock); 453 } 454 wake_up(&dlm->dlm_thread_wq); 455 } 456 457 void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) 458 { 459 assert_spin_locked(&dlm->spinlock); 460 assert_spin_locked(&res->spinlock); 461 462 /* don't shuffle secondary queues */ 463 if ((res->owner == dlm->node_num)) { 464 if (res->state & (DLM_LOCK_RES_MIGRATING | 465 DLM_LOCK_RES_BLOCK_DIRTY)) 466 return; 467 468 if (list_empty(&res->dirty)) { 469 /* ref for dirty_list */ 470 dlm_lockres_get(res); 471 list_add_tail(&res->dirty, &dlm->dirty_list); 472 res->state |= DLM_LOCK_RES_DIRTY; 473 } 474 } 475 476 mlog(0, "%s: res %.*s\n", dlm->name, res->lockname.len, 477 res->lockname.name); 478 } 479 480 481 /* Launch the NM thread for the mounted volume */ 482 int dlm_launch_thread(struct dlm_ctxt *dlm) 483 { 484 mlog(0, "Starting dlm_thread...\n"); 485 486 dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm_thread"); 487 if (IS_ERR(dlm->dlm_thread_task)) { 488 mlog_errno(PTR_ERR(dlm->dlm_thread_task)); 489 dlm->dlm_thread_task = NULL; 490 return -EINVAL; 491 } 492 493 return 0; 494 } 495 496 void dlm_complete_thread(struct dlm_ctxt *dlm) 497 { 498 if (dlm->dlm_thread_task) { 499 mlog(ML_KTHREAD, "Waiting for dlm thread to exit\n"); 500 kthread_stop(dlm->dlm_thread_task); 501 dlm->dlm_thread_task = NULL; 502 } 503 } 504 505 static int dlm_dirty_list_empty(struct dlm_ctxt *dlm) 506 { 507 int empty; 508 509 spin_lock(&dlm->spinlock); 510 empty = list_empty(&dlm->dirty_list); 511 spin_unlock(&dlm->spinlock); 512 513 return empty; 514 } 515 516 static void dlm_flush_asts(struct dlm_ctxt *dlm) 517 { 518 int ret; 519 struct dlm_lock *lock; 520 struct dlm_lock_resource *res; 521 u8 hi; 522 523 spin_lock(&dlm->ast_lock); 524 while (!list_empty(&dlm->pending_asts)) { 525 lock = list_entry(dlm->pending_asts.next, 526 struct dlm_lock, ast_list); 527 /* get an extra ref on lock */ 528 dlm_lock_get(lock); 529 res = lock->lockres; 530 mlog(0, "%s: res %.*s, Flush AST for lock %u:%llu, type %d, " 531 "node %u\n", dlm->name, res->lockname.len, 532 res->lockname.name, 533 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), 534 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), 535 lock->ml.type, lock->ml.node); 536 537 BUG_ON(!lock->ast_pending); 538 539 /* remove from list (including ref) */ 540 list_del_init(&lock->ast_list); 541 dlm_lock_put(lock); 542 spin_unlock(&dlm->ast_lock); 543 544 if (lock->ml.node != dlm->node_num) { 545 ret = dlm_do_remote_ast(dlm, res, lock); 546 if (ret < 0) 547 mlog_errno(ret); 548 } else 549 dlm_do_local_ast(dlm, res, lock); 550 551 spin_lock(&dlm->ast_lock); 552 553 /* possible that another ast was queued while 554 * we were delivering the last one */ 555 if (!list_empty(&lock->ast_list)) { 556 mlog(0, "%s: res %.*s, AST queued while flushing last " 557 "one\n", dlm->name, res->lockname.len, 558 res->lockname.name); 559 } else 560 lock->ast_pending = 0; 561 562 /* drop the extra ref. 563 * this may drop it completely. */ 564 dlm_lock_put(lock); 565 dlm_lockres_release_ast(dlm, res); 566 } 567 568 while (!list_empty(&dlm->pending_basts)) { 569 lock = list_entry(dlm->pending_basts.next, 570 struct dlm_lock, bast_list); 571 /* get an extra ref on lock */ 572 dlm_lock_get(lock); 573 res = lock->lockres; 574 575 BUG_ON(!lock->bast_pending); 576 577 /* get the highest blocked lock, and reset */ 578 spin_lock(&lock->spinlock); 579 BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE); 580 hi = lock->ml.highest_blocked; 581 lock->ml.highest_blocked = LKM_IVMODE; 582 spin_unlock(&lock->spinlock); 583 584 /* remove from list (including ref) */ 585 list_del_init(&lock->bast_list); 586 dlm_lock_put(lock); 587 spin_unlock(&dlm->ast_lock); 588 589 mlog(0, "%s: res %.*s, Flush BAST for lock %u:%llu, " 590 "blocked %d, node %u\n", 591 dlm->name, res->lockname.len, res->lockname.name, 592 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), 593 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), 594 hi, lock->ml.node); 595 596 if (lock->ml.node != dlm->node_num) { 597 ret = dlm_send_proxy_bast(dlm, res, lock, hi); 598 if (ret < 0) 599 mlog_errno(ret); 600 } else 601 dlm_do_local_bast(dlm, res, lock, hi); 602 603 spin_lock(&dlm->ast_lock); 604 605 /* possible that another bast was queued while 606 * we were delivering the last one */ 607 if (!list_empty(&lock->bast_list)) { 608 mlog(0, "%s: res %.*s, BAST queued while flushing last " 609 "one\n", dlm->name, res->lockname.len, 610 res->lockname.name); 611 } else 612 lock->bast_pending = 0; 613 614 /* drop the extra ref. 615 * this may drop it completely. */ 616 dlm_lock_put(lock); 617 dlm_lockres_release_ast(dlm, res); 618 } 619 wake_up(&dlm->ast_wq); 620 spin_unlock(&dlm->ast_lock); 621 } 622 623 624 #define DLM_THREAD_TIMEOUT_MS (4 * 1000) 625 #define DLM_THREAD_MAX_DIRTY 100 626 #define DLM_THREAD_MAX_ASTS 10 627 628 static int dlm_thread(void *data) 629 { 630 struct dlm_lock_resource *res; 631 struct dlm_ctxt *dlm = data; 632 unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS); 633 634 mlog(0, "dlm thread running for %s...\n", dlm->name); 635 636 while (!kthread_should_stop()) { 637 int n = DLM_THREAD_MAX_DIRTY; 638 639 /* dlm_shutting_down is very point-in-time, but that 640 * doesn't matter as we'll just loop back around if we 641 * get false on the leading edge of a state 642 * transition. */ 643 dlm_run_purge_list(dlm, dlm_shutting_down(dlm)); 644 645 /* We really don't want to hold dlm->spinlock while 646 * calling dlm_shuffle_lists on each lockres that 647 * needs to have its queues adjusted and AST/BASTs 648 * run. So let's pull each entry off the dirty_list 649 * and drop dlm->spinlock ASAP. Once off the list, 650 * res->spinlock needs to be taken again to protect 651 * the queues while calling dlm_shuffle_lists. */ 652 spin_lock(&dlm->spinlock); 653 while (!list_empty(&dlm->dirty_list)) { 654 int delay = 0; 655 res = list_entry(dlm->dirty_list.next, 656 struct dlm_lock_resource, dirty); 657 658 /* peel a lockres off, remove it from the list, 659 * unset the dirty flag and drop the dlm lock */ 660 BUG_ON(!res); 661 dlm_lockres_get(res); 662 663 spin_lock(&res->spinlock); 664 /* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */ 665 list_del_init(&res->dirty); 666 spin_unlock(&res->spinlock); 667 spin_unlock(&dlm->spinlock); 668 /* Drop dirty_list ref */ 669 dlm_lockres_put(res); 670 671 /* lockres can be re-dirtied/re-added to the 672 * dirty_list in this gap, but that is ok */ 673 674 spin_lock(&dlm->ast_lock); 675 spin_lock(&res->spinlock); 676 if (res->owner != dlm->node_num) { 677 __dlm_print_one_lock_resource(res); 678 mlog(ML_ERROR, "%s: inprog %d, mig %d, reco %d," 679 " dirty %d\n", dlm->name, 680 !!(res->state & DLM_LOCK_RES_IN_PROGRESS), 681 !!(res->state & DLM_LOCK_RES_MIGRATING), 682 !!(res->state & DLM_LOCK_RES_RECOVERING), 683 !!(res->state & DLM_LOCK_RES_DIRTY)); 684 } 685 BUG_ON(res->owner != dlm->node_num); 686 687 /* it is now ok to move lockreses in these states 688 * to the dirty list, assuming that they will only be 689 * dirty for a short while. */ 690 BUG_ON(res->state & DLM_LOCK_RES_MIGRATING); 691 if (res->state & (DLM_LOCK_RES_IN_PROGRESS | 692 DLM_LOCK_RES_RECOVERING)) { 693 /* move it to the tail and keep going */ 694 res->state &= ~DLM_LOCK_RES_DIRTY; 695 spin_unlock(&res->spinlock); 696 spin_unlock(&dlm->ast_lock); 697 mlog(0, "%s: res %.*s, inprogress, delay list " 698 "shuffle, state %d\n", dlm->name, 699 res->lockname.len, res->lockname.name, 700 res->state); 701 delay = 1; 702 goto in_progress; 703 } 704 705 /* at this point the lockres is not migrating/ 706 * recovering/in-progress. we have the lockres 707 * spinlock and do NOT have the dlm lock. 708 * safe to reserve/queue asts and run the lists. */ 709 710 /* called while holding lockres lock */ 711 dlm_shuffle_lists(dlm, res); 712 res->state &= ~DLM_LOCK_RES_DIRTY; 713 spin_unlock(&res->spinlock); 714 spin_unlock(&dlm->ast_lock); 715 716 dlm_lockres_calc_usage(dlm, res); 717 718 in_progress: 719 720 spin_lock(&dlm->spinlock); 721 /* if the lock was in-progress, stick 722 * it on the back of the list */ 723 if (delay) { 724 spin_lock(&res->spinlock); 725 __dlm_dirty_lockres(dlm, res); 726 spin_unlock(&res->spinlock); 727 } 728 dlm_lockres_put(res); 729 730 /* unlikely, but we may need to give time to 731 * other tasks */ 732 if (!--n) { 733 mlog(0, "%s: Throttling dlm thread\n", 734 dlm->name); 735 break; 736 } 737 } 738 739 spin_unlock(&dlm->spinlock); 740 dlm_flush_asts(dlm); 741 742 /* yield and continue right away if there is more work to do */ 743 if (!n) { 744 cond_resched(); 745 continue; 746 } 747 748 wait_event_interruptible_timeout(dlm->dlm_thread_wq, 749 !dlm_dirty_list_empty(dlm) || 750 kthread_should_stop(), 751 timeout); 752 } 753 754 mlog(0, "quitting DLM thread\n"); 755 return 0; 756 } 757