1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmthread.c 5 * 6 * standalone DLM module 7 * 8 * Copyright (C) 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 * 25 */ 26 27 28 #include <linux/module.h> 29 #include <linux/fs.h> 30 #include <linux/types.h> 31 #include <linux/highmem.h> 32 #include <linux/init.h> 33 #include <linux/sysctl.h> 34 #include <linux/random.h> 35 #include <linux/blkdev.h> 36 #include <linux/socket.h> 37 #include <linux/inet.h> 38 #include <linux/timer.h> 39 #include <linux/kthread.h> 40 #include <linux/delay.h> 41 42 43 #include "cluster/heartbeat.h" 44 #include "cluster/nodemanager.h" 45 #include "cluster/tcp.h" 46 47 #include "dlmapi.h" 48 #include "dlmcommon.h" 49 #include "dlmdomain.h" 50 51 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD) 52 #include "cluster/masklog.h" 53 54 static int dlm_thread(void *data); 55 static void dlm_flush_asts(struct dlm_ctxt *dlm); 56 57 #define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num) 58 59 /* will exit holding res->spinlock, but may drop in function */ 60 /* waits until flags are cleared on res->state */ 61 void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags) 62 { 63 DECLARE_WAITQUEUE(wait, current); 64 65 assert_spin_locked(&res->spinlock); 66 67 add_wait_queue(&res->wq, &wait); 68 repeat: 69 set_current_state(TASK_UNINTERRUPTIBLE); 70 if (res->state & flags) { 71 spin_unlock(&res->spinlock); 72 schedule(); 73 spin_lock(&res->spinlock); 74 goto repeat; 75 } 76 remove_wait_queue(&res->wq, &wait); 77 __set_current_state(TASK_RUNNING); 78 } 79 80 int __dlm_lockres_has_locks(struct dlm_lock_resource *res) 81 { 82 if (list_empty(&res->granted) && 83 list_empty(&res->converting) && 84 list_empty(&res->blocked)) 85 return 0; 86 return 1; 87 } 88 89 /* "unused": the lockres has no locks, is not on the dirty list, 90 * has no inflight locks (in the gap between mastery and acquiring 91 * the first lock), and has no bits in its refmap. 92 * truly ready to be freed. */ 93 int __dlm_lockres_unused(struct dlm_lock_resource *res) 94 { 95 int bit; 96 97 assert_spin_locked(&res->spinlock); 98 99 if (__dlm_lockres_has_locks(res)) 100 return 0; 101 102 /* Locks are in the process of being created */ 103 if (res->inflight_locks) 104 return 0; 105 106 if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY) 107 return 0; 108 109 if (res->state & DLM_LOCK_RES_RECOVERING) 110 return 0; 111 112 /* Another node has this resource with this node as the master */ 113 bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0); 114 if (bit < O2NM_MAX_NODES) 115 return 0; 116 117 return 1; 118 } 119 120 121 /* Call whenever you may have added or deleted something from one of 122 * the lockres queue's. This will figure out whether it belongs on the 123 * unused list or not and does the appropriate thing. */ 124 void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm, 125 struct dlm_lock_resource *res) 126 { 127 assert_spin_locked(&dlm->spinlock); 128 assert_spin_locked(&res->spinlock); 129 130 if (__dlm_lockres_unused(res)){ 131 if (list_empty(&res->purge)) { 132 mlog(0, "%s: Adding res %.*s to purge list\n", 133 dlm->name, res->lockname.len, res->lockname.name); 134 135 res->last_used = jiffies; 136 dlm_lockres_get(res); 137 list_add_tail(&res->purge, &dlm->purge_list); 138 dlm->purge_count++; 139 } 140 } else if (!list_empty(&res->purge)) { 141 mlog(0, "%s: Removing res %.*s from purge list\n", 142 dlm->name, res->lockname.len, res->lockname.name); 143 144 list_del_init(&res->purge); 145 dlm_lockres_put(res); 146 dlm->purge_count--; 147 } 148 } 149 150 void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, 151 struct dlm_lock_resource *res) 152 { 153 spin_lock(&dlm->spinlock); 154 spin_lock(&res->spinlock); 155 156 __dlm_lockres_calc_usage(dlm, res); 157 158 spin_unlock(&res->spinlock); 159 spin_unlock(&dlm->spinlock); 160 } 161 162 static void dlm_purge_lockres(struct dlm_ctxt *dlm, 163 struct dlm_lock_resource *res) 164 { 165 int master; 166 int ret = 0; 167 168 assert_spin_locked(&dlm->spinlock); 169 assert_spin_locked(&res->spinlock); 170 171 master = (res->owner == dlm->node_num); 172 173 mlog(0, "%s: Purging res %.*s, master %d\n", dlm->name, 174 res->lockname.len, res->lockname.name, master); 175 176 if (!master) { 177 res->state |= DLM_LOCK_RES_DROPPING_REF; 178 /* drop spinlock... retake below */ 179 spin_unlock(&res->spinlock); 180 spin_unlock(&dlm->spinlock); 181 182 spin_lock(&res->spinlock); 183 /* This ensures that clear refmap is sent after the set */ 184 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG); 185 spin_unlock(&res->spinlock); 186 187 /* clear our bit from the master's refmap, ignore errors */ 188 ret = dlm_drop_lockres_ref(dlm, res); 189 if (ret < 0) { 190 if (!dlm_is_host_down(ret)) 191 BUG(); 192 } 193 spin_lock(&dlm->spinlock); 194 spin_lock(&res->spinlock); 195 } 196 197 if (!list_empty(&res->purge)) { 198 mlog(0, "%s: Removing res %.*s from purgelist, master %d\n", 199 dlm->name, res->lockname.len, res->lockname.name, master); 200 list_del_init(&res->purge); 201 dlm_lockres_put(res); 202 dlm->purge_count--; 203 } 204 205 if (!__dlm_lockres_unused(res)) { 206 mlog(ML_ERROR, "%s: res %.*s in use after deref\n", 207 dlm->name, res->lockname.len, res->lockname.name); 208 __dlm_print_one_lock_resource(res); 209 BUG(); 210 } 211 212 __dlm_unhash_lockres(dlm, res); 213 214 spin_lock(&dlm->track_lock); 215 if (!list_empty(&res->tracking)) 216 list_del_init(&res->tracking); 217 else { 218 mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n", 219 res->lockname.len, res->lockname.name); 220 __dlm_print_one_lock_resource(res); 221 } 222 spin_unlock(&dlm->track_lock); 223 224 /* lockres is not in the hash now. drop the flag and wake up 225 * any processes waiting in dlm_get_lock_resource. */ 226 if (!master) { 227 res->state &= ~DLM_LOCK_RES_DROPPING_REF; 228 spin_unlock(&res->spinlock); 229 wake_up(&res->wq); 230 } else 231 spin_unlock(&res->spinlock); 232 } 233 234 static void dlm_run_purge_list(struct dlm_ctxt *dlm, 235 int purge_now) 236 { 237 unsigned int run_max, unused; 238 unsigned long purge_jiffies; 239 struct dlm_lock_resource *lockres; 240 241 spin_lock(&dlm->spinlock); 242 run_max = dlm->purge_count; 243 244 while(run_max && !list_empty(&dlm->purge_list)) { 245 run_max--; 246 247 lockres = list_entry(dlm->purge_list.next, 248 struct dlm_lock_resource, purge); 249 250 spin_lock(&lockres->spinlock); 251 252 purge_jiffies = lockres->last_used + 253 msecs_to_jiffies(DLM_PURGE_INTERVAL_MS); 254 255 /* Make sure that we want to be processing this guy at 256 * this time. */ 257 if (!purge_now && time_after(purge_jiffies, jiffies)) { 258 /* Since resources are added to the purge list 259 * in tail order, we can stop at the first 260 * unpurgable resource -- anyone added after 261 * him will have a greater last_used value */ 262 spin_unlock(&lockres->spinlock); 263 break; 264 } 265 266 /* Status of the lockres *might* change so double 267 * check. If the lockres is unused, holding the dlm 268 * spinlock will prevent people from getting and more 269 * refs on it. */ 270 unused = __dlm_lockres_unused(lockres); 271 if (!unused || 272 (lockres->state & DLM_LOCK_RES_MIGRATING) || 273 (lockres->inflight_assert_workers != 0)) { 274 mlog(0, "%s: res %.*s is in use or being remastered, " 275 "used %d, state %d, assert master workers %u\n", 276 dlm->name, lockres->lockname.len, 277 lockres->lockname.name, 278 !unused, lockres->state, 279 lockres->inflight_assert_workers); 280 list_move_tail(&lockres->purge, &dlm->purge_list); 281 spin_unlock(&lockres->spinlock); 282 continue; 283 } 284 285 dlm_lockres_get(lockres); 286 287 dlm_purge_lockres(dlm, lockres); 288 289 dlm_lockres_put(lockres); 290 291 /* Avoid adding any scheduling latencies */ 292 cond_resched_lock(&dlm->spinlock); 293 } 294 295 spin_unlock(&dlm->spinlock); 296 } 297 298 static void dlm_shuffle_lists(struct dlm_ctxt *dlm, 299 struct dlm_lock_resource *res) 300 { 301 struct dlm_lock *lock, *target; 302 int can_grant = 1; 303 304 /* 305 * Because this function is called with the lockres 306 * spinlock, and because we know that it is not migrating/ 307 * recovering/in-progress, it is fine to reserve asts and 308 * basts right before queueing them all throughout 309 */ 310 assert_spin_locked(&dlm->ast_lock); 311 assert_spin_locked(&res->spinlock); 312 BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING| 313 DLM_LOCK_RES_RECOVERING| 314 DLM_LOCK_RES_IN_PROGRESS))); 315 316 converting: 317 if (list_empty(&res->converting)) 318 goto blocked; 319 mlog(0, "%s: res %.*s has locks on the convert queue\n", dlm->name, 320 res->lockname.len, res->lockname.name); 321 322 target = list_entry(res->converting.next, struct dlm_lock, list); 323 if (target->ml.convert_type == LKM_IVMODE) { 324 mlog(ML_ERROR, "%s: res %.*s converting lock to invalid mode\n", 325 dlm->name, res->lockname.len, res->lockname.name); 326 BUG(); 327 } 328 list_for_each_entry(lock, &res->granted, list) { 329 if (lock==target) 330 continue; 331 if (!dlm_lock_compatible(lock->ml.type, 332 target->ml.convert_type)) { 333 can_grant = 0; 334 /* queue the BAST if not already */ 335 if (lock->ml.highest_blocked == LKM_IVMODE) { 336 __dlm_lockres_reserve_ast(res); 337 __dlm_queue_bast(dlm, lock); 338 } 339 /* update the highest_blocked if needed */ 340 if (lock->ml.highest_blocked < target->ml.convert_type) 341 lock->ml.highest_blocked = 342 target->ml.convert_type; 343 } 344 } 345 346 list_for_each_entry(lock, &res->converting, list) { 347 if (lock==target) 348 continue; 349 if (!dlm_lock_compatible(lock->ml.type, 350 target->ml.convert_type)) { 351 can_grant = 0; 352 if (lock->ml.highest_blocked == LKM_IVMODE) { 353 __dlm_lockres_reserve_ast(res); 354 __dlm_queue_bast(dlm, lock); 355 } 356 if (lock->ml.highest_blocked < target->ml.convert_type) 357 lock->ml.highest_blocked = 358 target->ml.convert_type; 359 } 360 } 361 362 /* we can convert the lock */ 363 if (can_grant) { 364 spin_lock(&target->spinlock); 365 BUG_ON(target->ml.highest_blocked != LKM_IVMODE); 366 367 mlog(0, "%s: res %.*s, AST for Converting lock %u:%llu, type " 368 "%d => %d, node %u\n", dlm->name, res->lockname.len, 369 res->lockname.name, 370 dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)), 371 dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)), 372 target->ml.type, 373 target->ml.convert_type, target->ml.node); 374 375 target->ml.type = target->ml.convert_type; 376 target->ml.convert_type = LKM_IVMODE; 377 list_move_tail(&target->list, &res->granted); 378 379 BUG_ON(!target->lksb); 380 target->lksb->status = DLM_NORMAL; 381 382 spin_unlock(&target->spinlock); 383 384 __dlm_lockres_reserve_ast(res); 385 __dlm_queue_ast(dlm, target); 386 /* go back and check for more */ 387 goto converting; 388 } 389 390 blocked: 391 if (list_empty(&res->blocked)) 392 goto leave; 393 target = list_entry(res->blocked.next, struct dlm_lock, list); 394 395 list_for_each_entry(lock, &res->granted, list) { 396 if (lock==target) 397 continue; 398 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) { 399 can_grant = 0; 400 if (lock->ml.highest_blocked == LKM_IVMODE) { 401 __dlm_lockres_reserve_ast(res); 402 __dlm_queue_bast(dlm, lock); 403 } 404 if (lock->ml.highest_blocked < target->ml.type) 405 lock->ml.highest_blocked = target->ml.type; 406 } 407 } 408 409 list_for_each_entry(lock, &res->converting, list) { 410 if (lock==target) 411 continue; 412 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) { 413 can_grant = 0; 414 if (lock->ml.highest_blocked == LKM_IVMODE) { 415 __dlm_lockres_reserve_ast(res); 416 __dlm_queue_bast(dlm, lock); 417 } 418 if (lock->ml.highest_blocked < target->ml.type) 419 lock->ml.highest_blocked = target->ml.type; 420 } 421 } 422 423 /* we can grant the blocked lock (only 424 * possible if converting list empty) */ 425 if (can_grant) { 426 spin_lock(&target->spinlock); 427 BUG_ON(target->ml.highest_blocked != LKM_IVMODE); 428 429 mlog(0, "%s: res %.*s, AST for Blocked lock %u:%llu, type %d, " 430 "node %u\n", dlm->name, res->lockname.len, 431 res->lockname.name, 432 dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)), 433 dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)), 434 target->ml.type, target->ml.node); 435 436 /* target->ml.type is already correct */ 437 list_move_tail(&target->list, &res->granted); 438 439 BUG_ON(!target->lksb); 440 target->lksb->status = DLM_NORMAL; 441 442 spin_unlock(&target->spinlock); 443 444 __dlm_lockres_reserve_ast(res); 445 __dlm_queue_ast(dlm, target); 446 /* go back and check for more */ 447 goto converting; 448 } 449 450 leave: 451 return; 452 } 453 454 /* must have NO locks when calling this with res !=NULL * */ 455 void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) 456 { 457 if (res) { 458 spin_lock(&dlm->spinlock); 459 spin_lock(&res->spinlock); 460 __dlm_dirty_lockres(dlm, res); 461 spin_unlock(&res->spinlock); 462 spin_unlock(&dlm->spinlock); 463 } 464 wake_up(&dlm->dlm_thread_wq); 465 } 466 467 void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) 468 { 469 assert_spin_locked(&dlm->spinlock); 470 assert_spin_locked(&res->spinlock); 471 472 /* don't shuffle secondary queues */ 473 if ((res->owner == dlm->node_num)) { 474 if (res->state & (DLM_LOCK_RES_MIGRATING | 475 DLM_LOCK_RES_BLOCK_DIRTY)) 476 return; 477 478 if (list_empty(&res->dirty)) { 479 /* ref for dirty_list */ 480 dlm_lockres_get(res); 481 list_add_tail(&res->dirty, &dlm->dirty_list); 482 res->state |= DLM_LOCK_RES_DIRTY; 483 } 484 } 485 486 mlog(0, "%s: res %.*s\n", dlm->name, res->lockname.len, 487 res->lockname.name); 488 } 489 490 491 /* Launch the NM thread for the mounted volume */ 492 int dlm_launch_thread(struct dlm_ctxt *dlm) 493 { 494 mlog(0, "Starting dlm_thread...\n"); 495 496 dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm_thread"); 497 if (IS_ERR(dlm->dlm_thread_task)) { 498 mlog_errno(PTR_ERR(dlm->dlm_thread_task)); 499 dlm->dlm_thread_task = NULL; 500 return -EINVAL; 501 } 502 503 return 0; 504 } 505 506 void dlm_complete_thread(struct dlm_ctxt *dlm) 507 { 508 if (dlm->dlm_thread_task) { 509 mlog(ML_KTHREAD, "Waiting for dlm thread to exit\n"); 510 kthread_stop(dlm->dlm_thread_task); 511 dlm->dlm_thread_task = NULL; 512 } 513 } 514 515 static int dlm_dirty_list_empty(struct dlm_ctxt *dlm) 516 { 517 int empty; 518 519 spin_lock(&dlm->spinlock); 520 empty = list_empty(&dlm->dirty_list); 521 spin_unlock(&dlm->spinlock); 522 523 return empty; 524 } 525 526 static void dlm_flush_asts(struct dlm_ctxt *dlm) 527 { 528 int ret; 529 struct dlm_lock *lock; 530 struct dlm_lock_resource *res; 531 u8 hi; 532 533 spin_lock(&dlm->ast_lock); 534 while (!list_empty(&dlm->pending_asts)) { 535 lock = list_entry(dlm->pending_asts.next, 536 struct dlm_lock, ast_list); 537 /* get an extra ref on lock */ 538 dlm_lock_get(lock); 539 res = lock->lockres; 540 mlog(0, "%s: res %.*s, Flush AST for lock %u:%llu, type %d, " 541 "node %u\n", dlm->name, res->lockname.len, 542 res->lockname.name, 543 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), 544 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), 545 lock->ml.type, lock->ml.node); 546 547 BUG_ON(!lock->ast_pending); 548 549 /* remove from list (including ref) */ 550 list_del_init(&lock->ast_list); 551 dlm_lock_put(lock); 552 spin_unlock(&dlm->ast_lock); 553 554 if (lock->ml.node != dlm->node_num) { 555 ret = dlm_do_remote_ast(dlm, res, lock); 556 if (ret < 0) 557 mlog_errno(ret); 558 } else 559 dlm_do_local_ast(dlm, res, lock); 560 561 spin_lock(&dlm->ast_lock); 562 563 /* possible that another ast was queued while 564 * we were delivering the last one */ 565 if (!list_empty(&lock->ast_list)) { 566 mlog(0, "%s: res %.*s, AST queued while flushing last " 567 "one\n", dlm->name, res->lockname.len, 568 res->lockname.name); 569 } else 570 lock->ast_pending = 0; 571 572 /* drop the extra ref. 573 * this may drop it completely. */ 574 dlm_lock_put(lock); 575 dlm_lockres_release_ast(dlm, res); 576 } 577 578 while (!list_empty(&dlm->pending_basts)) { 579 lock = list_entry(dlm->pending_basts.next, 580 struct dlm_lock, bast_list); 581 /* get an extra ref on lock */ 582 dlm_lock_get(lock); 583 res = lock->lockres; 584 585 BUG_ON(!lock->bast_pending); 586 587 /* get the highest blocked lock, and reset */ 588 spin_lock(&lock->spinlock); 589 BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE); 590 hi = lock->ml.highest_blocked; 591 lock->ml.highest_blocked = LKM_IVMODE; 592 spin_unlock(&lock->spinlock); 593 594 /* remove from list (including ref) */ 595 list_del_init(&lock->bast_list); 596 dlm_lock_put(lock); 597 spin_unlock(&dlm->ast_lock); 598 599 mlog(0, "%s: res %.*s, Flush BAST for lock %u:%llu, " 600 "blocked %d, node %u\n", 601 dlm->name, res->lockname.len, res->lockname.name, 602 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), 603 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), 604 hi, lock->ml.node); 605 606 if (lock->ml.node != dlm->node_num) { 607 ret = dlm_send_proxy_bast(dlm, res, lock, hi); 608 if (ret < 0) 609 mlog_errno(ret); 610 } else 611 dlm_do_local_bast(dlm, res, lock, hi); 612 613 spin_lock(&dlm->ast_lock); 614 615 /* possible that another bast was queued while 616 * we were delivering the last one */ 617 if (!list_empty(&lock->bast_list)) { 618 mlog(0, "%s: res %.*s, BAST queued while flushing last " 619 "one\n", dlm->name, res->lockname.len, 620 res->lockname.name); 621 } else 622 lock->bast_pending = 0; 623 624 /* drop the extra ref. 625 * this may drop it completely. */ 626 dlm_lock_put(lock); 627 dlm_lockres_release_ast(dlm, res); 628 } 629 wake_up(&dlm->ast_wq); 630 spin_unlock(&dlm->ast_lock); 631 } 632 633 634 #define DLM_THREAD_TIMEOUT_MS (4 * 1000) 635 #define DLM_THREAD_MAX_DIRTY 100 636 #define DLM_THREAD_MAX_ASTS 10 637 638 static int dlm_thread(void *data) 639 { 640 struct dlm_lock_resource *res; 641 struct dlm_ctxt *dlm = data; 642 unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS); 643 644 mlog(0, "dlm thread running for %s...\n", dlm->name); 645 646 while (!kthread_should_stop()) { 647 int n = DLM_THREAD_MAX_DIRTY; 648 649 /* dlm_shutting_down is very point-in-time, but that 650 * doesn't matter as we'll just loop back around if we 651 * get false on the leading edge of a state 652 * transition. */ 653 dlm_run_purge_list(dlm, dlm_shutting_down(dlm)); 654 655 /* We really don't want to hold dlm->spinlock while 656 * calling dlm_shuffle_lists on each lockres that 657 * needs to have its queues adjusted and AST/BASTs 658 * run. So let's pull each entry off the dirty_list 659 * and drop dlm->spinlock ASAP. Once off the list, 660 * res->spinlock needs to be taken again to protect 661 * the queues while calling dlm_shuffle_lists. */ 662 spin_lock(&dlm->spinlock); 663 while (!list_empty(&dlm->dirty_list)) { 664 int delay = 0; 665 res = list_entry(dlm->dirty_list.next, 666 struct dlm_lock_resource, dirty); 667 668 /* peel a lockres off, remove it from the list, 669 * unset the dirty flag and drop the dlm lock */ 670 BUG_ON(!res); 671 dlm_lockres_get(res); 672 673 spin_lock(&res->spinlock); 674 /* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */ 675 list_del_init(&res->dirty); 676 spin_unlock(&res->spinlock); 677 spin_unlock(&dlm->spinlock); 678 /* Drop dirty_list ref */ 679 dlm_lockres_put(res); 680 681 /* lockres can be re-dirtied/re-added to the 682 * dirty_list in this gap, but that is ok */ 683 684 spin_lock(&dlm->ast_lock); 685 spin_lock(&res->spinlock); 686 if (res->owner != dlm->node_num) { 687 __dlm_print_one_lock_resource(res); 688 mlog(ML_ERROR, "%s: inprog %d, mig %d, reco %d," 689 " dirty %d\n", dlm->name, 690 !!(res->state & DLM_LOCK_RES_IN_PROGRESS), 691 !!(res->state & DLM_LOCK_RES_MIGRATING), 692 !!(res->state & DLM_LOCK_RES_RECOVERING), 693 !!(res->state & DLM_LOCK_RES_DIRTY)); 694 } 695 BUG_ON(res->owner != dlm->node_num); 696 697 /* it is now ok to move lockreses in these states 698 * to the dirty list, assuming that they will only be 699 * dirty for a short while. */ 700 BUG_ON(res->state & DLM_LOCK_RES_MIGRATING); 701 if (res->state & (DLM_LOCK_RES_IN_PROGRESS | 702 DLM_LOCK_RES_RECOVERING)) { 703 /* move it to the tail and keep going */ 704 res->state &= ~DLM_LOCK_RES_DIRTY; 705 spin_unlock(&res->spinlock); 706 spin_unlock(&dlm->ast_lock); 707 mlog(0, "%s: res %.*s, inprogress, delay list " 708 "shuffle, state %d\n", dlm->name, 709 res->lockname.len, res->lockname.name, 710 res->state); 711 delay = 1; 712 goto in_progress; 713 } 714 715 /* at this point the lockres is not migrating/ 716 * recovering/in-progress. we have the lockres 717 * spinlock and do NOT have the dlm lock. 718 * safe to reserve/queue asts and run the lists. */ 719 720 /* called while holding lockres lock */ 721 dlm_shuffle_lists(dlm, res); 722 res->state &= ~DLM_LOCK_RES_DIRTY; 723 spin_unlock(&res->spinlock); 724 spin_unlock(&dlm->ast_lock); 725 726 dlm_lockres_calc_usage(dlm, res); 727 728 in_progress: 729 730 spin_lock(&dlm->spinlock); 731 /* if the lock was in-progress, stick 732 * it on the back of the list */ 733 if (delay) { 734 spin_lock(&res->spinlock); 735 __dlm_dirty_lockres(dlm, res); 736 spin_unlock(&res->spinlock); 737 } 738 dlm_lockres_put(res); 739 740 /* unlikely, but we may need to give time to 741 * other tasks */ 742 if (!--n) { 743 mlog(0, "%s: Throttling dlm thread\n", 744 dlm->name); 745 break; 746 } 747 } 748 749 spin_unlock(&dlm->spinlock); 750 dlm_flush_asts(dlm); 751 752 /* yield and continue right away if there is more work to do */ 753 if (!n) { 754 cond_resched(); 755 continue; 756 } 757 758 wait_event_interruptible_timeout(dlm->dlm_thread_wq, 759 !dlm_dirty_list_empty(dlm) || 760 kthread_should_stop(), 761 timeout); 762 } 763 764 mlog(0, "quitting DLM thread\n"); 765 return 0; 766 } 767