/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmthread.c
 *
 * standalone DLM module
 *
 * Copyright (C) 2004 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 *
 */

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/highmem.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/timer.h>
#include <linux/kthread.h>
#include <linux/delay.h>

#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdomain.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD)
#include "cluster/masklog.h"

static int dlm_thread(void *data);
static void dlm_flush_asts(struct dlm_ctxt *dlm);

#define dlm_lock_is_remote(dlm, lock)	((lock)->ml.node != (dlm)->node_num)

/* will exit holding res->spinlock, but may drop in function */
/* waits until flags are cleared on res->state */
void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags)
{
	DECLARE_WAITQUEUE(wait, current);

	assert_spin_locked(&res->spinlock);

	add_wait_queue(&res->wq, &wait);
repeat:
	set_current_state(TASK_UNINTERRUPTIBLE);
	if (res->state & flags) {
		spin_unlock(&res->spinlock);
		schedule();
		spin_lock(&res->spinlock);
		goto repeat;
	}
	remove_wait_queue(&res->wq, &wait);
	__set_current_state(TASK_RUNNING);
}

int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
{
	if (list_empty(&res->granted) &&
	    list_empty(&res->converting) &&
	    list_empty(&res->blocked))
		return 0;
	return 1;
}

/* "unused": the lockres has no locks, is not on the dirty list,
 * has no inflight locks (in the gap between mastery and acquiring
 * the first lock), and has no bits in its refmap.
 * truly ready to be freed. */
int __dlm_lockres_unused(struct dlm_lock_resource *res)
{
	int bit;

	assert_spin_locked(&res->spinlock);

	if (__dlm_lockres_has_locks(res))
		return 0;

	/* Locks are in the process of being created */
	if (res->inflight_locks)
		return 0;

	if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY)
		return 0;

	if (res->state & (DLM_LOCK_RES_RECOVERING|
			  DLM_LOCK_RES_RECOVERY_WAITING))
		return 0;

	/* Another node has this resource with this node as the master */
	bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
	if (bit < O2NM_MAX_NODES)
		return 0;

	return 1;
}

/* Call whenever you may have added or deleted something from one of
 * the lockres queues. This will figure out whether it belongs on the
 * unused list or not and does the appropriate thing. */
void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
			      struct dlm_lock_resource *res)
{
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	if (__dlm_lockres_unused(res)) {
		if (list_empty(&res->purge)) {
			mlog(0, "%s: Adding res %.*s to purge list\n",
			     dlm->name, res->lockname.len, res->lockname.name);

			res->last_used = jiffies;
			dlm_lockres_get(res);
			list_add_tail(&res->purge, &dlm->purge_list);
			dlm->purge_count++;
		}
	} else if (!list_empty(&res->purge)) {
		mlog(0, "%s: Removing res %.*s from purge list\n",
		     dlm->name, res->lockname.len, res->lockname.name);

		list_del_init(&res->purge);
		dlm_lockres_put(res);
		dlm->purge_count--;
	}
}

void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
			    struct dlm_lock_resource *res)
{
	spin_lock(&dlm->spinlock);
	spin_lock(&res->spinlock);

	__dlm_lockres_calc_usage(dlm, res);

	spin_unlock(&res->spinlock);
	spin_unlock(&dlm->spinlock);
}

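/* Unhash and drop an unused lockres. If this node is not the master,
 * our bit in the master's refmap is cleared first (via
 * dlm_drop_lockres_ref) so the master can free its own copy. Called
 * from dlm_run_purge_list with dlm->spinlock and res->spinlock held;
 * both locks may be dropped and retaken inside. */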
static void dlm_purge_lockres(struct dlm_ctxt *dlm,
			      struct dlm_lock_resource *res)
{
	int master;
	int ret = 0;

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	master = (res->owner == dlm->node_num);

	mlog(0, "%s: Purging res %.*s, master %d\n", dlm->name,
	     res->lockname.len, res->lockname.name, master);

	if (!master) {
		res->state |= DLM_LOCK_RES_DROPPING_REF;
		/* drop spinlock... retake below */
		spin_unlock(&res->spinlock);
		spin_unlock(&dlm->spinlock);

		spin_lock(&res->spinlock);
		/* This ensures that clear refmap is sent after the set */
		__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
		spin_unlock(&res->spinlock);

		/* clear our bit from the master's refmap, ignore errors */
		ret = dlm_drop_lockres_ref(dlm, res);
		if (ret < 0) {
			if (!dlm_is_host_down(ret))
				BUG();
		}
		spin_lock(&dlm->spinlock);
		spin_lock(&res->spinlock);
	}

	if (!list_empty(&res->purge)) {
		mlog(0, "%s: Removing res %.*s from purgelist, master %d\n",
		     dlm->name, res->lockname.len, res->lockname.name, master);
		list_del_init(&res->purge);
		dlm_lockres_put(res);
		dlm->purge_count--;
	}

	if (!master && ret != 0) {
		mlog(0, "%s: deref %.*s in progress or master goes down\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		spin_unlock(&res->spinlock);
		return;
	}

	if (!__dlm_lockres_unused(res)) {
		mlog(ML_ERROR, "%s: res %.*s in use after deref\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		__dlm_print_one_lock_resource(res);
		BUG();
	}

	__dlm_unhash_lockres(dlm, res);

	spin_lock(&dlm->track_lock);
	if (!list_empty(&res->tracking))
		list_del_init(&res->tracking);
	else {
		mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n",
		     res->lockname.len, res->lockname.name);
		__dlm_print_one_lock_resource(res);
	}
	spin_unlock(&dlm->track_lock);

	/* lockres is not in the hash now. drop the flag and wake up
	 * any processes waiting in dlm_get_lock_resource. */
	if (!master) {
		res->state &= ~DLM_LOCK_RES_DROPPING_REF;
		spin_unlock(&res->spinlock);
		wake_up(&res->wq);
	} else
		spin_unlock(&res->spinlock);
}

static void dlm_run_purge_list(struct dlm_ctxt *dlm,
			       int purge_now)
{
	unsigned int run_max, unused;
	unsigned long purge_jiffies;
	struct dlm_lock_resource *lockres;

	spin_lock(&dlm->spinlock);
	run_max = dlm->purge_count;

	while (run_max && !list_empty(&dlm->purge_list)) {
		run_max--;

		lockres = list_entry(dlm->purge_list.next,
				     struct dlm_lock_resource, purge);

		spin_lock(&lockres->spinlock);

		purge_jiffies = lockres->last_used +
			msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);

		/* Make sure that we want to be processing this guy at
		 * this time. */
		if (!purge_now && time_after(purge_jiffies, jiffies)) {
			/* Since resources are added to the purge list
			 * in tail order, we can stop at the first
			 * unpurgeable resource -- anyone added after
			 * him will have a greater last_used value */
			spin_unlock(&lockres->spinlock);
			break;
		}

		/* Status of the lockres *might* change so double
		 * check. If the lockres is unused, holding the dlm
		 * spinlock will prevent people from getting any more
		 * refs on it. */
		unused = __dlm_lockres_unused(lockres);
		if (!unused ||
		    (lockres->state & DLM_LOCK_RES_MIGRATING) ||
		    (lockres->inflight_assert_workers != 0)) {
			mlog(0, "%s: res %.*s is in use or being remastered, "
			     "used %d, state %d, assert master workers %u\n",
			     dlm->name, lockres->lockname.len,
			     lockres->lockname.name,
			     !unused, lockres->state,
			     lockres->inflight_assert_workers);
			list_move_tail(&lockres->purge, &dlm->purge_list);
			spin_unlock(&lockres->spinlock);
			continue;
		}

		dlm_lockres_get(lockres);

		dlm_purge_lockres(dlm, lockres);

		dlm_lockres_put(lockres);

		/* Avoid adding any scheduling latencies */
		cond_resched_lock(&dlm->spinlock);
	}

	spin_unlock(&dlm->spinlock);
}

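/* Walk the converting and blocked queues of a locally mastered
 * lockres. A queue head that is compatible with everything currently
 * granted (and, for the blocked queue, everything converting) is
 * moved to the granted queue and an AST is queued for it; otherwise
 * BASTs are queued on the locks standing in its way. Repeats until
 * the head of a queue cannot be granted. */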
static void dlm_shuffle_lists(struct dlm_ctxt *dlm,
			      struct dlm_lock_resource *res)
{
	struct dlm_lock *lock, *target;
	int can_grant = 1;

	/*
	 * Because this function is called with the lockres
	 * spinlock, and because we know that it is not migrating/
	 * recovering/in-progress, it is fine to reserve asts and
	 * basts right before queueing them all throughout
	 */
	assert_spin_locked(&dlm->ast_lock);
	assert_spin_locked(&res->spinlock);
	BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING|
			      DLM_LOCK_RES_RECOVERING|
			      DLM_LOCK_RES_IN_PROGRESS)));

converting:
	if (list_empty(&res->converting))
		goto blocked;
	mlog(0, "%s: res %.*s has locks on the convert queue\n", dlm->name,
	     res->lockname.len, res->lockname.name);

	target = list_entry(res->converting.next, struct dlm_lock, list);
	if (target->ml.convert_type == LKM_IVMODE) {
		mlog(ML_ERROR, "%s: res %.*s converting lock to invalid mode\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		BUG();
	}
	list_for_each_entry(lock, &res->granted, list) {
		if (lock == target)
			continue;
		if (!dlm_lock_compatible(lock->ml.type,
					 target->ml.convert_type)) {
			can_grant = 0;
			/* queue the BAST if not already */
			if (lock->ml.highest_blocked == LKM_IVMODE) {
				__dlm_lockres_reserve_ast(res);
				__dlm_queue_bast(dlm, lock);
			}
			/* update the highest_blocked if needed */
			if (lock->ml.highest_blocked < target->ml.convert_type)
				lock->ml.highest_blocked =
					target->ml.convert_type;
		}
	}

	list_for_each_entry(lock, &res->converting, list) {
		if (lock == target)
			continue;
		if (!dlm_lock_compatible(lock->ml.type,
					 target->ml.convert_type)) {
			can_grant = 0;
			if (lock->ml.highest_blocked == LKM_IVMODE) {
				__dlm_lockres_reserve_ast(res);
				__dlm_queue_bast(dlm, lock);
			}
			if (lock->ml.highest_blocked < target->ml.convert_type)
				lock->ml.highest_blocked =
					target->ml.convert_type;
		}
	}

	/* we can convert the lock */
	if (can_grant) {
		spin_lock(&target->spinlock);
		BUG_ON(target->ml.highest_blocked != LKM_IVMODE);

		mlog(0, "%s: res %.*s, AST for Converting lock %u:%llu, type "
		     "%d => %d, node %u\n", dlm->name, res->lockname.len,
		     res->lockname.name,
		     dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)),
		     target->ml.type,
		     target->ml.convert_type, target->ml.node);

		target->ml.type = target->ml.convert_type;
		target->ml.convert_type = LKM_IVMODE;
		list_move_tail(&target->list, &res->granted);

		BUG_ON(!target->lksb);
		target->lksb->status = DLM_NORMAL;

		spin_unlock(&target->spinlock);

		__dlm_lockres_reserve_ast(res);
		__dlm_queue_ast(dlm, target);
		/* go back and check for more */
		goto converting;
	}

blocked:
	if (list_empty(&res->blocked))
		goto leave;
	target = list_entry(res->blocked.next, struct dlm_lock, list);

	list_for_each_entry(lock, &res->granted, list) {
		if (lock == target)
			continue;
		if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
			can_grant = 0;
			if (lock->ml.highest_blocked == LKM_IVMODE) {
				__dlm_lockres_reserve_ast(res);
				__dlm_queue_bast(dlm, lock);
			}
			if (lock->ml.highest_blocked < target->ml.type)
				lock->ml.highest_blocked = target->ml.type;
		}
	}

	list_for_each_entry(lock, &res->converting, list) {
		if (lock == target)
			continue;
		if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
			can_grant = 0;
			if (lock->ml.highest_blocked == LKM_IVMODE) {
				__dlm_lockres_reserve_ast(res);
				__dlm_queue_bast(dlm, lock);
			}
			if (lock->ml.highest_blocked < target->ml.type)
				lock->ml.highest_blocked = target->ml.type;
		}
	}

	/* we can grant the blocked lock (only
	 * possible if converting list empty) */
	if (can_grant) {
		spin_lock(&target->spinlock);
		BUG_ON(target->ml.highest_blocked != LKM_IVMODE);

		mlog(0, "%s: res %.*s, AST for Blocked lock %u:%llu, type %d, "
		     "node %u\n", dlm->name, res->lockname.len,
		     res->lockname.name,
		     dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)),
		     target->ml.type, target->ml.node);

		/* target->ml.type is already correct */
		list_move_tail(&target->list, &res->granted);

		BUG_ON(!target->lksb);
		target->lksb->status = DLM_NORMAL;

		spin_unlock(&target->spinlock);

		__dlm_lockres_reserve_ast(res);
		__dlm_queue_ast(dlm, target);
		/* go back and check for more */
		goto converting;
	}

leave:
	return;
}

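/* Mark a lockres dirty (if one is given) and wake the dlm thread so
 * it can shuffle the lock queues and deliver any pending ASTs/BASTs. */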
/* must have NO locks when calling this with res != NULL */
void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
	if (res) {
		spin_lock(&dlm->spinlock);
		spin_lock(&res->spinlock);
		__dlm_dirty_lockres(dlm, res);
		spin_unlock(&res->spinlock);
		spin_unlock(&dlm->spinlock);
	}
	wake_up(&dlm->dlm_thread_wq);
}

void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	/* don't shuffle secondary queues */
	if (res->owner == dlm->node_num) {
		if (res->state & (DLM_LOCK_RES_MIGRATING |
				  DLM_LOCK_RES_BLOCK_DIRTY))
			return;

		if (list_empty(&res->dirty)) {
			/* ref for dirty_list */
			dlm_lockres_get(res);
			list_add_tail(&res->dirty, &dlm->dirty_list);
			res->state |= DLM_LOCK_RES_DIRTY;
		}
	}

	mlog(0, "%s: res %.*s\n", dlm->name, res->lockname.len,
	     res->lockname.name);
}

/* Launch the dlm thread for the mounted volume */
int dlm_launch_thread(struct dlm_ctxt *dlm)
{
	mlog(0, "Starting dlm_thread...\n");

	dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm-%s",
					   dlm->name);
	if (IS_ERR(dlm->dlm_thread_task)) {
		mlog_errno(PTR_ERR(dlm->dlm_thread_task));
		dlm->dlm_thread_task = NULL;
		return -EINVAL;
	}

	return 0;
}

void dlm_complete_thread(struct dlm_ctxt *dlm)
{
	if (dlm->dlm_thread_task) {
		mlog(ML_KTHREAD, "Waiting for dlm thread to exit\n");
		kthread_stop(dlm->dlm_thread_task);
		dlm->dlm_thread_task = NULL;
	}
}

static int dlm_dirty_list_empty(struct dlm_ctxt *dlm)
{
	int empty;

	spin_lock(&dlm->spinlock);
	empty = list_empty(&dlm->dirty_list);
	spin_unlock(&dlm->spinlock);

	return empty;
}

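/* Deliver every pending AST and BAST. Each lock is pulled off the
 * relevant pending list with dlm->ast_lock held, then the AST/BAST
 * is delivered (locally or to the remote node) with that lock
 * dropped, so new work may be queued behind us while we run. */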
static void dlm_flush_asts(struct dlm_ctxt *dlm)
{
	int ret;
	struct dlm_lock *lock;
	struct dlm_lock_resource *res;
	u8 hi;

	spin_lock(&dlm->ast_lock);
	while (!list_empty(&dlm->pending_asts)) {
		lock = list_entry(dlm->pending_asts.next,
				  struct dlm_lock, ast_list);
		/* get an extra ref on lock */
		dlm_lock_get(lock);
		res = lock->lockres;
		mlog(0, "%s: res %.*s, Flush AST for lock %u:%llu, type %d, "
		     "node %u\n", dlm->name, res->lockname.len,
		     res->lockname.name,
		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
		     lock->ml.type, lock->ml.node);

		BUG_ON(!lock->ast_pending);

		/* remove from list (including ref) */
		list_del_init(&lock->ast_list);
		dlm_lock_put(lock);
		spin_unlock(&dlm->ast_lock);

		if (lock->ml.node != dlm->node_num) {
			ret = dlm_do_remote_ast(dlm, res, lock);
			if (ret < 0)
				mlog_errno(ret);
		} else
			dlm_do_local_ast(dlm, res, lock);

		spin_lock(&dlm->ast_lock);

		/* possible that another ast was queued while
		 * we were delivering the last one */
		if (!list_empty(&lock->ast_list)) {
			mlog(0, "%s: res %.*s, AST queued while flushing last "
			     "one\n", dlm->name, res->lockname.len,
			     res->lockname.name);
		} else
			lock->ast_pending = 0;

		/* drop the extra ref.
		 * this may drop it completely. */
		dlm_lock_put(lock);
		dlm_lockres_release_ast(dlm, res);
	}

	while (!list_empty(&dlm->pending_basts)) {
		lock = list_entry(dlm->pending_basts.next,
				  struct dlm_lock, bast_list);
		/* get an extra ref on lock */
		dlm_lock_get(lock);
		res = lock->lockres;

		BUG_ON(!lock->bast_pending);

		/* get the highest blocked lock, and reset */
		spin_lock(&lock->spinlock);
		BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE);
		hi = lock->ml.highest_blocked;
		lock->ml.highest_blocked = LKM_IVMODE;
		spin_unlock(&lock->spinlock);

		/* remove from list (including ref) */
		list_del_init(&lock->bast_list);
		dlm_lock_put(lock);
		spin_unlock(&dlm->ast_lock);

		mlog(0, "%s: res %.*s, Flush BAST for lock %u:%llu, "
		     "blocked %d, node %u\n",
		     dlm->name, res->lockname.len, res->lockname.name,
		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
		     hi, lock->ml.node);

		if (lock->ml.node != dlm->node_num) {
			ret = dlm_send_proxy_bast(dlm, res, lock, hi);
			if (ret < 0)
				mlog_errno(ret);
		} else
			dlm_do_local_bast(dlm, res, lock, hi);

		spin_lock(&dlm->ast_lock);

		/* possible that another bast was queued while
		 * we were delivering the last one */
		if (!list_empty(&lock->bast_list)) {
			mlog(0, "%s: res %.*s, BAST queued while flushing last "
			     "one\n", dlm->name, res->lockname.len,
			     res->lockname.name);
		} else
			lock->bast_pending = 0;

		/* drop the extra ref.
		 * this may drop it completely. */
		dlm_lock_put(lock);
		dlm_lockres_release_ast(dlm, res);
	}
	wake_up(&dlm->ast_wq);
	spin_unlock(&dlm->ast_lock);
}

#define DLM_THREAD_TIMEOUT_MS (4 * 1000)
#define DLM_THREAD_MAX_DIRTY  100
#define DLM_THREAD_MAX_ASTS   10

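/* Main loop of the per-domain dlm thread: run the purge list, pull
 * each dirty lockres off dirty_list and shuffle its queues, flush the
 * pending ASTs/BASTs, then sleep until new work arrives or the
 * timeout expires. At most DLM_THREAD_MAX_DIRTY resources are handled
 * per pass before yielding. */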
static int dlm_thread(void *data)
{
	struct dlm_lock_resource *res;
	struct dlm_ctxt *dlm = data;
	unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS);

	mlog(0, "dlm thread running for %s...\n", dlm->name);

	while (!kthread_should_stop()) {
		int n = DLM_THREAD_MAX_DIRTY;

		/* dlm_shutting_down is very point-in-time, but that
		 * doesn't matter as we'll just loop back around if we
		 * get false on the leading edge of a state
		 * transition. */
		dlm_run_purge_list(dlm, dlm_shutting_down(dlm));

		/* We really don't want to hold dlm->spinlock while
		 * calling dlm_shuffle_lists on each lockres that
		 * needs to have its queues adjusted and AST/BASTs
		 * run. So let's pull each entry off the dirty_list
		 * and drop dlm->spinlock ASAP. Once off the list,
		 * res->spinlock needs to be taken again to protect
		 * the queues while calling dlm_shuffle_lists. */
		spin_lock(&dlm->spinlock);
		while (!list_empty(&dlm->dirty_list)) {
			int delay = 0;
			res = list_entry(dlm->dirty_list.next,
					 struct dlm_lock_resource, dirty);

			/* peel a lockres off, remove it from the list,
			 * unset the dirty flag and drop the dlm lock */
			BUG_ON(!res);
			dlm_lockres_get(res);

			spin_lock(&res->spinlock);
			/* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */
			list_del_init(&res->dirty);
			spin_unlock(&res->spinlock);
			spin_unlock(&dlm->spinlock);
			/* Drop dirty_list ref */
			dlm_lockres_put(res);

			/* lockres can be re-dirtied/re-added to the
			 * dirty_list in this gap, but that is ok */

			spin_lock(&dlm->ast_lock);
			spin_lock(&res->spinlock);
			if (res->owner != dlm->node_num) {
				__dlm_print_one_lock_resource(res);
				mlog(ML_ERROR, "%s: inprog %d, mig %d, reco %d,"
				     " dirty %d\n", dlm->name,
				     !!(res->state & DLM_LOCK_RES_IN_PROGRESS),
				     !!(res->state & DLM_LOCK_RES_MIGRATING),
				     !!(res->state & DLM_LOCK_RES_RECOVERING),
				     !!(res->state & DLM_LOCK_RES_DIRTY));
			}
			BUG_ON(res->owner != dlm->node_num);

			/* it is now ok to move lockreses in these states
			 * to the dirty list, assuming that they will only be
			 * dirty for a short while. */
			BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
			if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
					  DLM_LOCK_RES_RECOVERING |
					  DLM_LOCK_RES_RECOVERY_WAITING)) {
				/* move it to the tail and keep going */
				res->state &= ~DLM_LOCK_RES_DIRTY;
				spin_unlock(&res->spinlock);
				spin_unlock(&dlm->ast_lock);
				mlog(0, "%s: res %.*s, inprogress, delay list "
				     "shuffle, state %d\n", dlm->name,
				     res->lockname.len, res->lockname.name,
				     res->state);
				delay = 1;
				goto in_progress;
			}

			/* at this point the lockres is not migrating/
			 * recovering/in-progress. we have the lockres
			 * spinlock and do NOT have the dlm lock.
			 * safe to reserve/queue asts and run the lists. */

			/* called while holding lockres lock */
			dlm_shuffle_lists(dlm, res);
			res->state &= ~DLM_LOCK_RES_DIRTY;
			spin_unlock(&res->spinlock);
			spin_unlock(&dlm->ast_lock);

			dlm_lockres_calc_usage(dlm, res);

in_progress:

			spin_lock(&dlm->spinlock);
			/* if the lock was in-progress, stick
			 * it on the back of the list */
			if (delay) {
				spin_lock(&res->spinlock);
				__dlm_dirty_lockres(dlm, res);
				spin_unlock(&res->spinlock);
			}
			dlm_lockres_put(res);

			/* unlikely, but we may need to give time to
			 * other tasks */
			if (!--n) {
				mlog(0, "%s: Throttling dlm thread\n",
				     dlm->name);
				break;
			}
		}

		spin_unlock(&dlm->spinlock);
		dlm_flush_asts(dlm);

		/* yield and continue right away if there is more work to do */
		if (!n) {
			cond_resched();
			continue;
		}

		wait_event_interruptible_timeout(dlm->dlm_thread_wq,
						 !dlm_dirty_list_empty(dlm) ||
						 kthread_should_stop(),
						 timeout);
	}

	mlog(0, "quitting DLM thread\n");
	return 0;
}