/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmthread.c
 *
 * standalone DLM module
 *
 * Copyright (C) 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 */


#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/highmem.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/timer.h>
#include <linux/kthread.h>
#include <linux/delay.h>


#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdomain.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD)
#include "cluster/masklog.h"

static int dlm_thread(void *data);
static void dlm_flush_asts(struct dlm_ctxt *dlm);

#define dlm_lock_is_remote(dlm, lock)	((lock)->ml.node != (dlm)->node_num)

/* will exit holding res->spinlock, but may drop in function */
/* waits until flags are cleared on res->state */
void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags)
{
	DECLARE_WAITQUEUE(wait, current);

	assert_spin_locked(&res->spinlock);

	add_wait_queue(&res->wq, &wait);
repeat:
	set_current_state(TASK_UNINTERRUPTIBLE);
	if (res->state & flags) {
		spin_unlock(&res->spinlock);
		schedule();
		spin_lock(&res->spinlock);
		goto repeat;
	}
	remove_wait_queue(&res->wq, &wait);
	__set_current_state(TASK_RUNNING);
}

int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
{
	if (list_empty(&res->granted) &&
	    list_empty(&res->converting) &&
	    list_empty(&res->blocked))
		return 0;
	return 1;
}

/* "unused": the lockres has no locks, is not on the dirty list,
 * has no inflight locks (in the gap between mastery and acquiring
 * the first lock), and has no bits in its refmap.
 * truly ready to be freed. */
int __dlm_lockres_unused(struct dlm_lock_resource *res)
{
	int bit;

	if (__dlm_lockres_has_locks(res))
		return 0;

	if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY)
		return 0;

	if (res->state & DLM_LOCK_RES_RECOVERING)
		return 0;

	bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
	if (bit < O2NM_MAX_NODES)
		return 0;

	/*
	 * since the bit for dlm->node_num is not set, inflight_locks better
	 * be zero
	 */
	BUG_ON(res->inflight_locks != 0);
	return 1;
}


/* Call whenever you may have added or deleted something from one of
 * the lockres queues.  This will figure out whether it belongs on the
 * unused list or not and does the appropriate thing.
 */
void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
			      struct dlm_lock_resource *res)
{
	mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	if (__dlm_lockres_unused(res)) {
		if (list_empty(&res->purge)) {
			mlog(0, "putting lockres %.*s:%p onto purge list\n",
			     res->lockname.len, res->lockname.name, res);

			res->last_used = jiffies;
			dlm_lockres_get(res);
			list_add_tail(&res->purge, &dlm->purge_list);
			dlm->purge_count++;
		}
	} else if (!list_empty(&res->purge)) {
		mlog(0, "removing lockres %.*s:%p from purge list, owner=%u\n",
		     res->lockname.len, res->lockname.name, res, res->owner);

		list_del_init(&res->purge);
		dlm_lockres_put(res);
		dlm->purge_count--;
	}
}

void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
			    struct dlm_lock_resource *res)
{
	mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
	spin_lock(&dlm->spinlock);
	spin_lock(&res->spinlock);

	__dlm_lockres_calc_usage(dlm, res);

	spin_unlock(&res->spinlock);
	spin_unlock(&dlm->spinlock);
}

static void dlm_purge_lockres(struct dlm_ctxt *dlm,
			      struct dlm_lock_resource *res)
{
	int master;
	int ret = 0;

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	master = (res->owner == dlm->node_num);

	mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len,
	     res->lockname.name, master);

	if (!master) {
		res->state |= DLM_LOCK_RES_DROPPING_REF;
		/* drop spinlock...  retake below */
		spin_unlock(&res->spinlock);
		spin_unlock(&dlm->spinlock);

		spin_lock(&res->spinlock);
		/* This ensures that clear refmap is sent after the set */
		__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
		spin_unlock(&res->spinlock);

		/* clear our bit from the master's refmap, ignore errors */
		ret = dlm_drop_lockres_ref(dlm, res);
		if (ret < 0) {
			mlog_errno(ret);
			if (!dlm_is_host_down(ret))
				BUG();
		}
		mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n",
		     dlm->name, res->lockname.len, res->lockname.name, ret);
		spin_lock(&dlm->spinlock);
		spin_lock(&res->spinlock);
	}

	if (!list_empty(&res->purge)) {
		mlog(0, "removing lockres %.*s:%p from purgelist, "
		     "master = %d\n", res->lockname.len, res->lockname.name,
		     res, master);
		list_del_init(&res->purge);
		dlm_lockres_put(res);
		dlm->purge_count--;
	}

	if (!__dlm_lockres_unused(res)) {
		mlog(ML_ERROR, "found lockres %s:%.*s: in use after deref\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		__dlm_print_one_lock_resource(res);
		BUG();
	}

	__dlm_unhash_lockres(res);

	/* lockres is not in the hash now.  drop the flag and wake up
	 * any processes waiting in dlm_get_lock_resource.
	 */
	if (!master) {
		res->state &= ~DLM_LOCK_RES_DROPPING_REF;
		spin_unlock(&res->spinlock);
		wake_up(&res->wq);
	} else
		spin_unlock(&res->spinlock);
}

static void dlm_run_purge_list(struct dlm_ctxt *dlm,
			       int purge_now)
{
	unsigned int run_max, unused;
	unsigned long purge_jiffies;
	struct dlm_lock_resource *lockres;

	spin_lock(&dlm->spinlock);
	run_max = dlm->purge_count;

	while (run_max && !list_empty(&dlm->purge_list)) {
		run_max--;

		lockres = list_entry(dlm->purge_list.next,
				     struct dlm_lock_resource, purge);

		spin_lock(&lockres->spinlock);

		purge_jiffies = lockres->last_used +
			msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);

		/* Make sure that we want to be processing this guy at
		 * this time. */
		if (!purge_now && time_after(purge_jiffies, jiffies)) {
			/* Since resources are added to the purge list
			 * in tail order, we can stop at the first
			 * unpurgeable resource -- anyone added after
			 * him will have a greater last_used value */
			spin_unlock(&lockres->spinlock);
			break;
		}

		/* Status of the lockres *might* change so double
		 * check. If the lockres is unused, holding the dlm
		 * spinlock will prevent people from getting any more
		 * refs on it. */
		unused = __dlm_lockres_unused(lockres);
		if (!unused ||
		    (lockres->state & DLM_LOCK_RES_MIGRATING)) {
			mlog(0, "lockres %s:%.*s: is in use or "
			     "being remastered, used %d, state %d\n",
			     dlm->name, lockres->lockname.len,
			     lockres->lockname.name, !unused, lockres->state);
			list_move_tail(&lockres->purge, &dlm->purge_list);
			spin_unlock(&lockres->spinlock);
			continue;
		}

		dlm_lockres_get(lockres);

		dlm_purge_lockres(dlm, lockres);

		dlm_lockres_put(lockres);

		/* Avoid adding any scheduling latencies */
		cond_resched_lock(&dlm->spinlock);
	}

	spin_unlock(&dlm->spinlock);
}

static void dlm_shuffle_lists(struct dlm_ctxt *dlm,
			      struct dlm_lock_resource *res)
{
	struct dlm_lock *lock, *target;
	struct list_head *iter;
	struct list_head *head;
	int can_grant = 1;

	//mlog(0, "res->lockname.len=%d\n", res->lockname.len);
	//mlog(0, "res->lockname.name=%p\n", res->lockname.name);
	//mlog(0, "shuffle res %.*s\n", res->lockname.len,
	//	  res->lockname.name);

	/* because this function is called with the lockres
	 * spinlock, and because we know that it is not migrating/
	 * recovering/in-progress, it is fine to reserve asts and
	 * basts right before queueing them all throughout */
	assert_spin_locked(&dlm->ast_lock);
	assert_spin_locked(&res->spinlock);
	BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING|
			      DLM_LOCK_RES_RECOVERING|
			      DLM_LOCK_RES_IN_PROGRESS)));

converting:
	if (list_empty(&res->converting))
		goto blocked;
	mlog(0, "res %.*s has locks on a convert queue\n", res->lockname.len,
	     res->lockname.name);

	target = list_entry(res->converting.next, struct dlm_lock, list);
	if (target->ml.convert_type == LKM_IVMODE) {
		mlog(ML_ERROR, "%.*s: converting a lock with no "
		     "convert_type!\n", res->lockname.len, res->lockname.name);
		BUG();
	}
	head = &res->granted;
	list_for_each(iter, head) {
		lock = list_entry(iter, struct dlm_lock, list);
		if (lock == target)
			continue;
		if (!dlm_lock_compatible(lock->ml.type,
					 target->ml.convert_type)) {
			can_grant = 0;
			/* queue the BAST if not already */
			if (lock->ml.highest_blocked == LKM_IVMODE) {
				__dlm_lockres_reserve_ast(res);
				__dlm_queue_bast(dlm, lock);
			}
			/* update the highest_blocked if needed */
			if (lock->ml.highest_blocked < target->ml.convert_type)
				lock->ml.highest_blocked =
					target->ml.convert_type;
		}
	}
	head = &res->converting;
	list_for_each(iter, head) {
		lock = list_entry(iter, struct dlm_lock, list);
		if (lock == target)
			continue;
		if (!dlm_lock_compatible(lock->ml.type,
					 target->ml.convert_type)) {
			can_grant = 0;
			if (lock->ml.highest_blocked == LKM_IVMODE) {
				__dlm_lockres_reserve_ast(res);
				__dlm_queue_bast(dlm, lock);
			}
			if (lock->ml.highest_blocked < target->ml.convert_type)
				lock->ml.highest_blocked =
					target->ml.convert_type;
		}
	}

	/* we can convert the lock */
	if (can_grant) {
		spin_lock(&target->spinlock);
		BUG_ON(target->ml.highest_blocked != LKM_IVMODE);

		mlog(0, "calling ast for converting lock: %.*s, have: %d, "
		     "granting: %d, node: %u\n", res->lockname.len,
		     res->lockname.name, target->ml.type,
		     target->ml.convert_type, target->ml.node);

		target->ml.type = target->ml.convert_type;
		target->ml.convert_type = LKM_IVMODE;
		list_move_tail(&target->list, &res->granted);

		BUG_ON(!target->lksb);
		target->lksb->status = DLM_NORMAL;

		spin_unlock(&target->spinlock);

		__dlm_lockres_reserve_ast(res);
		__dlm_queue_ast(dlm, target);
		/* go back and check for more */
		goto converting;
	}

blocked:
	if (list_empty(&res->blocked))
		goto leave;
	target = list_entry(res->blocked.next, struct dlm_lock, list);

	head = &res->granted;
	list_for_each(iter, head) {
		lock = list_entry(iter, struct dlm_lock, list);
		if (lock == target)
			continue;
		if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
			can_grant = 0;
			if (lock->ml.highest_blocked == LKM_IVMODE) {
				__dlm_lockres_reserve_ast(res);
				__dlm_queue_bast(dlm, lock);
			}
			if (lock->ml.highest_blocked < target->ml.type)
				lock->ml.highest_blocked = target->ml.type;
		}
	}

	head = &res->converting;
	list_for_each(iter, head) {
		lock = list_entry(iter, struct dlm_lock, list);
		if (lock == target)
			continue;
		if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
			can_grant = 0;
			if (lock->ml.highest_blocked == LKM_IVMODE) {
				__dlm_lockres_reserve_ast(res);
				__dlm_queue_bast(dlm, lock);
			}
			if (lock->ml.highest_blocked < target->ml.type)
				lock->ml.highest_blocked = target->ml.type;
		}
	}

	/* we can grant the blocked lock (only
	 * possible if converting list empty) */
	if (can_grant) {
		spin_lock(&target->spinlock);
		BUG_ON(target->ml.highest_blocked != LKM_IVMODE);

		mlog(0, "calling ast for blocked lock: %.*s, granting: %d, "
		     "node: %u\n", res->lockname.len, res->lockname.name,
		     target->ml.type, target->ml.node);

		// target->ml.type is already correct
		list_move_tail(&target->list, &res->granted);

		BUG_ON(!target->lksb);
		target->lksb->status = DLM_NORMAL;

		spin_unlock(&target->spinlock);

		__dlm_lockres_reserve_ast(res);
		__dlm_queue_ast(dlm, target);
		/* go back and check for more */
		goto converting;
	}

leave:
	return;
}

/* must have NO locks when calling this with res != NULL */
void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
	mlog_entry("dlm=%p, res=%p\n", dlm, res);
	if (res) {
		spin_lock(&dlm->spinlock);
		spin_lock(&res->spinlock);
		__dlm_dirty_lockres(dlm, res);
		spin_unlock(&res->spinlock);
		spin_unlock(&dlm->spinlock);
	}
	wake_up(&dlm->dlm_thread_wq);
}

void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
	mlog_entry("dlm=%p, res=%p\n", dlm, res);

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	/* don't shuffle secondary queues */
	if (res->owner == dlm->node_num) {
		if (res->state & (DLM_LOCK_RES_MIGRATING |
				  DLM_LOCK_RES_BLOCK_DIRTY))
			return;

		if (list_empty(&res->dirty)) {
			/* ref for dirty_list */
			dlm_lockres_get(res);
			list_add_tail(&res->dirty, &dlm->dirty_list);
			res->state |= DLM_LOCK_RES_DIRTY;
		}
	}
}


/* Launch the DLM thread for the mounted volume */
int dlm_launch_thread(struct dlm_ctxt *dlm)
{
	mlog(0, "starting dlm thread...\n");

	dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm_thread");
	if (IS_ERR(dlm->dlm_thread_task)) {
		mlog_errno(PTR_ERR(dlm->dlm_thread_task));
		dlm->dlm_thread_task = NULL;
		return -EINVAL;
	}

	return 0;
}

void dlm_complete_thread(struct dlm_ctxt *dlm)
{
	if (dlm->dlm_thread_task) {
		mlog(ML_KTHREAD, "waiting for dlm thread to exit\n");
		kthread_stop(dlm->dlm_thread_task);
		dlm->dlm_thread_task = NULL;
	}
}

static int dlm_dirty_list_empty(struct dlm_ctxt *dlm)
{
	int empty;

	spin_lock(&dlm->spinlock);
	empty = list_empty(&dlm->dirty_list);
	spin_unlock(&dlm->spinlock);

	return empty;
}

static void dlm_flush_asts(struct dlm_ctxt *dlm)
{
	int ret;
	struct dlm_lock *lock;
	struct dlm_lock_resource *res;
	u8 hi;

	spin_lock(&dlm->ast_lock);
	while (!list_empty(&dlm->pending_asts)) {
		lock = list_entry(dlm->pending_asts.next,
				  struct dlm_lock, ast_list);
		/* get an extra ref on lock */
		dlm_lock_get(lock);
		res = lock->lockres;
		mlog(0, "delivering an ast for this lockres\n");

		BUG_ON(!lock->ast_pending);

		/* remove from list (including ref) */
		list_del_init(&lock->ast_list);
		dlm_lock_put(lock);
		spin_unlock(&dlm->ast_lock);

		if (lock->ml.node != dlm->node_num) {
			ret = dlm_do_remote_ast(dlm, res, lock);
			if (ret < 0)
				mlog_errno(ret);
		} else
			dlm_do_local_ast(dlm, res, lock);

		spin_lock(&dlm->ast_lock);

		/* possible that another ast was queued while
		 * we were delivering the last one */
		if (!list_empty(&lock->ast_list)) {
			mlog(0, "aha another ast got queued while "
			     "we were finishing the last one.  will "
			     "keep the ast_pending flag set.\n");
		} else
			lock->ast_pending = 0;

		/* drop the extra ref.
		 * this may drop it completely.
		 */
		dlm_lock_put(lock);
		dlm_lockres_release_ast(dlm, res);
	}

	while (!list_empty(&dlm->pending_basts)) {
		lock = list_entry(dlm->pending_basts.next,
				  struct dlm_lock, bast_list);
		/* get an extra ref on lock */
		dlm_lock_get(lock);
		res = lock->lockres;

		BUG_ON(!lock->bast_pending);

		/* get the highest blocked lock, and reset */
		spin_lock(&lock->spinlock);
		BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE);
		hi = lock->ml.highest_blocked;
		lock->ml.highest_blocked = LKM_IVMODE;
		spin_unlock(&lock->spinlock);

		/* remove from list (including ref) */
		list_del_init(&lock->bast_list);
		dlm_lock_put(lock);
		spin_unlock(&dlm->ast_lock);

		mlog(0, "delivering a bast for this lockres "
		     "(blocked = %d)\n", hi);

		if (lock->ml.node != dlm->node_num) {
			ret = dlm_send_proxy_bast(dlm, res, lock, hi);
			if (ret < 0)
				mlog_errno(ret);
		} else
			dlm_do_local_bast(dlm, res, lock, hi);

		spin_lock(&dlm->ast_lock);

		/* possible that another bast was queued while
		 * we were delivering the last one */
		if (!list_empty(&lock->bast_list)) {
			mlog(0, "aha another bast got queued while "
			     "we were finishing the last one.  will "
			     "keep the bast_pending flag set.\n");
		} else
			lock->bast_pending = 0;

		/* drop the extra ref.
		 * this may drop it completely. */
		dlm_lock_put(lock);
		dlm_lockres_release_ast(dlm, res);
	}
	wake_up(&dlm->ast_wq);
	spin_unlock(&dlm->ast_lock);
}


#define DLM_THREAD_TIMEOUT_MS (4 * 1000)
#define DLM_THREAD_MAX_DIRTY  100
#define DLM_THREAD_MAX_ASTS   10

static int dlm_thread(void *data)
{
	struct dlm_lock_resource *res;
	struct dlm_ctxt *dlm = data;
	unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS);

	mlog(0, "dlm thread running for %s...\n", dlm->name);

	while (!kthread_should_stop()) {
		int n = DLM_THREAD_MAX_DIRTY;

		/* dlm_shutting_down is very point-in-time, but that
		 * doesn't matter as we'll just loop back around if we
		 * get false on the leading edge of a state
		 * transition. */
		dlm_run_purge_list(dlm, dlm_shutting_down(dlm));

		/* We really don't want to hold dlm->spinlock while
		 * calling dlm_shuffle_lists on each lockres that
		 * needs to have its queues adjusted and AST/BASTs
		 * run.  So let's pull each entry off the dirty_list
		 * and drop dlm->spinlock ASAP.  Once off the list,
		 * res->spinlock needs to be taken again to protect
		 * the queues while calling dlm_shuffle_lists.  */
		spin_lock(&dlm->spinlock);
		while (!list_empty(&dlm->dirty_list)) {
			int delay = 0;
			res = list_entry(dlm->dirty_list.next,
					 struct dlm_lock_resource, dirty);

			/* peel a lockres off, remove it from the list,
			 * unset the dirty flag and drop the dlm lock */
			BUG_ON(!res);
			dlm_lockres_get(res);

			spin_lock(&res->spinlock);
			/* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */
			list_del_init(&res->dirty);
			spin_unlock(&res->spinlock);
			spin_unlock(&dlm->spinlock);
			/* Drop dirty_list ref */
			dlm_lockres_put(res);

			/* lockres can be re-dirtied/re-added to the
			 * dirty_list in this gap, but that is ok */

			spin_lock(&dlm->ast_lock);
			spin_lock(&res->spinlock);
			if (res->owner != dlm->node_num) {
				__dlm_print_one_lock_resource(res);
				mlog(ML_ERROR, "inprog:%s, mig:%s, reco:%s, dirty:%s\n",
				     res->state & DLM_LOCK_RES_IN_PROGRESS ? "yes" : "no",
"yes" : "no", 680 res->state & DLM_LOCK_RES_MIGRATING ? "yes" : "no", 681 res->state & DLM_LOCK_RES_RECOVERING ? "yes" : "no", 682 res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no"); 683 } 684 BUG_ON(res->owner != dlm->node_num); 685 686 /* it is now ok to move lockreses in these states 687 * to the dirty list, assuming that they will only be 688 * dirty for a short while. */ 689 BUG_ON(res->state & DLM_LOCK_RES_MIGRATING); 690 if (res->state & (DLM_LOCK_RES_IN_PROGRESS | 691 DLM_LOCK_RES_RECOVERING)) { 692 /* move it to the tail and keep going */ 693 res->state &= ~DLM_LOCK_RES_DIRTY; 694 spin_unlock(&res->spinlock); 695 spin_unlock(&dlm->ast_lock); 696 mlog(0, "delaying list shuffling for in-" 697 "progress lockres %.*s, state=%d\n", 698 res->lockname.len, res->lockname.name, 699 res->state); 700 delay = 1; 701 goto in_progress; 702 } 703 704 /* at this point the lockres is not migrating/ 705 * recovering/in-progress. we have the lockres 706 * spinlock and do NOT have the dlm lock. 707 * safe to reserve/queue asts and run the lists. */ 708 709 mlog(0, "calling dlm_shuffle_lists with dlm=%s, " 710 "res=%.*s\n", dlm->name, 711 res->lockname.len, res->lockname.name); 712 713 /* called while holding lockres lock */ 714 dlm_shuffle_lists(dlm, res); 715 res->state &= ~DLM_LOCK_RES_DIRTY; 716 spin_unlock(&res->spinlock); 717 spin_unlock(&dlm->ast_lock); 718 719 dlm_lockres_calc_usage(dlm, res); 720 721 in_progress: 722 723 spin_lock(&dlm->spinlock); 724 /* if the lock was in-progress, stick 725 * it on the back of the list */ 726 if (delay) { 727 spin_lock(&res->spinlock); 728 __dlm_dirty_lockres(dlm, res); 729 spin_unlock(&res->spinlock); 730 } 731 dlm_lockres_put(res); 732 733 /* unlikely, but we may need to give time to 734 * other tasks */ 735 if (!--n) { 736 mlog(0, "throttling dlm_thread\n"); 737 break; 738 } 739 } 740 741 spin_unlock(&dlm->spinlock); 742 dlm_flush_asts(dlm); 743 744 /* yield and continue right away if there is more work to do */ 745 if (!n) { 746 cond_resched(); 747 continue; 748 } 749 750 wait_event_interruptible_timeout(dlm->dlm_thread_wq, 751 !dlm_dirty_list_empty(dlm) || 752 kthread_should_stop(), 753 timeout); 754 } 755 756 mlog(0, "quitting DLM thread\n"); 757 return 0; 758 } 759