1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmthread.c 5 * 6 * standalone DLM module 7 * 8 * Copyright (C) 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 * 25 */ 26 27 28 #include <linux/module.h> 29 #include <linux/fs.h> 30 #include <linux/types.h> 31 #include <linux/highmem.h> 32 #include <linux/init.h> 33 #include <linux/sysctl.h> 34 #include <linux/random.h> 35 #include <linux/blkdev.h> 36 #include <linux/socket.h> 37 #include <linux/inet.h> 38 #include <linux/timer.h> 39 #include <linux/kthread.h> 40 #include <linux/delay.h> 41 42 43 #include "cluster/heartbeat.h" 44 #include "cluster/nodemanager.h" 45 #include "cluster/tcp.h" 46 47 #include "dlmapi.h" 48 #include "dlmcommon.h" 49 #include "dlmdomain.h" 50 51 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD) 52 #include "cluster/masklog.h" 53 54 static int dlm_thread(void *data); 55 static void dlm_flush_asts(struct dlm_ctxt *dlm); 56 57 #define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num) 58 59 /* will exit holding res->spinlock, but may drop in function */ 60 /* waits until flags are cleared on res->state */ 61 void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags) 62 { 63 DECLARE_WAITQUEUE(wait, current); 64 65 assert_spin_locked(&res->spinlock); 66 67 add_wait_queue(&res->wq, &wait); 68 repeat: 69 set_current_state(TASK_UNINTERRUPTIBLE); 70 if (res->state & flags) { 71 spin_unlock(&res->spinlock); 72 schedule(); 73 spin_lock(&res->spinlock); 74 goto repeat; 75 } 76 remove_wait_queue(&res->wq, &wait); 77 __set_current_state(TASK_RUNNING); 78 } 79 80 int __dlm_lockres_has_locks(struct dlm_lock_resource *res) 81 { 82 if (list_empty(&res->granted) && 83 list_empty(&res->converting) && 84 list_empty(&res->blocked)) 85 return 0; 86 return 1; 87 } 88 89 /* "unused": the lockres has no locks, is not on the dirty list, 90 * has no inflight locks (in the gap between mastery and acquiring 91 * the first lock), and has no bits in its refmap. 92 * truly ready to be freed. */ 93 int __dlm_lockres_unused(struct dlm_lock_resource *res) 94 { 95 int bit; 96 97 assert_spin_locked(&res->spinlock); 98 99 if (__dlm_lockres_has_locks(res)) 100 return 0; 101 102 /* Locks are in the process of being created */ 103 if (res->inflight_locks) 104 return 0; 105 106 if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY) 107 return 0; 108 109 if (res->state & DLM_LOCK_RES_RECOVERING) 110 return 0; 111 112 /* Another node has this resource with this node as the master */ 113 bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0); 114 if (bit < O2NM_MAX_NODES) 115 return 0; 116 117 return 1; 118 } 119 120 121 /* Call whenever you may have added or deleted something from one of 122 * the lockres queue's. This will figure out whether it belongs on the 123 * unused list or not and does the appropriate thing. */ 124 void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm, 125 struct dlm_lock_resource *res) 126 { 127 assert_spin_locked(&dlm->spinlock); 128 assert_spin_locked(&res->spinlock); 129 130 if (__dlm_lockres_unused(res)){ 131 if (list_empty(&res->purge)) { 132 mlog(0, "%s: Adding res %.*s to purge list\n", 133 dlm->name, res->lockname.len, res->lockname.name); 134 135 res->last_used = jiffies; 136 dlm_lockres_get(res); 137 list_add_tail(&res->purge, &dlm->purge_list); 138 dlm->purge_count++; 139 } 140 } else if (!list_empty(&res->purge)) { 141 mlog(0, "%s: Removing res %.*s from purge list\n", 142 dlm->name, res->lockname.len, res->lockname.name); 143 144 list_del_init(&res->purge); 145 dlm_lockres_put(res); 146 dlm->purge_count--; 147 } 148 } 149 150 void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, 151 struct dlm_lock_resource *res) 152 { 153 spin_lock(&dlm->spinlock); 154 spin_lock(&res->spinlock); 155 156 __dlm_lockres_calc_usage(dlm, res); 157 158 spin_unlock(&res->spinlock); 159 spin_unlock(&dlm->spinlock); 160 } 161 162 static void dlm_purge_lockres(struct dlm_ctxt *dlm, 163 struct dlm_lock_resource *res) 164 { 165 int master; 166 int ret = 0; 167 168 assert_spin_locked(&dlm->spinlock); 169 assert_spin_locked(&res->spinlock); 170 171 master = (res->owner == dlm->node_num); 172 173 mlog(0, "%s: Purging res %.*s, master %d\n", dlm->name, 174 res->lockname.len, res->lockname.name, master); 175 176 if (!master) { 177 res->state |= DLM_LOCK_RES_DROPPING_REF; 178 /* drop spinlock... retake below */ 179 spin_unlock(&res->spinlock); 180 spin_unlock(&dlm->spinlock); 181 182 spin_lock(&res->spinlock); 183 /* This ensures that clear refmap is sent after the set */ 184 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG); 185 spin_unlock(&res->spinlock); 186 187 /* clear our bit from the master's refmap, ignore errors */ 188 ret = dlm_drop_lockres_ref(dlm, res); 189 if (ret < 0) { 190 if (!dlm_is_host_down(ret)) 191 BUG(); 192 } 193 spin_lock(&dlm->spinlock); 194 spin_lock(&res->spinlock); 195 } 196 197 if (!list_empty(&res->purge)) { 198 mlog(0, "%s: Removing res %.*s from purgelist, master %d\n", 199 dlm->name, res->lockname.len, res->lockname.name, master); 200 list_del_init(&res->purge); 201 dlm_lockres_put(res); 202 dlm->purge_count--; 203 } 204 205 if (!__dlm_lockres_unused(res)) { 206 mlog(ML_ERROR, "%s: res %.*s in use after deref\n", 207 dlm->name, res->lockname.len, res->lockname.name); 208 __dlm_print_one_lock_resource(res); 209 BUG(); 210 } 211 212 __dlm_unhash_lockres(dlm, res); 213 214 spin_lock(&dlm->track_lock); 215 if (!list_empty(&res->tracking)) 216 list_del_init(&res->tracking); 217 else { 218 mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n", 219 res->lockname.len, res->lockname.name); 220 __dlm_print_one_lock_resource(res); 221 } 222 spin_unlock(&dlm->track_lock); 223 224 /* lockres is not in the hash now. drop the flag and wake up 225 * any processes waiting in dlm_get_lock_resource. */ 226 if (!master) { 227 res->state &= ~DLM_LOCK_RES_DROPPING_REF; 228 spin_unlock(&res->spinlock); 229 wake_up(&res->wq); 230 } else 231 spin_unlock(&res->spinlock); 232 } 233 234 static void dlm_run_purge_list(struct dlm_ctxt *dlm, 235 int purge_now) 236 { 237 unsigned int run_max, unused; 238 unsigned long purge_jiffies; 239 struct dlm_lock_resource *lockres; 240 241 spin_lock(&dlm->spinlock); 242 run_max = dlm->purge_count; 243 244 while(run_max && !list_empty(&dlm->purge_list)) { 245 run_max--; 246 247 lockres = list_entry(dlm->purge_list.next, 248 struct dlm_lock_resource, purge); 249 250 spin_lock(&lockres->spinlock); 251 252 purge_jiffies = lockres->last_used + 253 msecs_to_jiffies(DLM_PURGE_INTERVAL_MS); 254 255 /* Make sure that we want to be processing this guy at 256 * this time. */ 257 if (!purge_now && time_after(purge_jiffies, jiffies)) { 258 /* Since resources are added to the purge list 259 * in tail order, we can stop at the first 260 * unpurgable resource -- anyone added after 261 * him will have a greater last_used value */ 262 spin_unlock(&lockres->spinlock); 263 break; 264 } 265 266 /* Status of the lockres *might* change so double 267 * check. If the lockres is unused, holding the dlm 268 * spinlock will prevent people from getting and more 269 * refs on it. */ 270 unused = __dlm_lockres_unused(lockres); 271 if (!unused || 272 (lockres->state & DLM_LOCK_RES_MIGRATING) || 273 (lockres->inflight_assert_workers != 0)) { 274 mlog(0, "%s: res %.*s is in use or being remastered, " 275 "used %d, state %d, assert master workers %u\n", 276 dlm->name, lockres->lockname.len, 277 lockres->lockname.name, 278 !unused, lockres->state, 279 lockres->inflight_assert_workers); 280 list_move_tail(&lockres->purge, &dlm->purge_list); 281 spin_unlock(&lockres->spinlock); 282 continue; 283 } 284 285 dlm_lockres_get(lockres); 286 287 dlm_purge_lockres(dlm, lockres); 288 289 dlm_lockres_put(lockres); 290 291 /* Avoid adding any scheduling latencies */ 292 cond_resched_lock(&dlm->spinlock); 293 } 294 295 spin_unlock(&dlm->spinlock); 296 } 297 298 static void dlm_shuffle_lists(struct dlm_ctxt *dlm, 299 struct dlm_lock_resource *res) 300 { 301 struct dlm_lock *lock, *target; 302 int can_grant = 1; 303 304 /* 305 * Because this function is called with the lockres 306 * spinlock, and because we know that it is not migrating/ 307 * recovering/in-progress, it is fine to reserve asts and 308 * basts right before queueing them all throughout 309 */ 310 assert_spin_locked(&dlm->ast_lock); 311 assert_spin_locked(&res->spinlock); 312 BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING| 313 DLM_LOCK_RES_RECOVERING| 314 DLM_LOCK_RES_IN_PROGRESS))); 315 316 converting: 317 if (list_empty(&res->converting)) 318 goto blocked; 319 mlog(0, "%s: res %.*s has locks on the convert queue\n", dlm->name, 320 res->lockname.len, res->lockname.name); 321 322 target = list_entry(res->converting.next, struct dlm_lock, list); 323 if (target->ml.convert_type == LKM_IVMODE) { 324 mlog(ML_ERROR, "%s: res %.*s converting lock to invalid mode\n", 325 dlm->name, res->lockname.len, res->lockname.name); 326 BUG(); 327 } 328 list_for_each_entry(lock, &res->granted, list) { 329 if (lock==target) 330 continue; 331 if (!dlm_lock_compatible(lock->ml.type, 332 target->ml.convert_type)) { 333 can_grant = 0; 334 /* queue the BAST if not already */ 335 if (lock->ml.highest_blocked == LKM_IVMODE) { 336 __dlm_lockres_reserve_ast(res); 337 __dlm_queue_bast(dlm, lock); 338 } 339 /* update the highest_blocked if needed */ 340 if (lock->ml.highest_blocked < target->ml.convert_type) 341 lock->ml.highest_blocked = 342 target->ml.convert_type; 343 } 344 } 345 346 list_for_each_entry(lock, &res->converting, list) { 347 if (lock==target) 348 continue; 349 if (!dlm_lock_compatible(lock->ml.type, 350 target->ml.convert_type)) { 351 can_grant = 0; 352 if (lock->ml.highest_blocked == LKM_IVMODE) { 353 __dlm_lockres_reserve_ast(res); 354 __dlm_queue_bast(dlm, lock); 355 } 356 if (lock->ml.highest_blocked < target->ml.convert_type) 357 lock->ml.highest_blocked = 358 target->ml.convert_type; 359 } 360 } 361 362 /* we can convert the lock */ 363 if (can_grant) { 364 spin_lock(&target->spinlock); 365 BUG_ON(target->ml.highest_blocked != LKM_IVMODE); 366 367 mlog(0, "%s: res %.*s, AST for Converting lock %u:%llu, type " 368 "%d => %d, node %u\n", dlm->name, res->lockname.len, 369 res->lockname.name, 370 dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)), 371 dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)), 372 target->ml.type, 373 target->ml.convert_type, target->ml.node); 374 375 target->ml.type = target->ml.convert_type; 376 target->ml.convert_type = LKM_IVMODE; 377 list_move_tail(&target->list, &res->granted); 378 379 BUG_ON(!target->lksb); 380 target->lksb->status = DLM_NORMAL; 381 382 spin_unlock(&target->spinlock); 383 384 __dlm_lockres_reserve_ast(res); 385 __dlm_queue_ast(dlm, target); 386 /* go back and check for more */ 387 goto converting; 388 } 389 390 blocked: 391 if (list_empty(&res->blocked)) 392 goto leave; 393 target = list_entry(res->blocked.next, struct dlm_lock, list); 394 395 list_for_each_entry(lock, &res->granted, list) { 396 if (lock==target) 397 continue; 398 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) { 399 can_grant = 0; 400 if (lock->ml.highest_blocked == LKM_IVMODE) { 401 __dlm_lockres_reserve_ast(res); 402 __dlm_queue_bast(dlm, lock); 403 } 404 if (lock->ml.highest_blocked < target->ml.type) 405 lock->ml.highest_blocked = target->ml.type; 406 } 407 } 408 409 list_for_each_entry(lock, &res->converting, list) { 410 if (lock==target) 411 continue; 412 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) { 413 can_grant = 0; 414 if (lock->ml.highest_blocked == LKM_IVMODE) { 415 __dlm_lockres_reserve_ast(res); 416 __dlm_queue_bast(dlm, lock); 417 } 418 if (lock->ml.highest_blocked < target->ml.type) 419 lock->ml.highest_blocked = target->ml.type; 420 } 421 } 422 423 /* we can grant the blocked lock (only 424 * possible if converting list empty) */ 425 if (can_grant) { 426 spin_lock(&target->spinlock); 427 BUG_ON(target->ml.highest_blocked != LKM_IVMODE); 428 429 mlog(0, "%s: res %.*s, AST for Blocked lock %u:%llu, type %d, " 430 "node %u\n", dlm->name, res->lockname.len, 431 res->lockname.name, 432 dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)), 433 dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)), 434 target->ml.type, target->ml.node); 435 436 /* target->ml.type is already correct */ 437 list_move_tail(&target->list, &res->granted); 438 439 BUG_ON(!target->lksb); 440 target->lksb->status = DLM_NORMAL; 441 442 spin_unlock(&target->spinlock); 443 444 __dlm_lockres_reserve_ast(res); 445 __dlm_queue_ast(dlm, target); 446 /* go back and check for more */ 447 goto converting; 448 } 449 450 leave: 451 return; 452 } 453 454 /* must have NO locks when calling this with res !=NULL * */ 455 void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) 456 { 457 if (res) { 458 spin_lock(&dlm->spinlock); 459 spin_lock(&res->spinlock); 460 __dlm_dirty_lockres(dlm, res); 461 spin_unlock(&res->spinlock); 462 spin_unlock(&dlm->spinlock); 463 } 464 wake_up(&dlm->dlm_thread_wq); 465 } 466 467 void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) 468 { 469 assert_spin_locked(&dlm->spinlock); 470 assert_spin_locked(&res->spinlock); 471 472 /* don't shuffle secondary queues */ 473 if ((res->owner == dlm->node_num)) { 474 if (res->state & (DLM_LOCK_RES_MIGRATING | 475 DLM_LOCK_RES_BLOCK_DIRTY)) 476 return; 477 478 if (list_empty(&res->dirty)) { 479 /* ref for dirty_list */ 480 dlm_lockres_get(res); 481 list_add_tail(&res->dirty, &dlm->dirty_list); 482 res->state |= DLM_LOCK_RES_DIRTY; 483 } 484 } 485 486 mlog(0, "%s: res %.*s\n", dlm->name, res->lockname.len, 487 res->lockname.name); 488 } 489 490 491 /* Launch the NM thread for the mounted volume */ 492 int dlm_launch_thread(struct dlm_ctxt *dlm) 493 { 494 mlog(0, "Starting dlm_thread...\n"); 495 496 dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm-%s", 497 dlm->name); 498 if (IS_ERR(dlm->dlm_thread_task)) { 499 mlog_errno(PTR_ERR(dlm->dlm_thread_task)); 500 dlm->dlm_thread_task = NULL; 501 return -EINVAL; 502 } 503 504 return 0; 505 } 506 507 void dlm_complete_thread(struct dlm_ctxt *dlm) 508 { 509 if (dlm->dlm_thread_task) { 510 mlog(ML_KTHREAD, "Waiting for dlm thread to exit\n"); 511 kthread_stop(dlm->dlm_thread_task); 512 dlm->dlm_thread_task = NULL; 513 } 514 } 515 516 static int dlm_dirty_list_empty(struct dlm_ctxt *dlm) 517 { 518 int empty; 519 520 spin_lock(&dlm->spinlock); 521 empty = list_empty(&dlm->dirty_list); 522 spin_unlock(&dlm->spinlock); 523 524 return empty; 525 } 526 527 static void dlm_flush_asts(struct dlm_ctxt *dlm) 528 { 529 int ret; 530 struct dlm_lock *lock; 531 struct dlm_lock_resource *res; 532 u8 hi; 533 534 spin_lock(&dlm->ast_lock); 535 while (!list_empty(&dlm->pending_asts)) { 536 lock = list_entry(dlm->pending_asts.next, 537 struct dlm_lock, ast_list); 538 /* get an extra ref on lock */ 539 dlm_lock_get(lock); 540 res = lock->lockres; 541 mlog(0, "%s: res %.*s, Flush AST for lock %u:%llu, type %d, " 542 "node %u\n", dlm->name, res->lockname.len, 543 res->lockname.name, 544 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), 545 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), 546 lock->ml.type, lock->ml.node); 547 548 BUG_ON(!lock->ast_pending); 549 550 /* remove from list (including ref) */ 551 list_del_init(&lock->ast_list); 552 dlm_lock_put(lock); 553 spin_unlock(&dlm->ast_lock); 554 555 if (lock->ml.node != dlm->node_num) { 556 ret = dlm_do_remote_ast(dlm, res, lock); 557 if (ret < 0) 558 mlog_errno(ret); 559 } else 560 dlm_do_local_ast(dlm, res, lock); 561 562 spin_lock(&dlm->ast_lock); 563 564 /* possible that another ast was queued while 565 * we were delivering the last one */ 566 if (!list_empty(&lock->ast_list)) { 567 mlog(0, "%s: res %.*s, AST queued while flushing last " 568 "one\n", dlm->name, res->lockname.len, 569 res->lockname.name); 570 } else 571 lock->ast_pending = 0; 572 573 /* drop the extra ref. 574 * this may drop it completely. */ 575 dlm_lock_put(lock); 576 dlm_lockres_release_ast(dlm, res); 577 } 578 579 while (!list_empty(&dlm->pending_basts)) { 580 lock = list_entry(dlm->pending_basts.next, 581 struct dlm_lock, bast_list); 582 /* get an extra ref on lock */ 583 dlm_lock_get(lock); 584 res = lock->lockres; 585 586 BUG_ON(!lock->bast_pending); 587 588 /* get the highest blocked lock, and reset */ 589 spin_lock(&lock->spinlock); 590 BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE); 591 hi = lock->ml.highest_blocked; 592 lock->ml.highest_blocked = LKM_IVMODE; 593 spin_unlock(&lock->spinlock); 594 595 /* remove from list (including ref) */ 596 list_del_init(&lock->bast_list); 597 dlm_lock_put(lock); 598 spin_unlock(&dlm->ast_lock); 599 600 mlog(0, "%s: res %.*s, Flush BAST for lock %u:%llu, " 601 "blocked %d, node %u\n", 602 dlm->name, res->lockname.len, res->lockname.name, 603 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), 604 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), 605 hi, lock->ml.node); 606 607 if (lock->ml.node != dlm->node_num) { 608 ret = dlm_send_proxy_bast(dlm, res, lock, hi); 609 if (ret < 0) 610 mlog_errno(ret); 611 } else 612 dlm_do_local_bast(dlm, res, lock, hi); 613 614 spin_lock(&dlm->ast_lock); 615 616 /* possible that another bast was queued while 617 * we were delivering the last one */ 618 if (!list_empty(&lock->bast_list)) { 619 mlog(0, "%s: res %.*s, BAST queued while flushing last " 620 "one\n", dlm->name, res->lockname.len, 621 res->lockname.name); 622 } else 623 lock->bast_pending = 0; 624 625 /* drop the extra ref. 626 * this may drop it completely. */ 627 dlm_lock_put(lock); 628 dlm_lockres_release_ast(dlm, res); 629 } 630 wake_up(&dlm->ast_wq); 631 spin_unlock(&dlm->ast_lock); 632 } 633 634 635 #define DLM_THREAD_TIMEOUT_MS (4 * 1000) 636 #define DLM_THREAD_MAX_DIRTY 100 637 #define DLM_THREAD_MAX_ASTS 10 638 639 static int dlm_thread(void *data) 640 { 641 struct dlm_lock_resource *res; 642 struct dlm_ctxt *dlm = data; 643 unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS); 644 645 mlog(0, "dlm thread running for %s...\n", dlm->name); 646 647 while (!kthread_should_stop()) { 648 int n = DLM_THREAD_MAX_DIRTY; 649 650 /* dlm_shutting_down is very point-in-time, but that 651 * doesn't matter as we'll just loop back around if we 652 * get false on the leading edge of a state 653 * transition. */ 654 dlm_run_purge_list(dlm, dlm_shutting_down(dlm)); 655 656 /* We really don't want to hold dlm->spinlock while 657 * calling dlm_shuffle_lists on each lockres that 658 * needs to have its queues adjusted and AST/BASTs 659 * run. So let's pull each entry off the dirty_list 660 * and drop dlm->spinlock ASAP. Once off the list, 661 * res->spinlock needs to be taken again to protect 662 * the queues while calling dlm_shuffle_lists. */ 663 spin_lock(&dlm->spinlock); 664 while (!list_empty(&dlm->dirty_list)) { 665 int delay = 0; 666 res = list_entry(dlm->dirty_list.next, 667 struct dlm_lock_resource, dirty); 668 669 /* peel a lockres off, remove it from the list, 670 * unset the dirty flag and drop the dlm lock */ 671 BUG_ON(!res); 672 dlm_lockres_get(res); 673 674 spin_lock(&res->spinlock); 675 /* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */ 676 list_del_init(&res->dirty); 677 spin_unlock(&res->spinlock); 678 spin_unlock(&dlm->spinlock); 679 /* Drop dirty_list ref */ 680 dlm_lockres_put(res); 681 682 /* lockres can be re-dirtied/re-added to the 683 * dirty_list in this gap, but that is ok */ 684 685 spin_lock(&dlm->ast_lock); 686 spin_lock(&res->spinlock); 687 if (res->owner != dlm->node_num) { 688 __dlm_print_one_lock_resource(res); 689 mlog(ML_ERROR, "%s: inprog %d, mig %d, reco %d," 690 " dirty %d\n", dlm->name, 691 !!(res->state & DLM_LOCK_RES_IN_PROGRESS), 692 !!(res->state & DLM_LOCK_RES_MIGRATING), 693 !!(res->state & DLM_LOCK_RES_RECOVERING), 694 !!(res->state & DLM_LOCK_RES_DIRTY)); 695 } 696 BUG_ON(res->owner != dlm->node_num); 697 698 /* it is now ok to move lockreses in these states 699 * to the dirty list, assuming that they will only be 700 * dirty for a short while. */ 701 BUG_ON(res->state & DLM_LOCK_RES_MIGRATING); 702 if (res->state & (DLM_LOCK_RES_IN_PROGRESS | 703 DLM_LOCK_RES_RECOVERING)) { 704 /* move it to the tail and keep going */ 705 res->state &= ~DLM_LOCK_RES_DIRTY; 706 spin_unlock(&res->spinlock); 707 spin_unlock(&dlm->ast_lock); 708 mlog(0, "%s: res %.*s, inprogress, delay list " 709 "shuffle, state %d\n", dlm->name, 710 res->lockname.len, res->lockname.name, 711 res->state); 712 delay = 1; 713 goto in_progress; 714 } 715 716 /* at this point the lockres is not migrating/ 717 * recovering/in-progress. we have the lockres 718 * spinlock and do NOT have the dlm lock. 719 * safe to reserve/queue asts and run the lists. */ 720 721 /* called while holding lockres lock */ 722 dlm_shuffle_lists(dlm, res); 723 res->state &= ~DLM_LOCK_RES_DIRTY; 724 spin_unlock(&res->spinlock); 725 spin_unlock(&dlm->ast_lock); 726 727 dlm_lockres_calc_usage(dlm, res); 728 729 in_progress: 730 731 spin_lock(&dlm->spinlock); 732 /* if the lock was in-progress, stick 733 * it on the back of the list */ 734 if (delay) { 735 spin_lock(&res->spinlock); 736 __dlm_dirty_lockres(dlm, res); 737 spin_unlock(&res->spinlock); 738 } 739 dlm_lockres_put(res); 740 741 /* unlikely, but we may need to give time to 742 * other tasks */ 743 if (!--n) { 744 mlog(0, "%s: Throttling dlm thread\n", 745 dlm->name); 746 break; 747 } 748 } 749 750 spin_unlock(&dlm->spinlock); 751 dlm_flush_asts(dlm); 752 753 /* yield and continue right away if there is more work to do */ 754 if (!n) { 755 cond_resched(); 756 continue; 757 } 758 759 wait_event_interruptible_timeout(dlm->dlm_thread_wq, 760 !dlm_dirty_list_empty(dlm) || 761 kthread_should_stop(), 762 timeout); 763 } 764 765 mlog(0, "quitting DLM thread\n"); 766 return 0; 767 } 768