/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmthread.c
 *
 * standalone DLM module
 *
 * Copyright (C) 2004 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 *
 */


#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/highmem.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/timer.h>
#include <linux/kthread.h>
#include <linux/delay.h>


#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdomain.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD)
#include "cluster/masklog.h"

static int dlm_thread(void *data);
static void dlm_flush_asts(struct dlm_ctxt *dlm);

#define dlm_lock_is_remote(dlm, lock)	((lock)->ml.node != (dlm)->node_num)

/* will exit holding res->spinlock, but may drop in function */
/* waits until flags are cleared on res->state */
void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags)
{
	DECLARE_WAITQUEUE(wait, current);

	assert_spin_locked(&res->spinlock);

	add_wait_queue(&res->wq, &wait);
repeat:
	set_current_state(TASK_UNINTERRUPTIBLE);
	if (res->state & flags) {
		spin_unlock(&res->spinlock);
		schedule();
		spin_lock(&res->spinlock);
		goto repeat;
	}
	remove_wait_queue(&res->wq, &wait);
	__set_current_state(TASK_RUNNING);
}

int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
{
	if (list_empty(&res->granted) &&
	    list_empty(&res->converting) &&
	    list_empty(&res->blocked))
		return 0;
	return 1;
}

/* "unused": the lockres has no locks, is not on the dirty list,
 * has no inflight locks (in the gap between mastery and acquiring
 * the first lock), and has no bits in its refmap.
 * truly ready to be freed. */
int __dlm_lockres_unused(struct dlm_lock_resource *res)
{
	int bit;

	assert_spin_locked(&res->spinlock);

	if (__dlm_lockres_has_locks(res))
		return 0;

	/* Locks are in the process of being created */
	if (res->inflight_locks)
		return 0;

	if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY)
		return 0;

	if (res->state & DLM_LOCK_RES_RECOVERING)
		return 0;

	/* Another node has this resource with this node as the master */
	bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
	if (bit < O2NM_MAX_NODES)
		return 0;

	return 1;
}

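/* Lock ordering, as used by the functions in this file:
 *
 *	dlm->spinlock -> res->spinlock
 *	dlm->ast_lock -> res->spinlock -> lock->spinlock
 *
 * dlm_thread() drops dlm->spinlock before taking dlm->ast_lock, so those
 * two are never nested here. */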

/* Call whenever you may have added or deleted something from one of
 * the lockres queues. This will figure out whether it belongs on the
 * unused list or not and does the appropriate thing. */
void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
			      struct dlm_lock_resource *res)
{
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	if (__dlm_lockres_unused(res)) {
		if (list_empty(&res->purge)) {
			mlog(0, "%s: Adding res %.*s to purge list\n",
			     dlm->name, res->lockname.len, res->lockname.name);

			res->last_used = jiffies;
			dlm_lockres_get(res);
			list_add_tail(&res->purge, &dlm->purge_list);
			dlm->purge_count++;
		}
	} else if (!list_empty(&res->purge)) {
		mlog(0, "%s: Removing res %.*s from purge list\n",
		     dlm->name, res->lockname.len, res->lockname.name);

		list_del_init(&res->purge);
		dlm_lockres_put(res);
		dlm->purge_count--;
	}
}

void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
			    struct dlm_lock_resource *res)
{
	spin_lock(&dlm->spinlock);
	spin_lock(&res->spinlock);

	__dlm_lockres_calc_usage(dlm, res);

	spin_unlock(&res->spinlock);
	spin_unlock(&dlm->spinlock);
}

static void dlm_purge_lockres(struct dlm_ctxt *dlm,
			      struct dlm_lock_resource *res)
{
	int master;
	int ret = 0;

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	master = (res->owner == dlm->node_num);

	mlog(0, "%s: Purging res %.*s, master %d\n", dlm->name,
	     res->lockname.len, res->lockname.name, master);

	if (!master) {
		res->state |= DLM_LOCK_RES_DROPPING_REF;
		/* drop spinlock... retake below */
		spin_unlock(&res->spinlock);
		spin_unlock(&dlm->spinlock);

		spin_lock(&res->spinlock);
		/* This ensures that clear refmap is sent after the set */
		__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
		spin_unlock(&res->spinlock);

		/* clear our bit from the master's refmap, ignore errors */
		ret = dlm_drop_lockres_ref(dlm, res);
		if (ret < 0) {
			if (!dlm_is_host_down(ret))
				BUG();
		}
		spin_lock(&dlm->spinlock);
		spin_lock(&res->spinlock);
	}

	if (!list_empty(&res->purge)) {
		mlog(0, "%s: Removing res %.*s from purgelist, master %d\n",
		     dlm->name, res->lockname.len, res->lockname.name, master);
		list_del_init(&res->purge);
		dlm_lockres_put(res);
		dlm->purge_count--;
	}

	if (!__dlm_lockres_unused(res)) {
		mlog(ML_ERROR, "%s: res %.*s in use after deref\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		__dlm_print_one_lock_resource(res);
		BUG();
	}

	__dlm_unhash_lockres(dlm, res);

	/* lockres is not in the hash now. drop the flag and wake up
	 * any processes waiting in dlm_get_lock_resource. */
	if (!master) {
		res->state &= ~DLM_LOCK_RES_DROPPING_REF;
		spin_unlock(&res->spinlock);
		wake_up(&res->wq);
	} else
		spin_unlock(&res->spinlock);
}
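
/* Walk dlm->purge_list and purge lockreses that have sat unused for at
 * least DLM_PURGE_INTERVAL_MS, or everything on the list when purge_now
 * is set (e.g. while the domain is shutting down). Entries that turn out
 * to be in use or migrating are moved to the tail and revisited on a
 * later pass. */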
static void dlm_run_purge_list(struct dlm_ctxt *dlm,
			       int purge_now)
{
	unsigned int run_max, unused;
	unsigned long purge_jiffies;
	struct dlm_lock_resource *lockres;

	spin_lock(&dlm->spinlock);
	run_max = dlm->purge_count;

	while (run_max && !list_empty(&dlm->purge_list)) {
		run_max--;

		lockres = list_entry(dlm->purge_list.next,
				     struct dlm_lock_resource, purge);

		spin_lock(&lockres->spinlock);

		purge_jiffies = lockres->last_used +
			msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);

		/* Make sure that we want to be processing this guy at
		 * this time. */
		if (!purge_now && time_after(purge_jiffies, jiffies)) {
			/* Since resources are added to the purge list
			 * in tail order, we can stop at the first
			 * unpurgable resource -- anyone added after
			 * him will have a greater last_used value */
			spin_unlock(&lockres->spinlock);
			break;
		}

		/* Status of the lockres *might* change so double
		 * check. If the lockres is unused, holding the dlm
		 * spinlock will prevent people from getting any more
		 * refs on it. */
		unused = __dlm_lockres_unused(lockres);
		if (!unused ||
		    (lockres->state & DLM_LOCK_RES_MIGRATING)) {
			mlog(0, "%s: res %.*s is in use or being remastered, "
			     "used %d, state %d\n", dlm->name,
			     lockres->lockname.len, lockres->lockname.name,
			     !unused, lockres->state);
			/* move the in-use lockres to the tail of the
			 * purge list and revisit it on a later pass */
			list_move_tail(&lockres->purge, &dlm->purge_list);
			spin_unlock(&lockres->spinlock);
			continue;
		}

		dlm_lockres_get(lockres);

		dlm_purge_lockres(dlm, lockres);

		dlm_lockres_put(lockres);

		/* Avoid adding any scheduling latencies */
		cond_resched_lock(&dlm->spinlock);
	}

	spin_unlock(&dlm->spinlock);
}

static void dlm_shuffle_lists(struct dlm_ctxt *dlm,
			      struct dlm_lock_resource *res)
{
	struct dlm_lock *lock, *target;
	int can_grant = 1;

	/*
	 * Because this function is called with the lockres
	 * spinlock, and because we know that it is not migrating/
	 * recovering/in-progress, it is fine to reserve asts and
	 * basts right before queueing them all throughout
	 */
	assert_spin_locked(&dlm->ast_lock);
	assert_spin_locked(&res->spinlock);
	BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING|
			      DLM_LOCK_RES_RECOVERING|
			      DLM_LOCK_RES_IN_PROGRESS)));

converting:
	if (list_empty(&res->converting))
		goto blocked;
	mlog(0, "%s: res %.*s has locks on the convert queue\n", dlm->name,
	     res->lockname.len, res->lockname.name);

	target = list_entry(res->converting.next, struct dlm_lock, list);
	if (target->ml.convert_type == LKM_IVMODE) {
		mlog(ML_ERROR, "%s: res %.*s converting lock to invalid mode\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		BUG();
	}
	list_for_each_entry(lock, &res->granted, list) {
		if (lock == target)
			continue;
		if (!dlm_lock_compatible(lock->ml.type,
					 target->ml.convert_type)) {
			can_grant = 0;
			/* queue the BAST if not already */
			if (lock->ml.highest_blocked == LKM_IVMODE) {
				__dlm_lockres_reserve_ast(res);
				__dlm_queue_bast(dlm, lock);
			}
			/* update the highest_blocked if needed */
			if (lock->ml.highest_blocked < target->ml.convert_type)
				lock->ml.highest_blocked =
					target->ml.convert_type;
		}
	}

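	/* A convert also cannot be granted while another lock on the
	 * convert queue holds an incompatible mode; queue BASTs for
	 * those holders as well. */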
	list_for_each_entry(lock, &res->converting, list) {
		if (lock == target)
			continue;
		if (!dlm_lock_compatible(lock->ml.type,
					 target->ml.convert_type)) {
			can_grant = 0;
			if (lock->ml.highest_blocked == LKM_IVMODE) {
				__dlm_lockres_reserve_ast(res);
				__dlm_queue_bast(dlm, lock);
			}
			if (lock->ml.highest_blocked < target->ml.convert_type)
				lock->ml.highest_blocked =
					target->ml.convert_type;
		}
	}

	/* we can convert the lock */
	if (can_grant) {
		spin_lock(&target->spinlock);
		BUG_ON(target->ml.highest_blocked != LKM_IVMODE);

		mlog(0, "%s: res %.*s, AST for Converting lock %u:%llu, type "
		     "%d => %d, node %u\n", dlm->name, res->lockname.len,
		     res->lockname.name,
		     dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)),
		     target->ml.type,
		     target->ml.convert_type, target->ml.node);

		target->ml.type = target->ml.convert_type;
		target->ml.convert_type = LKM_IVMODE;
		list_move_tail(&target->list, &res->granted);

		BUG_ON(!target->lksb);
		target->lksb->status = DLM_NORMAL;

		spin_unlock(&target->spinlock);

		__dlm_lockres_reserve_ast(res);
		__dlm_queue_ast(dlm, target);
		/* go back and check for more */
		goto converting;
	}

blocked:
	if (list_empty(&res->blocked))
		goto leave;
	target = list_entry(res->blocked.next, struct dlm_lock, list);

	list_for_each_entry(lock, &res->granted, list) {
		if (lock == target)
			continue;
		if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
			can_grant = 0;
			if (lock->ml.highest_blocked == LKM_IVMODE) {
				__dlm_lockres_reserve_ast(res);
				__dlm_queue_bast(dlm, lock);
			}
			if (lock->ml.highest_blocked < target->ml.type)
				lock->ml.highest_blocked = target->ml.type;
		}
	}

	list_for_each_entry(lock, &res->converting, list) {
		if (lock == target)
			continue;
		if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
			can_grant = 0;
			if (lock->ml.highest_blocked == LKM_IVMODE) {
				__dlm_lockres_reserve_ast(res);
				__dlm_queue_bast(dlm, lock);
			}
			if (lock->ml.highest_blocked < target->ml.type)
				lock->ml.highest_blocked = target->ml.type;
		}
	}

	/* we can grant the blocked lock (only
	 * possible if converting list empty) */
	if (can_grant) {
		spin_lock(&target->spinlock);
		BUG_ON(target->ml.highest_blocked != LKM_IVMODE);

		mlog(0, "%s: res %.*s, AST for Blocked lock %u:%llu, type %d, "
		     "node %u\n", dlm->name, res->lockname.len,
		     res->lockname.name,
		     dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)),
		     target->ml.type, target->ml.node);

		/* target->ml.type is already correct */
		list_move_tail(&target->list, &res->granted);

		BUG_ON(!target->lksb);
		target->lksb->status = DLM_NORMAL;

		spin_unlock(&target->spinlock);

		__dlm_lockres_reserve_ast(res);
		__dlm_queue_ast(dlm, target);
		/* go back and check for more */
		goto converting;
	}

leave:
	return;
}

/* must have NO locks when calling this with res != NULL */
void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
	if (res) {
		spin_lock(&dlm->spinlock);
		spin_lock(&res->spinlock);
		__dlm_dirty_lockres(dlm, res);
		spin_unlock(&res->spinlock);
		spin_unlock(&dlm->spinlock);
	}
	wake_up(&dlm->dlm_thread_wq);
}

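/* Put a lockres that this node masters on dlm->dirty_list so that
 * dlm_thread will shuffle its queues and send any needed ASTs/BASTs.
 * Migrating or block-dirty resources are left alone, and a reference is
 * held for as long as the lockres sits on the dirty list. Callers must
 * hold dlm->spinlock and res->spinlock; see dlm_kick_thread() for the
 * locking wrapper that also wakes the thread. */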
void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	/* don't shuffle secondary queues */
	if (res->owner == dlm->node_num) {
		if (res->state & (DLM_LOCK_RES_MIGRATING |
				  DLM_LOCK_RES_BLOCK_DIRTY))
			return;

		if (list_empty(&res->dirty)) {
			/* ref for dirty_list */
			dlm_lockres_get(res);
			list_add_tail(&res->dirty, &dlm->dirty_list);
			res->state |= DLM_LOCK_RES_DIRTY;
		}
	}

	mlog(0, "%s: res %.*s\n", dlm->name, res->lockname.len,
	     res->lockname.name);
}


/* Launch the DLM thread for this domain */
int dlm_launch_thread(struct dlm_ctxt *dlm)
{
	mlog(0, "Starting dlm_thread...\n");

	dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm_thread");
	if (IS_ERR(dlm->dlm_thread_task)) {
		mlog_errno(PTR_ERR(dlm->dlm_thread_task));
		dlm->dlm_thread_task = NULL;
		return -EINVAL;
	}

	return 0;
}

void dlm_complete_thread(struct dlm_ctxt *dlm)
{
	if (dlm->dlm_thread_task) {
		mlog(ML_KTHREAD, "Waiting for dlm thread to exit\n");
		kthread_stop(dlm->dlm_thread_task);
		dlm->dlm_thread_task = NULL;
	}
}

static int dlm_dirty_list_empty(struct dlm_ctxt *dlm)
{
	int empty;

	spin_lock(&dlm->spinlock);
	empty = list_empty(&dlm->dirty_list);
	spin_unlock(&dlm->spinlock);

	return empty;
}

static void dlm_flush_asts(struct dlm_ctxt *dlm)
{
	int ret;
	struct dlm_lock *lock;
	struct dlm_lock_resource *res;
	u8 hi;

	spin_lock(&dlm->ast_lock);
	while (!list_empty(&dlm->pending_asts)) {
		lock = list_entry(dlm->pending_asts.next,
				  struct dlm_lock, ast_list);
		/* get an extra ref on lock */
		dlm_lock_get(lock);
		res = lock->lockres;
		mlog(0, "%s: res %.*s, Flush AST for lock %u:%llu, type %d, "
		     "node %u\n", dlm->name, res->lockname.len,
		     res->lockname.name,
		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
		     lock->ml.type, lock->ml.node);

		BUG_ON(!lock->ast_pending);

		/* remove from list (including ref) */
		list_del_init(&lock->ast_list);
		dlm_lock_put(lock);
		spin_unlock(&dlm->ast_lock);

		if (lock->ml.node != dlm->node_num) {
			ret = dlm_do_remote_ast(dlm, res, lock);
			if (ret < 0)
				mlog_errno(ret);
		} else
			dlm_do_local_ast(dlm, res, lock);

		spin_lock(&dlm->ast_lock);

		/* possible that another ast was queued while
		 * we were delivering the last one */
		if (!list_empty(&lock->ast_list)) {
			mlog(0, "%s: res %.*s, AST queued while flushing last "
			     "one\n", dlm->name, res->lockname.len,
			     res->lockname.name);
		} else
			lock->ast_pending = 0;

		/* drop the extra ref.
		 * this may drop it completely. */
		dlm_lock_put(lock);
		dlm_lockres_release_ast(dlm, res);
	}

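	/* Now drain the pending BASTs the same way: latch the highest
	 * blocked mode under lock->spinlock, reset it, and deliver the
	 * BAST locally or to the lock's remote node. */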
	while (!list_empty(&dlm->pending_basts)) {
		lock = list_entry(dlm->pending_basts.next,
				  struct dlm_lock, bast_list);
		/* get an extra ref on lock */
		dlm_lock_get(lock);
		res = lock->lockres;

		BUG_ON(!lock->bast_pending);

		/* get the highest blocked lock, and reset */
		spin_lock(&lock->spinlock);
		BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE);
		hi = lock->ml.highest_blocked;
		lock->ml.highest_blocked = LKM_IVMODE;
		spin_unlock(&lock->spinlock);

		/* remove from list (including ref) */
		list_del_init(&lock->bast_list);
		dlm_lock_put(lock);
		spin_unlock(&dlm->ast_lock);

		mlog(0, "%s: res %.*s, Flush BAST for lock %u:%llu, "
		     "blocked %d, node %u\n",
		     dlm->name, res->lockname.len, res->lockname.name,
		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
		     hi, lock->ml.node);

		if (lock->ml.node != dlm->node_num) {
			ret = dlm_send_proxy_bast(dlm, res, lock, hi);
			if (ret < 0)
				mlog_errno(ret);
		} else
			dlm_do_local_bast(dlm, res, lock, hi);

		spin_lock(&dlm->ast_lock);

		/* possible that another bast was queued while
		 * we were delivering the last one */
		if (!list_empty(&lock->bast_list)) {
			mlog(0, "%s: res %.*s, BAST queued while flushing last "
			     "one\n", dlm->name, res->lockname.len,
			     res->lockname.name);
		} else
			lock->bast_pending = 0;

		/* drop the extra ref.
		 * this may drop it completely. */
		dlm_lock_put(lock);
		dlm_lockres_release_ast(dlm, res);
	}
	wake_up(&dlm->ast_wq);
	spin_unlock(&dlm->ast_lock);
}


#define DLM_THREAD_TIMEOUT_MS (4 * 1000)
#define DLM_THREAD_MAX_DIRTY  100
#define DLM_THREAD_MAX_ASTS   10

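/* Main worker thread, started per struct dlm_ctxt by dlm_launch_thread().
 * Each pass runs the purge list (purging everything once the domain is
 * shutting down), pulls up to DLM_THREAD_MAX_DIRTY lockreses off the
 * dirty list and shuffles their queues, flushes the pending ASTs and
 * BASTs, and then sleeps until kicked or DLM_THREAD_TIMEOUT_MS elapses. */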
static int dlm_thread(void *data)
{
	struct dlm_lock_resource *res;
	struct dlm_ctxt *dlm = data;
	unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS);

	mlog(0, "dlm thread running for %s...\n", dlm->name);

	while (!kthread_should_stop()) {
		int n = DLM_THREAD_MAX_DIRTY;

		/* dlm_shutting_down is very point-in-time, but that
		 * doesn't matter as we'll just loop back around if we
		 * get false on the leading edge of a state
		 * transition. */
		dlm_run_purge_list(dlm, dlm_shutting_down(dlm));

		/* We really don't want to hold dlm->spinlock while
		 * calling dlm_shuffle_lists on each lockres that
		 * needs to have its queues adjusted and AST/BASTs
		 * run. So let's pull each entry off the dirty_list
		 * and drop dlm->spinlock ASAP. Once off the list,
		 * res->spinlock needs to be taken again to protect
		 * the queues while calling dlm_shuffle_lists. */
		spin_lock(&dlm->spinlock);
		while (!list_empty(&dlm->dirty_list)) {
			int delay = 0;
			res = list_entry(dlm->dirty_list.next,
					 struct dlm_lock_resource, dirty);

			/* peel a lockres off, remove it from the list,
			 * unset the dirty flag and drop the dlm lock */
			BUG_ON(!res);
			dlm_lockres_get(res);

			spin_lock(&res->spinlock);
			/* We clear the DLM_LOCK_RES_DIRTY state once we
			 * shuffle lists below */
			list_del_init(&res->dirty);
			spin_unlock(&res->spinlock);
			spin_unlock(&dlm->spinlock);
			/* Drop dirty_list ref */
			dlm_lockres_put(res);

			/* lockres can be re-dirtied/re-added to the
			 * dirty_list in this gap, but that is ok */

			spin_lock(&dlm->ast_lock);
			spin_lock(&res->spinlock);
			if (res->owner != dlm->node_num) {
				__dlm_print_one_lock_resource(res);
				mlog(ML_ERROR, "%s: inprog %d, mig %d, reco %d,"
				     " dirty %d\n", dlm->name,
				     !!(res->state & DLM_LOCK_RES_IN_PROGRESS),
				     !!(res->state & DLM_LOCK_RES_MIGRATING),
				     !!(res->state & DLM_LOCK_RES_RECOVERING),
				     !!(res->state & DLM_LOCK_RES_DIRTY));
			}
			BUG_ON(res->owner != dlm->node_num);

			/* it is now ok to move lockreses in these states
			 * to the dirty list, assuming that they will only be
			 * dirty for a short while. */
			BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
			if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
					  DLM_LOCK_RES_RECOVERING)) {
				/* move it to the tail and keep going */
				res->state &= ~DLM_LOCK_RES_DIRTY;
				spin_unlock(&res->spinlock);
				spin_unlock(&dlm->ast_lock);
				mlog(0, "%s: res %.*s, inprogress, delay list "
				     "shuffle, state %d\n", dlm->name,
				     res->lockname.len, res->lockname.name,
				     res->state);
				delay = 1;
				goto in_progress;
			}

			/* at this point the lockres is not migrating/
			 * recovering/in-progress. we have the lockres
			 * spinlock and do NOT have the dlm lock.
			 * safe to reserve/queue asts and run the lists. */

			/* called while holding lockres lock */
			dlm_shuffle_lists(dlm, res);
			res->state &= ~DLM_LOCK_RES_DIRTY;
			spin_unlock(&res->spinlock);
			spin_unlock(&dlm->ast_lock);

			dlm_lockres_calc_usage(dlm, res);

in_progress:

			spin_lock(&dlm->spinlock);
			/* if the lock was in-progress, stick
			 * it on the back of the list */
			if (delay) {
				spin_lock(&res->spinlock);
				__dlm_dirty_lockres(dlm, res);
				spin_unlock(&res->spinlock);
			}
			dlm_lockres_put(res);

			/* unlikely, but we may need to give time to
			 * other tasks */
			if (!--n) {
				mlog(0, "%s: Throttling dlm thread\n",
				     dlm->name);
				break;
			}
		}

		spin_unlock(&dlm->spinlock);
		dlm_flush_asts(dlm);

		/* yield and continue right away if there is more work to do */
		if (!n) {
			cond_resched();
			continue;
		}

		wait_event_interruptible_timeout(dlm->dlm_thread_wq,
						 !dlm_dirty_list_empty(dlm) ||
						 kthread_should_stop(),
						 timeout);
	}

	mlog(0, "quitting DLM thread\n");
	return 0;
}