/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmglue.c
 *
 * Code which implements an OCFS2 specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"
#include "ocfs2_lockingver.h"

#include "alloc.h"
#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "stackglue.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"

#include "buffer_head_io.h"

struct ocfs2_mask_waiter {
        struct list_head        mw_item;
        int                     mw_status;
        struct completion       mw_complete;
        unsigned long           mw_mask;
        unsigned long           mw_goal;
};

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);

/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock()
 */
enum ocfs2_unblock_action {
        UNBLOCK_CONTINUE        = 0, /* Continue downconvert */
        UNBLOCK_CONTINUE_POST   = 1, /* Continue downconvert, fire
                                      * ->post_unlock callback */
        UNBLOCK_STOP_POST       = 2, /* Do not downconvert, fire
                                      * ->post_unlock() callback. */
};

struct ocfs2_unblock_ctl {
        int requeue;
        enum ocfs2_unblock_action unblock_action;
};

static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
                                        int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
                                     int blocking);

static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
                                       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
                                     struct ocfs2_lock_res *lockres);


#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)

/* This aids in debugging situations where a bad LVB might be involved.
 */
static void ocfs2_dump_meta_lvb_info(u64 level,
                                     const char *function,
                                     unsigned int line,
                                     struct ocfs2_lock_res *lockres)
{
        struct ocfs2_meta_lvb *lvb =
                (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);

        mlog(level, "LVB information for %s (called from %s:%u):\n",
             lockres->l_name, function, line);
        mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
             lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
             be32_to_cpu(lvb->lvb_igeneration));
        mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
             (unsigned long long)be64_to_cpu(lvb->lvb_isize),
             be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
             be16_to_cpu(lvb->lvb_imode));
        mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
             "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
             (long long)be64_to_cpu(lvb->lvb_iatime_packed),
             (long long)be64_to_cpu(lvb->lvb_ictime_packed),
             (long long)be64_to_cpu(lvb->lvb_imtime_packed),
             be32_to_cpu(lvb->lvb_iattr));
}


/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 *
 * The most basic of lock types can point ->l_priv to their respective
 * struct ocfs2_super and allow the default actions to manage things.
 *
 * Right now, each lock type also needs to implement an init function,
 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
 * should be called when the lock is no longer needed (i.e., object
 * destruction time).
 */
struct ocfs2_lock_res_ops {
        /*
         * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
         * this callback if ->l_priv is not an ocfs2_super pointer.
         */
        struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

        /*
         * Optionally called in the downconvert thread after a
         * successful downconvert. The lockres will not be referenced
         * after this callback is called, so it is safe to free
         * memory, etc.
         *
         * The exact semantics of when this is called are controlled
         * by ->downconvert_worker().
         */
        void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

        /*
         * Allow a lock type to add checks to determine whether it is
         * safe to downconvert a lock. Return 0 to re-queue the
         * downconvert at a later time, nonzero to continue.
         *
         * For most locks, the default checks that there are no
         * incompatible holders are sufficient.
         *
         * Called with the lockres spinlock held.
         */
        int (*check_downconvert)(struct ocfs2_lock_res *, int);

        /*
         * Allows a lock type to populate the lock value block. This
         * is called on downconvert, and when we drop a lock.
         *
         * Locks that want to use this should set LOCK_TYPE_USES_LVB
         * in the flags field.
         *
         * Called with the lockres spinlock held.
         */
        void (*set_lvb)(struct ocfs2_lock_res *);

        /*
         * Called from the downconvert thread when it is determined
         * that a lock will be downconverted. This is called without
         * any locks held so the function can do work that might
         * schedule (syncing out data, etc).
         *
         * This should return any one of the ocfs2_unblock_action
         * values, depending on what it wants the thread to do.
         */
        int (*downconvert_worker)(struct ocfs2_lock_res *, int);

        /*
         * LOCK_TYPE_* flags which describe the specific requirements
         * of a lock type. Descriptions of each individual flag follow.
         */
        int flags;
};

/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB 0x2
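
/*
 * Editor's sketch of how these hooks fit together (an assumption
 * drawn from the declarations above, not a guarantee of call order):
 * for a blocked lock, the downconvert thread roughly does
 *
 *      ->check_downconvert()   is it safe to drop the level?
 *      ->downconvert_worker()  sync/flush work, no locks held
 *      ->set_lvb()             publish state if LOCK_TYPE_USES_LVB
 *      ->post_unlock()         fired afterwards, per ocfs2_unblock_action
 *
 * See ocfs2_unblock_lock() for the authoritative sequence.
 */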

static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
        .get_osb        = ocfs2_get_inode_osb,
        .flags          = 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
        .get_osb        = ocfs2_get_inode_osb,
        .check_downconvert = ocfs2_check_meta_downconvert,
        .set_lvb        = ocfs2_set_meta_lvb,
        .downconvert_worker = ocfs2_data_convert_worker,
        .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_super_lops = {
        .flags          = LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
        .flags          = 0,
};

static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
        .get_osb        = ocfs2_get_dentry_osb,
        .post_unlock    = ocfs2_dentry_post_unlock,
        .downconvert_worker = ocfs2_dentry_convert_worker,
        .flags          = 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
        .get_osb        = ocfs2_get_inode_osb,
        .flags          = 0,
};

static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
        .get_osb        = ocfs2_get_file_osb,
        .flags          = 0,
};

static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{
        return lockres->l_type == OCFS2_LOCK_TYPE_META ||
                lockres->l_type == OCFS2_LOCK_TYPE_RW ||
                lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
}

static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
        BUG_ON(!ocfs2_is_inode_lock(lockres));

        return (struct inode *)lockres->l_priv;
}

static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
{
        BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);

        return (struct ocfs2_dentry_lock *)lockres->l_priv;
}

static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
{
        if (lockres->l_ops->get_osb)
                return lockres->l_ops->get_osb(lockres);

        return (struct ocfs2_super *)lockres->l_priv;
}

static int ocfs2_lock_create(struct ocfs2_super *osb,
                             struct ocfs2_lock_res *lockres,
                             int level,
                             u32 dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
                                                     int wanted);
static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
                                 struct ocfs2_lock_res *lockres,
                                 int level);
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
                                        struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
                                                int convert);
#define ocfs2_log_dlm_error(_func, _err, _lockres) do {                 \
        mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \
             _err, _func, _lockres->l_name);                            \
} while (0)
static int ocfs2_downconvert_thread(void *arg);
static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
                                        struct ocfs2_lock_res *lockres);
static int ocfs2_inode_lock_update(struct inode *inode,
                                   struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
                                              int new_level);
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
                                  struct ocfs2_lock_res *lockres,
                                  int new_level,
                                  int lvb,
                                  unsigned int generation);
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
                                        struct ocfs2_lock_res *lockres);
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
                                struct ocfs2_lock_res *lockres);


static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
                                  u64 blkno,
                                  u32 generation,
                                  char *name)
{
        int len;

        mlog_entry_void();

        BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);

        len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
                       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
                       (long long)blkno, generation);

        BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));

        mlog(0, "built lock resource with name: %s\n", name);

        mlog_exit_void();
}
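
/*
 * Illustrative example (values invented; assumes OCFS2_LOCK_ID_PAD is
 * the six character "000000" pad): a META lock on block 0x1d1e with
 * generation 0x2 would be named
 *
 *      M0000000000000000001d1e00000002
 *
 * i.e. one type character, the pad, sixteen hex digits of blkno and
 * eight of generation - OCFS2_LOCK_ID_MAX_LEN - 1 characters total.
 */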

static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);

static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
                                       struct ocfs2_dlm_debug *dlm_debug)
{
        mlog(0, "Add tracking for lockres %s\n", res->l_name);

        spin_lock(&ocfs2_dlm_tracking_lock);
        list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
        spin_unlock(&ocfs2_dlm_tracking_lock);
}

static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
        spin_lock(&ocfs2_dlm_tracking_lock);
        if (!list_empty(&res->l_debug_list))
                list_del_init(&res->l_debug_list);
        spin_unlock(&ocfs2_dlm_tracking_lock);
}

static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
                                       struct ocfs2_lock_res *res,
                                       enum ocfs2_lock_type type,
                                       struct ocfs2_lock_res_ops *ops,
                                       void *priv)
{
        res->l_type          = type;
        res->l_ops           = ops;
        res->l_priv          = priv;

        res->l_level         = DLM_LOCK_IV;
        res->l_requested     = DLM_LOCK_IV;
        res->l_blocking      = DLM_LOCK_IV;
        res->l_action        = OCFS2_AST_INVALID;
        res->l_unlock_action = OCFS2_UNLOCK_INVALID;

        res->l_flags         = OCFS2_LOCK_INITIALIZED;

        ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
}

void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
        /* This also clears out the lock status block */
        memset(res, 0, sizeof(struct ocfs2_lock_res));
        spin_lock_init(&res->l_lock);
        init_waitqueue_head(&res->l_event);
        INIT_LIST_HEAD(&res->l_blocked_list);
        INIT_LIST_HEAD(&res->l_mask_waiters);
}

void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
                               enum ocfs2_lock_type type,
                               unsigned int generation,
                               struct inode *inode)
{
        struct ocfs2_lock_res_ops *ops;

        switch(type) {
                case OCFS2_LOCK_TYPE_RW:
                        ops = &ocfs2_inode_rw_lops;
                        break;
                case OCFS2_LOCK_TYPE_META:
                        ops = &ocfs2_inode_inode_lops;
                        break;
                case OCFS2_LOCK_TYPE_OPEN:
                        ops = &ocfs2_inode_open_lops;
                        break;
                default:
                        mlog_bug_on_msg(1, "type: %d\n", type);
                        ops = NULL; /* thanks, gcc */
                        break;
        }

        ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
                              generation, res->l_name);
        ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
}

static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
{
        struct inode *inode = ocfs2_lock_res_inode(lockres);

        return OCFS2_SB(inode->i_sb);
}

static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
{
        struct ocfs2_file_private *fp = lockres->l_priv;

        return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
}

static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
{
        __be64 inode_blkno_be;

        memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
               sizeof(__be64));

        return be64_to_cpu(inode_blkno_be);
}

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
{
        struct ocfs2_dentry_lock *dl = lockres->l_priv;

        return OCFS2_SB(dl->dl_inode->i_sb);
}

void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
                                u64 parent, struct inode *inode)
{
        int len;
        u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
        __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
        struct ocfs2_lock_res *lockres = &dl->dl_lockres;

        ocfs2_lock_res_init_once(lockres);

        /*
         * Unfortunately, the standard lock naming scheme won't work
         * here because we have two 16 byte values to use. Instead,
         * we'll stuff the inode number as a binary value. We still
         * want error prints to show something without garbling the
         * display, so drop a null byte in there before the inode
         * number. A future version of OCFS2 will likely use all
         * binary lock names. The stringified names have been a
         * tremendous aid in debugging, but now that the debugfs
         * interface exists, we can mangle things there if need be.
         *
         * NOTE: We also drop the standard "pad" value (the total lock
         * name size stays the same though - the last part is all
         * zeros due to the memset in ocfs2_lock_res_init_once()).
         */
        len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
                       "%c%016llx",
                       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
                       (long long)parent);

        BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));

        memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
               sizeof(__be64));

        ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
                                   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
                                   dl);
}
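
/*
 * Sketch of the resulting dentry lock name layout (offsets assumed
 * from OCFS2_DENTRY_LOCK_INO_START, shown for illustration only):
 *
 *      [0]                 lock type character ('N' for dentry)
 *      [1..16]             "%016llx" of the parent dir blkno
 *      [17]                '\0' so error prints stay readable
 *      [INO_START..+7]     __be64 inode blkno, raw binary
 */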

static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
                                      struct ocfs2_super *osb)
{
        /* Superblock lockres doesn't come from a slab so we call init
         * once on it manually. */
        ocfs2_lock_res_init_once(res);
        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
                              0, res->l_name);
        ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
                                   &ocfs2_super_lops, osb);
}

static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
                                       struct ocfs2_super *osb)
{
        /* Rename lockres doesn't come from a slab so we call init
         * once on it manually. */
        ocfs2_lock_res_init_once(res);
        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
        ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
                                   &ocfs2_rename_lops, osb);
}

void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
                              struct ocfs2_file_private *fp)
{
        struct inode *inode = fp->fp_file->f_mapping->host;
        struct ocfs2_inode_info *oi = OCFS2_I(inode);

        ocfs2_lock_res_init_once(lockres);
        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
                              inode->i_generation, lockres->l_name);
        ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
                                   OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
                                   fp);
        lockres->l_flags |= OCFS2_LOCK_NOCACHE;
}

void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
        mlog_entry_void();

        if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
                return;

        ocfs2_remove_lockres_tracking(res);

        mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
                        "Lockres %s is on the blocked list\n",
                        res->l_name);
        mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
                        "Lockres %s has mask waiters pending\n",
                        res->l_name);
        mlog_bug_on_msg(spin_is_locked(&res->l_lock),
                        "Lockres %s is locked\n",
                        res->l_name);
        mlog_bug_on_msg(res->l_ro_holders,
                        "Lockres %s has %u ro holders\n",
                        res->l_name, res->l_ro_holders);
        mlog_bug_on_msg(res->l_ex_holders,
                        "Lockres %s has %u ex holders\n",
                        res->l_name, res->l_ex_holders);

        /* Need to clear out the lock status block for the dlm */
        memset(&res->l_lksb, 0, sizeof(res->l_lksb));

        res->l_flags = 0UL;
        mlog_exit_void();
}

static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
                                     int level)
{
        mlog_entry_void();

        BUG_ON(!lockres);

        switch(level) {
        case DLM_LOCK_EX:
                lockres->l_ex_holders++;
                break;
        case DLM_LOCK_PR:
                lockres->l_ro_holders++;
                break;
        default:
                BUG();
        }

        mlog_exit_void();
}

static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
                                     int level)
{
        mlog_entry_void();

        BUG_ON(!lockres);

        switch(level) {
        case DLM_LOCK_EX:
                BUG_ON(!lockres->l_ex_holders);
                lockres->l_ex_holders--;
                break;
        case DLM_LOCK_PR:
                BUG_ON(!lockres->l_ro_holders);
                lockres->l_ro_holders--;
                break;
        default:
                BUG();
        }
        mlog_exit_void();
}

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
static inline int ocfs2_highest_compat_lock_level(int level)
{
        int new_level = DLM_LOCK_EX;

        if (level == DLM_LOCK_EX)
                new_level = DLM_LOCK_NL;
        else if (level == DLM_LOCK_PR)
                new_level = DLM_LOCK_PR;
        return new_level;
}
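
/*
 * In table form, the mapping implemented above:
 *
 *      another node blocks at:     highest level we may keep:
 *      DLM_LOCK_EX                 DLM_LOCK_NL
 *      DLM_LOCK_PR                 DLM_LOCK_PR
 *      DLM_LOCK_NL                 DLM_LOCK_EX  (nothing to give up)
 */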

static void lockres_set_flags(struct ocfs2_lock_res *lockres,
                              unsigned long newflags)
{
        struct ocfs2_mask_waiter *mw, *tmp;

        assert_spin_locked(&lockres->l_lock);

        lockres->l_flags = newflags;

        list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
                if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
                        continue;

                list_del_init(&mw->mw_item);
                mw->mw_status = 0;
                complete(&mw->mw_complete);
        }
}
static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
        lockres_set_flags(lockres, lockres->l_flags | or);
}
static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
                                unsigned long clear)
{
        lockres_set_flags(lockres, lockres->l_flags & ~clear);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
        mlog_entry_void();

        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
        BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);

        lockres->l_level = lockres->l_requested;
        if (lockres->l_level <=
            ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
                lockres->l_blocking = DLM_LOCK_NL;
                lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
        }
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

        mlog_exit_void();
}

static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
        mlog_entry_void();

        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

        /* Convert from RO to EX doesn't really need anything as our
         * information is already up to date. Convert from NL to
         * *anything* however should mark ourselves as needing an
         * update */
        if (lockres->l_level == DLM_LOCK_NL &&
            lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
                lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

        lockres->l_level = lockres->l_requested;
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

        mlog_exit_void();
}

static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
{
        mlog_entry_void();

        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
        BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

        if (lockres->l_requested > DLM_LOCK_NL &&
            !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
            lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
                lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

        lockres->l_level = lockres->l_requested;
        lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

        mlog_exit_void();
}

static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
                                     int level)
{
        int needs_downconvert = 0;
        mlog_entry_void();

        assert_spin_locked(&lockres->l_lock);

        lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);

        if (level > lockres->l_blocking) {
                /* only schedule a downconvert if we haven't already scheduled
                 * one that goes low enough to satisfy the level we're
                 * blocking. this also catches the case where we get
                 * duplicate BASTs */
                if (ocfs2_highest_compat_lock_level(level) <
                    ocfs2_highest_compat_lock_level(lockres->l_blocking))
                        needs_downconvert = 1;

                lockres->l_blocking = level;
        }

        mlog_exit(needs_downconvert);
        return needs_downconvert;
}

/*
 * OCFS2_LOCK_PENDING and l_pending_gen.
 *
 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting
 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock()
 * for more details on the race.
 *
 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces
 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock()
 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear
 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns,
 * the caller is going to try to clear PENDING again. If nothing else is
 * happening, __lockres_clear_pending() sees PENDING is unset and does
 * nothing.
 *
 * But what if another path (eg downconvert thread) has just started a
 * new locking action? The other path has re-set PENDING. Our path
 * cannot clear PENDING, because that will re-open the original race
 * window.
 *
 * [Example]
 *
 * ocfs2_meta_lock()
 *  ocfs2_cluster_lock()
 *   set BUSY
 *   set PENDING
 *   drop l_lock
 *   ocfs2_dlm_lock()
 *    ocfs2_locking_ast()               ocfs2_downconvert_thread()
 *     clear PENDING                     ocfs2_unblock_lock()
 *                                        take_l_lock
 *                                        !BUSY
 *                                        ocfs2_prepare_downconvert()
 *                                         set BUSY
 *                                         set PENDING
 *                                        drop l_lock
 *   take l_lock
 *   clear PENDING
 *   drop l_lock
 *
 *                      <window>
 *
 *                                        ocfs2_dlm_lock()
 *
 * So as you can see, we now have a window where l_lock is not held,
 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
 *
 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
 * flag set by ocfs2_prepare_downconvert(). That wasn't nice.
 *
 * To solve this we introduce l_pending_gen. A call to
 * lockres_clear_pending() will only do so when it is passed a generation
 * number that matches the lockres. lockres_set_pending() will return the
 * current generation number. When ocfs2_cluster_lock() goes to clear
 * PENDING, it passes the generation it got from set_pending(). In our
 * example above, the generation numbers will *not* match. Thus,
 * ocfs2_cluster_lock() will not clear the PENDING flag set by
 * ocfs2_prepare_downconvert().
 */
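
/*
 * The resulting caller pattern - a condensed sketch of what
 * ocfs2_lock_create() below actually does:
 *
 *      spin_lock_irqsave(&lockres->l_lock, flags);
 *      lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
 *      gen = lockres_set_pending(lockres);
 *      spin_unlock_irqrestore(&lockres->l_lock, flags);
 *
 *      ret = ocfs2_dlm_lock(...);
 *      lockres_clear_pending(lockres, gen, osb);
 *
 * The final clear_pending() is a no-op if the ast, or a newer locking
 * action, already bumped l_pending_gen.
 */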

/* Unlocked version for ocfs2_locking_ast() */
static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
                                    unsigned int generation,
                                    struct ocfs2_super *osb)
{
        assert_spin_locked(&lockres->l_lock);

        /*
         * The ast and locking functions can race us here. The winner
         * will clear pending, the loser will not.
         */
        if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
            (lockres->l_pending_gen != generation))
                return;

        lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
        lockres->l_pending_gen++;

        /*
         * The downconvert thread may have skipped us because we
         * were PENDING. Wake it up.
         */
        if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
                ocfs2_wake_downconvert_thread(osb);
}

/* Locked version for callers of ocfs2_dlm_lock() */
static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
                                  unsigned int generation,
                                  struct ocfs2_super *osb)
{
        unsigned long flags;

        spin_lock_irqsave(&lockres->l_lock, flags);
        __lockres_clear_pending(lockres, generation, osb);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
{
        assert_spin_locked(&lockres->l_lock);
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));

        lockres_or_flags(lockres, OCFS2_LOCK_PENDING);

        return lockres->l_pending_gen;
}


static void ocfs2_blocking_ast(void *opaque, int level)
{
        struct ocfs2_lock_res *lockres = opaque;
        struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
        int needs_downconvert;
        unsigned long flags;

        BUG_ON(level <= DLM_LOCK_NL);

        mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
             lockres->l_name, level, lockres->l_level,
             ocfs2_lock_type_string(lockres->l_type));

        /*
         * We can skip the bast for locks which don't enable caching -
         * they'll be dropped at the earliest possible time anyway.
         */
        if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
                return;

        spin_lock_irqsave(&lockres->l_lock, flags);
        needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
        if (needs_downconvert)
                ocfs2_schedule_blocked_lock(osb, lockres);
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        wake_up(&lockres->l_event);

        ocfs2_wake_downconvert_thread(osb);
}

static void ocfs2_locking_ast(void *opaque)
{
        struct ocfs2_lock_res *lockres = opaque;
        struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
        unsigned long flags;
        int status;

        spin_lock_irqsave(&lockres->l_lock, flags);

        status = ocfs2_dlm_lock_status(&lockres->l_lksb);

        if (status == -EAGAIN) {
                lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
                goto out;
        }

        if (status) {
                mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
                     lockres->l_name, status);
                spin_unlock_irqrestore(&lockres->l_lock, flags);
                return;
        }

        switch(lockres->l_action) {
        case OCFS2_AST_ATTACH:
                ocfs2_generic_handle_attach_action(lockres);
                lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
                break;
        case OCFS2_AST_CONVERT:
                ocfs2_generic_handle_convert_action(lockres);
                break;
        case OCFS2_AST_DOWNCONVERT:
                ocfs2_generic_handle_downconvert_action(lockres);
                break;
        default:
                mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
                     "lockres flags = 0x%lx, unlock action: %u\n",
                     lockres->l_name, lockres->l_action, lockres->l_flags,
                     lockres->l_unlock_action);
                BUG();
        }
out:
        /* set it to something invalid so if we get called again we
         * can catch it. */
        lockres->l_action = OCFS2_AST_INVALID;

        /* Did we try to cancel this lock? Clear that state */
        if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
                lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;

        /*
         * We may have beaten the locking functions here. We certainly
         * know that dlm_lock() has been called :-)
         * Because we can't have two lock calls in flight at once, we
         * can use lockres->l_pending_gen.
         */
        __lockres_clear_pending(lockres, lockres->l_pending_gen, osb);

        wake_up(&lockres->l_event);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
                                                int convert)
{
        unsigned long flags;

        mlog_entry_void();
        spin_lock_irqsave(&lockres->l_lock, flags);
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
        if (convert)
                lockres->l_action = OCFS2_AST_INVALID;
        else
                lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        wake_up(&lockres->l_event);
        mlog_exit_void();
}

/* Note: If we detect another process working on the lock (i.e.,
 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 * to do the right thing in that case.
 */
static int ocfs2_lock_create(struct ocfs2_super *osb,
                             struct ocfs2_lock_res *lockres,
                             int level,
                             u32 dlm_flags)
{
        int ret = 0;
        unsigned long flags;
        unsigned int gen;

        mlog_entry_void();

        mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
             dlm_flags);

        spin_lock_irqsave(&lockres->l_lock, flags);
        if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
            (lockres->l_flags & OCFS2_LOCK_BUSY)) {
                spin_unlock_irqrestore(&lockres->l_lock, flags);
                goto bail;
        }

        lockres->l_action = OCFS2_AST_ATTACH;
        lockres->l_requested = level;
        lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
        gen = lockres_set_pending(lockres);
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        ret = ocfs2_dlm_lock(osb->cconn,
                             level,
                             &lockres->l_lksb,
                             dlm_flags,
                             lockres->l_name,
                             OCFS2_LOCK_ID_MAX_LEN - 1,
                             lockres);
        lockres_clear_pending(lockres, gen, osb);
        if (ret) {
                ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
                ocfs2_recover_from_dlm_error(lockres, 1);
        }

        mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);

bail:
        mlog_exit(ret);
        return ret;
}
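
/*
 * Usage note (illustrative, based on callers later in this file):
 * ocfs2_create_new_lock() passes DLM_LKF_LOCAL here for freshly
 * created inodes, e.g.
 *
 *      ocfs2_lock_create(osb, lockres, DLM_LOCK_EX, DLM_LKF_LOCAL);
 *
 * while ocfs2_file_lock() calls it with no flags to establish a lock
 * at DLM_LOCK_NL before attempting an upconvert.
 */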

static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
                                        int flag)
{
        unsigned long flags;
        int ret;

        spin_lock_irqsave(&lockres->l_lock, flags);
        ret = lockres->l_flags & flag;
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        return ret;
}

static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
{
        wait_event(lockres->l_event,
                   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}

static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
{
        wait_event(lockres->l_event,
                   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
                                                     int wanted)
{
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

        return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}

static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
{
        INIT_LIST_HEAD(&mw->mw_item);
        init_completion(&mw->mw_complete);
}

static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
        wait_for_completion(&mw->mw_complete);
        /* Re-arm the completion in case we want to wait on it again */
        INIT_COMPLETION(mw->mw_complete);
        return mw->mw_status;
}

static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
                                    struct ocfs2_mask_waiter *mw,
                                    unsigned long mask,
                                    unsigned long goal)
{
        BUG_ON(!list_empty(&mw->mw_item));

        assert_spin_locked(&lockres->l_lock);

        list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
        mw->mw_mask = mask;
        mw->mw_goal = goal;
}

/* returns 0 if the mw that was removed was already satisfied, -EBUSY
 * if the mask still hadn't reached its goal */
static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
                                      struct ocfs2_mask_waiter *mw)
{
        unsigned long flags;
        int ret = 0;

        spin_lock_irqsave(&lockres->l_lock, flags);
        if (!list_empty(&mw->mw_item)) {
                if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
                        ret = -EBUSY;

                list_del_init(&mw->mw_item);
                init_completion(&mw->mw_complete);
        }
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        return ret;
}

static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
                                             struct ocfs2_lock_res *lockres)
{
        int ret;

        ret = wait_for_completion_interruptible(&mw->mw_complete);
        if (ret)
                lockres_remove_mask_waiter(lockres, mw);
        else
                ret = mw->mw_status;
        /* Re-arm the completion in case we want to wait on it again */
        INIT_COMPLETION(mw->mw_complete);
        return ret;
}
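
/*
 * The mask waiter pattern in one place - a sketch of how the helpers
 * above are used by ocfs2_cluster_lock() and the flock code below:
 *
 *      struct ocfs2_mask_waiter mw;
 *
 *      ocfs2_init_mask_waiter(&mw);
 *      spin_lock_irqsave(&lockres->l_lock, flags);
 *      lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
 *      spin_unlock_irqrestore(&lockres->l_lock, flags);
 *      ret = ocfs2_wait_for_mask(&mw);
 *
 * i.e. "wake me when (l_flags & mask) == goal"; lockres_set_flags()
 * completes the waiter the moment that condition becomes true.
 */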

static int ocfs2_cluster_lock(struct ocfs2_super *osb,
                              struct ocfs2_lock_res *lockres,
                              int level,
                              u32 lkm_flags,
                              int arg_flags)
{
        struct ocfs2_mask_waiter mw;
        int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
        int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
        unsigned long flags;
        unsigned int gen;
        int noqueue_attempted = 0;

        mlog_entry_void();

        ocfs2_init_mask_waiter(&mw);

        if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
                lkm_flags |= DLM_LKF_VALBLK;

again:
        wait = 0;

        if (catch_signals && signal_pending(current)) {
                ret = -ERESTARTSYS;
                goto out;
        }

        spin_lock_irqsave(&lockres->l_lock, flags);

        mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
                        "Cluster lock called on freeing lockres %s! flags "
                        "0x%lx\n", lockres->l_name, lockres->l_flags);

        /* We only compare against the currently granted level
         * here. If the lock is blocked waiting on a downconvert,
         * we'll get caught below. */
        if (lockres->l_flags & OCFS2_LOCK_BUSY &&
            level > lockres->l_level) {
                /* is someone sitting in dlm_lock? If so, wait on
                 * them. */
                lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
                wait = 1;
                goto unlock;
        }

        if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
            !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
                /* the lock is currently blocked on behalf of
                 * another node */
                lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
                wait = 1;
                goto unlock;
        }

        if (level > lockres->l_level) {
                if (noqueue_attempted > 0) {
                        ret = -EAGAIN;
                        goto unlock;
                }
                if (lkm_flags & DLM_LKF_NOQUEUE)
                        noqueue_attempted = 1;

                if (lockres->l_action != OCFS2_AST_INVALID)
                        mlog(ML_ERROR, "lockres %s has action %u pending\n",
                             lockres->l_name, lockres->l_action);

                if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
                        lockres->l_action = OCFS2_AST_ATTACH;
                        lkm_flags &= ~DLM_LKF_CONVERT;
                } else {
                        lockres->l_action = OCFS2_AST_CONVERT;
                        lkm_flags |= DLM_LKF_CONVERT;
                }

                lockres->l_requested = level;
                lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
                gen = lockres_set_pending(lockres);
                spin_unlock_irqrestore(&lockres->l_lock, flags);

                BUG_ON(level == DLM_LOCK_IV);
                BUG_ON(level == DLM_LOCK_NL);

                mlog(0, "lock %s, convert from %d to level = %d\n",
                     lockres->l_name, lockres->l_level, level);

                /* call dlm_lock to upgrade lock now */
                ret = ocfs2_dlm_lock(osb->cconn,
                                     level,
                                     &lockres->l_lksb,
                                     lkm_flags,
                                     lockres->l_name,
                                     OCFS2_LOCK_ID_MAX_LEN - 1,
                                     lockres);
                lockres_clear_pending(lockres, gen, osb);
                if (ret) {
                        if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
                            (ret != -EAGAIN)) {
                                ocfs2_log_dlm_error("ocfs2_dlm_lock",
                                                    ret, lockres);
                        }
                        ocfs2_recover_from_dlm_error(lockres, 1);
                        goto out;
                }

                mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
                     lockres->l_name);

                /* At this point we've gone inside the dlm and need to
                 * complete our work regardless. */
                catch_signals = 0;

                /* wait for busy to clear and carry on */
                goto again;
        }

        /* Ok, if we get here then we're good to go. */
        ocfs2_inc_holders(lockres, level);

        ret = 0;
unlock:
        spin_unlock_irqrestore(&lockres->l_lock, flags);
out:
        /*
         * This is helping work around a lock inversion between the page lock
         * and dlm locks. One path holds the page lock while calling aops
         * which block acquiring dlm locks. The voting thread holds dlm
         * locks while acquiring page locks while down converting data locks.
         * This block is helping an aop path notice the inversion and back
         * off to unlock its page lock before trying the dlm lock again.
         */
        if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
            mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
                wait = 0;
                if (lockres_remove_mask_waiter(lockres, &mw))
                        ret = -EAGAIN;
                else
                        goto again;
        }
        if (wait) {
                ret = ocfs2_wait_for_mask(&mw);
                if (ret == 0)
                        goto again;
                mlog_errno(ret);
        }

        mlog_exit(ret);
        return ret;
}

static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
                                 struct ocfs2_lock_res *lockres,
                                 int level)
{
        unsigned long flags;

        mlog_entry_void();
        spin_lock_irqsave(&lockres->l_lock, flags);
        ocfs2_dec_holders(lockres, level);
        ocfs2_downconvert_on_unlock(osb, lockres);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
        mlog_exit_void();
}

static int ocfs2_create_new_lock(struct ocfs2_super *osb,
                                 struct ocfs2_lock_res *lockres,
                                 int ex,
                                 int local)
{
        int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
        unsigned long flags;
        u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;

        spin_lock_irqsave(&lockres->l_lock, flags);
        BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
        lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        return ocfs2_lock_create(osb, lockres, level, lkm_flags);
}

/* Grants us an EX lock on the data and metadata resources, skipping
 * the normal cluster directory lookup. Use this ONLY on newly created
 * inodes which other nodes can't possibly see, and which haven't been
 * hashed in the inode hash yet. This can give us a good performance
 * increase as it'll skip the network broadcast normally associated
 * with creating a new lock resource. */
int ocfs2_create_new_inode_locks(struct inode *inode)
{
        int ret;
        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

        BUG_ON(!inode);
        BUG_ON(!ocfs2_inode_is_new(inode));

        mlog_entry_void();

        mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);

        /* Note that we don't increment any of the holder counts, nor
         * do we add anything to a journal handle. Since this is
         * supposed to be a new inode which the cluster doesn't know
         * about yet, there is no need to. As far as the LVB handling
         * is concerned, this is basically like acquiring an EX lock
         * on a resource which has an invalid one -- we'll set it
         * valid when we release the EX. */

        ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
        if (ret) {
                mlog_errno(ret);
                goto bail;
        }

        /*
         * We don't want to use DLM_LKF_LOCAL on a meta data lock as they
         * don't use a generation in their lock names.
         */
        ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
        if (ret) {
                mlog_errno(ret);
                goto bail;
        }

        ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
        if (ret) {
                mlog_errno(ret);
                goto bail;
        }

bail:
        mlog_exit(ret);
        return ret;
}
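
/*
 * Summary of the above: a brand new inode gets three lock resources
 * up front - the rw lock at EX with DLM_LKF_LOCAL, the meta lock at
 * EX without it (meta lock names carry no generation, so LOCAL is
 * unsafe there, per the comment above), and the open lock at PR so
 * other nodes can later tell that the inode is still in use.
 */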
"EXMODE" : "PRMODE"); 1359 1360 if (ocfs2_mount_local(osb)) 1361 return 0; 1362 1363 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1364 1365 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1366 1367 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0, 1368 0); 1369 if (status < 0) 1370 mlog_errno(status); 1371 1372 mlog_exit(status); 1373 return status; 1374 } 1375 1376 void ocfs2_rw_unlock(struct inode *inode, int write) 1377 { 1378 int level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1379 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1380 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1381 1382 mlog_entry_void(); 1383 1384 mlog(0, "inode %llu drop %s RW lock\n", 1385 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1386 write ? "EXMODE" : "PRMODE"); 1387 1388 if (!ocfs2_mount_local(osb)) 1389 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 1390 1391 mlog_exit_void(); 1392 } 1393 1394 /* 1395 * ocfs2_open_lock always get PR mode lock. 1396 */ 1397 int ocfs2_open_lock(struct inode *inode) 1398 { 1399 int status = 0; 1400 struct ocfs2_lock_res *lockres; 1401 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1402 1403 BUG_ON(!inode); 1404 1405 mlog_entry_void(); 1406 1407 mlog(0, "inode %llu take PRMODE open lock\n", 1408 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1409 1410 if (ocfs2_mount_local(osb)) 1411 goto out; 1412 1413 lockres = &OCFS2_I(inode)->ip_open_lockres; 1414 1415 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1416 DLM_LOCK_PR, 0, 0); 1417 if (status < 0) 1418 mlog_errno(status); 1419 1420 out: 1421 mlog_exit(status); 1422 return status; 1423 } 1424 1425 int ocfs2_try_open_lock(struct inode *inode, int write) 1426 { 1427 int status = 0, level; 1428 struct ocfs2_lock_res *lockres; 1429 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1430 1431 BUG_ON(!inode); 1432 1433 mlog_entry_void(); 1434 1435 mlog(0, "inode %llu try to take %s open lock\n", 1436 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1437 write ? "EXMODE" : "PRMODE"); 1438 1439 if (ocfs2_mount_local(osb)) 1440 goto out; 1441 1442 lockres = &OCFS2_I(inode)->ip_open_lockres; 1443 1444 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1445 1446 /* 1447 * The file system may already holding a PRMODE/EXMODE open lock. 1448 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on 1449 * other nodes and the -EAGAIN will indicate to the caller that 1450 * this inode is still in use. 1451 */ 1452 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1453 level, DLM_LKF_NOQUEUE, 0); 1454 1455 out: 1456 mlog_exit(status); 1457 return status; 1458 } 1459 1460 /* 1461 * ocfs2_open_unlock unlock PR and EX mode open locks. 

/*
 * ocfs2_open_unlock unlocks PR and EX mode open locks.
 */
void ocfs2_open_unlock(struct inode *inode)
{
        struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

        mlog_entry_void();

        mlog(0, "inode %llu drop open lock\n",
             (unsigned long long)OCFS2_I(inode)->ip_blkno);

        if (ocfs2_mount_local(osb))
                goto out;

        if (lockres->l_ro_holders)
                ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
                                     DLM_LOCK_PR);
        if (lockres->l_ex_holders)
                ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
                                     DLM_LOCK_EX);

out:
        mlog_exit_void();
}

static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
                                     int level)
{
        int ret;
        struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
        unsigned long flags;
        struct ocfs2_mask_waiter mw;

        ocfs2_init_mask_waiter(&mw);

retry_cancel:
        spin_lock_irqsave(&lockres->l_lock, flags);
        if (lockres->l_flags & OCFS2_LOCK_BUSY) {
                ret = ocfs2_prepare_cancel_convert(osb, lockres);
                if (ret) {
                        spin_unlock_irqrestore(&lockres->l_lock, flags);
                        ret = ocfs2_cancel_convert(osb, lockres);
                        if (ret < 0) {
                                mlog_errno(ret);
                                goto out;
                        }
                        goto retry_cancel;
                }
                lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
                spin_unlock_irqrestore(&lockres->l_lock, flags);

                ocfs2_wait_for_mask(&mw);
                goto retry_cancel;
        }

        ret = -ERESTARTSYS;
        /*
         * We may still have gotten the lock, in which case there's no
         * point to restarting the syscall.
         */
        if (lockres->l_level == level)
                ret = 0;

        mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
             lockres->l_flags, lockres->l_level, lockres->l_action);

        spin_unlock_irqrestore(&lockres->l_lock, flags);

out:
        return ret;
}

/*
 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
 * flock() calls. The locking approach this requires is sufficiently
 * different from all other cluster lock types that we implement a
 * separate path to the "low-level" dlm calls. In particular:
 *
 * - No optimization of lock levels is done - we take exactly
 *   what's been requested.
 *
 * - No lock caching is employed. We immediately downconvert to
 *   no-lock at unlock time (this also means flock locks never go on
 *   the blocking list).
 *
 * - Since userspace can trivially deadlock itself with flock, we make
 *   sure to allow cancellation of a misbehaving application's flock()
 *   request.
 *
 * - Access to any flock lockres doesn't require concurrency, so we
 *   can simplify the code by requiring the caller to guarantee
 *   serialization of dlmglue flock calls.
 */
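
/*
 * For orientation, a hypothetical call mapping (assuming the usual
 * flock(2) entry points; the actual glue lives outside this file):
 * flock(fd, LOCK_EX | LOCK_NB) would arrive here roughly as
 * ocfs2_file_lock(file, 1, 1) - ex and trylock both set - while a
 * plain flock(fd, LOCK_SH) maps to ocfs2_file_lock(file, 0, 0).
 */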
int ocfs2_file_lock(struct file *file, int ex, int trylock)
{
        int ret, level = ex ? LKM_EXMODE : LKM_PRMODE;
        unsigned int lkm_flags = trylock ? LKM_NOQUEUE : 0;
        unsigned long flags;
        struct ocfs2_file_private *fp = file->private_data;
        struct ocfs2_lock_res *lockres = &fp->fp_flock;
        struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
        struct ocfs2_mask_waiter mw;

        ocfs2_init_mask_waiter(&mw);

        if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
            (lockres->l_level > DLM_LOCK_NL)) {
                mlog(ML_ERROR,
                     "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
                     "level: %u\n", lockres->l_name, lockres->l_flags,
                     lockres->l_level);
                return -EINVAL;
        }

        spin_lock_irqsave(&lockres->l_lock, flags);
        if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
                lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
                spin_unlock_irqrestore(&lockres->l_lock, flags);

                /*
                 * Get the lock at NLMODE to start - that way we
                 * can cancel the upconvert request if need be.
                 */
                ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
                if (ret < 0) {
                        mlog_errno(ret);
                        goto out;
                }

                ret = ocfs2_wait_for_mask(&mw);
                if (ret) {
                        mlog_errno(ret);
                        goto out;
                }
                spin_lock_irqsave(&lockres->l_lock, flags);
        }

        lockres->l_action = OCFS2_AST_CONVERT;
        lkm_flags |= LKM_CONVERT;
        lockres->l_requested = level;
        lockres_or_flags(lockres, OCFS2_LOCK_BUSY);

        lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
                             lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
                             lockres);
        if (ret) {
                if (!trylock || (ret != -EAGAIN)) {
                        ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
                        ret = -EINVAL;
                }

                ocfs2_recover_from_dlm_error(lockres, 1);
                lockres_remove_mask_waiter(lockres, &mw);
                goto out;
        }

        ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
        if (ret == -ERESTARTSYS) {
                /*
                 * Userspace can cause deadlock itself with
                 * flock(). Current behavior locally is to allow the
                 * deadlock, but abort the system call if a signal is
                 * received. We follow this example, otherwise a
                 * poorly written program could sit in the kernel until
                 * reboot.
                 *
                 * Handling this is a bit more complicated for Ocfs2
                 * though. We can't exit this function with an
                 * outstanding lock request, so a cancel convert is
                 * required. We intentionally overwrite 'ret' - if the
                 * cancel fails and the lock was granted, it's easier
                 * to just bubble success back up to the user.
                 */
                ret = ocfs2_flock_handle_signal(lockres, level);
        } else if (!ret && (level > lockres->l_level)) {
                /* Trylock failed asynchronously */
                BUG_ON(!trylock);
                ret = -EAGAIN;
        }

out:
        mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
             lockres->l_name, ex, trylock, ret);
        return ret;
}

void ocfs2_file_unlock(struct file *file)
{
        int ret;
        unsigned int gen;
        unsigned long flags;
        struct ocfs2_file_private *fp = file->private_data;
        struct ocfs2_lock_res *lockres = &fp->fp_flock;
        struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
        struct ocfs2_mask_waiter mw;

        ocfs2_init_mask_waiter(&mw);

        if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
                return;

        if (lockres->l_level == LKM_NLMODE)
                return;

        mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
             lockres->l_name, lockres->l_flags, lockres->l_level,
             lockres->l_action);

        spin_lock_irqsave(&lockres->l_lock, flags);
        /*
         * Fake a blocking ast for the downconvert code.
         */
        lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
        lockres->l_blocking = DLM_LOCK_EX;

        gen = ocfs2_prepare_downconvert(lockres, LKM_NLMODE);
        lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0, gen);
        if (ret) {
                mlog_errno(ret);
                return;
        }

        ret = ocfs2_wait_for_mask(&mw);
        if (ret)
                mlog_errno(ret);
}

static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
                                        struct ocfs2_lock_res *lockres)
{
        int kick = 0;

        mlog_entry_void();

        /* If we know that another node is waiting on our lock, kick
         * the downconvert thread pre-emptively when we reach a release
         * condition. */
        if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
                switch(lockres->l_blocking) {
                case DLM_LOCK_EX:
                        if (!lockres->l_ex_holders && !lockres->l_ro_holders)
                                kick = 1;
                        break;
                case DLM_LOCK_PR:
                        if (!lockres->l_ex_holders)
                                kick = 1;
                        break;
                default:
                        BUG();
                }
        }

        if (kick)
                ocfs2_wake_downconvert_thread(osb);

        mlog_exit_void();
}

#define OCFS2_SEC_BITS   34
#define OCFS2_SEC_SHIFT  (64 - 34)
#define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)

/* LVB only has room for 64 bits of time here so we pack it for
 * now. */
static u64 ocfs2_pack_timespec(struct timespec *spec)
{
        u64 res;
        u64 sec = spec->tv_sec;
        u32 nsec = spec->tv_nsec;

        res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);

        return res;
}
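
/*
 * Worked example (arithmetic only, values invented): with
 * OCFS2_SEC_SHIFT = 64 - 34 = 30, packing { tv_sec = 5, tv_nsec = 7 }
 * yields
 *
 *      (5 << 30) | (7 & OCFS2_NSEC_MASK) = 0x140000007
 *
 * ocfs2_unpack_timespec() below simply reverses the shift and mask.
 */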
1761 */ 1762 if (oi->ip_flags & OCFS2_INODE_DELETED) { 1763 lvb->lvb_version = 0; 1764 goto out; 1765 } 1766 1767 lvb->lvb_version = OCFS2_LVB_VERSION; 1768 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 1769 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 1770 lvb->lvb_iuid = cpu_to_be32(inode->i_uid); 1771 lvb->lvb_igid = cpu_to_be32(inode->i_gid); 1772 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 1773 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 1774 lvb->lvb_iatime_packed = 1775 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime)); 1776 lvb->lvb_ictime_packed = 1777 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime)); 1778 lvb->lvb_imtime_packed = 1779 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime)); 1780 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 1781 lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features); 1782 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 1783 1784 out: 1785 mlog_meta_lvb(0, lockres); 1786 1787 mlog_exit_void(); 1788 } 1789 1790 static void ocfs2_unpack_timespec(struct timespec *spec, 1791 u64 packed_time) 1792 { 1793 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 1794 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 1795 } 1796 1797 static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 1798 { 1799 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1800 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 1801 struct ocfs2_meta_lvb *lvb; 1802 1803 mlog_entry_void(); 1804 1805 mlog_meta_lvb(0, lockres); 1806 1807 lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb); 1808 1809 /* We're safe here without the lockres lock... */ 1810 spin_lock(&oi->ip_lock); 1811 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 1812 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 1813 1814 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 1815 oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures); 1816 ocfs2_set_inode_flags(inode); 1817 1818 /* fast-symlinks are a special case */ 1819 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 1820 inode->i_blocks = 0; 1821 else 1822 inode->i_blocks = ocfs2_inode_sector_count(inode); 1823 1824 inode->i_uid = be32_to_cpu(lvb->lvb_iuid); 1825 inode->i_gid = be32_to_cpu(lvb->lvb_igid); 1826 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 1827 inode->i_nlink = be16_to_cpu(lvb->lvb_inlink); 1828 ocfs2_unpack_timespec(&inode->i_atime, 1829 be64_to_cpu(lvb->lvb_iatime_packed)); 1830 ocfs2_unpack_timespec(&inode->i_mtime, 1831 be64_to_cpu(lvb->lvb_imtime_packed)); 1832 ocfs2_unpack_timespec(&inode->i_ctime, 1833 be64_to_cpu(lvb->lvb_ictime_packed)); 1834 spin_unlock(&oi->ip_lock); 1835 1836 mlog_exit_void(); 1837 } 1838 1839 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 1840 struct ocfs2_lock_res *lockres) 1841 { 1842 struct ocfs2_meta_lvb *lvb = 1843 (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb); 1844 1845 if (lvb->lvb_version == OCFS2_LVB_VERSION 1846 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 1847 return 1; 1848 return 0; 1849 } 1850 1851 /* Determine whether a lock resource needs to be refreshed, and 1852 * arbitrate who gets to refresh it. 1853 * 1854 * 0 means no refresh needed. 1855 * 1856 * > 0 means you need to refresh this and you MUST call 1857 * ocfs2_complete_lock_res_refresh afterwards. 
*/ 1858 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres) 1859 { 1860 unsigned long flags; 1861 int status = 0; 1862 1863 mlog_entry_void(); 1864 1865 refresh_check: 1866 spin_lock_irqsave(&lockres->l_lock, flags); 1867 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 1868 spin_unlock_irqrestore(&lockres->l_lock, flags); 1869 goto bail; 1870 } 1871 1872 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 1873 spin_unlock_irqrestore(&lockres->l_lock, flags); 1874 1875 ocfs2_wait_on_refreshing_lock(lockres); 1876 goto refresh_check; 1877 } 1878 1879 /* Ok, I'll be the one to refresh this lock. */ 1880 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING); 1881 spin_unlock_irqrestore(&lockres->l_lock, flags); 1882 1883 status = 1; 1884 bail: 1885 mlog_exit(status); 1886 return status; 1887 } 1888 1889 /* If status is nonzero, I'll mark the lockres as not being in refresh 1890 * anymore, but I won't clear the needs-refresh flag. */ 1891 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres, 1892 int status) 1893 { 1894 unsigned long flags; 1895 mlog_entry_void(); 1896 1897 spin_lock_irqsave(&lockres->l_lock, flags); 1898 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING); 1899 if (!status) 1900 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 1901 spin_unlock_irqrestore(&lockres->l_lock, flags); 1902 1903 wake_up(&lockres->l_event); 1904 1905 mlog_exit_void(); 1906 } 1907 1908 /* May or may not return a bh if it went to disk. */ 1909 static int ocfs2_inode_lock_update(struct inode *inode, 1910 struct buffer_head **bh) 1911 { 1912 int status = 0; 1913 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1914 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 1915 struct ocfs2_dinode *fe; 1916 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1917 1918 mlog_entry_void(); 1919 1920 if (ocfs2_mount_local(osb)) 1921 goto bail; 1922 1923 spin_lock(&oi->ip_lock); 1924 if (oi->ip_flags & OCFS2_INODE_DELETED) { 1925 mlog(0, "Orphaned inode %llu was deleted while we " 1926 "were waiting on a lock. ip_flags = 0x%x\n", 1927 (unsigned long long)oi->ip_blkno, oi->ip_flags); 1928 spin_unlock(&oi->ip_lock); 1929 status = -ENOENT; 1930 goto bail; 1931 } 1932 spin_unlock(&oi->ip_lock); 1933 1934 if (!ocfs2_should_refresh_lock_res(lockres)) 1935 goto bail; 1936 1937 /* This will discard any caching information we might have had 1938 * for the inode metadata. */ 1939 ocfs2_metadata_cache_purge(inode); 1940 1941 ocfs2_extent_map_trunc(inode, 0); 1942 1943 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) { 1944 mlog(0, "Trusting LVB on inode %llu\n", 1945 (unsigned long long)oi->ip_blkno); 1946 ocfs2_refresh_inode_from_lvb(inode); 1947 } else { 1948 /* Boo, we have to go to disk. */ 1949 /* read bh, cast, ocfs2_refresh_inode */ 1950 status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno, 1951 bh, OCFS2_BH_CACHED, inode); 1952 if (status < 0) { 1953 mlog_errno(status); 1954 goto bail_refresh; 1955 } 1956 fe = (struct ocfs2_dinode *) (*bh)->b_data; 1957 1958 /* This is a good chance to make sure we're not 1959 * locking an invalid object. 1960 * 1961 * We bug on a stale inode here because we checked 1962 * above whether it was wiped from disk. The wiping 1963 * node provides a guarantee that we receive that 1964 * message and can mark the inode before dropping any 1965 * locks associated with it.
*/ 1966 if (!OCFS2_IS_VALID_DINODE(fe)) { 1967 OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe); 1968 status = -EIO; 1969 goto bail_refresh; 1970 } 1971 mlog_bug_on_msg(inode->i_generation != 1972 le32_to_cpu(fe->i_generation), 1973 "Invalid dinode %llu disk generation: %u " 1974 "inode->i_generation: %u\n", 1975 (unsigned long long)oi->ip_blkno, 1976 le32_to_cpu(fe->i_generation), 1977 inode->i_generation); 1978 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 1979 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 1980 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 1981 (unsigned long long)oi->ip_blkno, 1982 (unsigned long long)le64_to_cpu(fe->i_dtime), 1983 le32_to_cpu(fe->i_flags)); 1984 1985 ocfs2_refresh_inode(inode, fe); 1986 } 1987 1988 status = 0; 1989 bail_refresh: 1990 ocfs2_complete_lock_res_refresh(lockres, status); 1991 bail: 1992 mlog_exit(status); 1993 return status; 1994 } 1995 1996 static int ocfs2_assign_bh(struct inode *inode, 1997 struct buffer_head **ret_bh, 1998 struct buffer_head *passed_bh) 1999 { 2000 int status; 2001 2002 if (passed_bh) { 2003 /* Ok, the update went to disk for us, use the 2004 * returned bh. */ 2005 *ret_bh = passed_bh; 2006 get_bh(*ret_bh); 2007 2008 return 0; 2009 } 2010 2011 status = ocfs2_read_block(OCFS2_SB(inode->i_sb), 2012 OCFS2_I(inode)->ip_blkno, 2013 ret_bh, 2014 OCFS2_BH_CACHED, 2015 inode); 2016 if (status < 0) 2017 mlog_errno(status); 2018 2019 return status; 2020 } 2021 2022 /* 2023 * returns < 0 error if the callback will never be called, otherwise 2024 * the result of the lock will be communicated via the callback. 2025 */ 2026 int ocfs2_inode_lock_full(struct inode *inode, 2027 struct buffer_head **ret_bh, 2028 int ex, 2029 int arg_flags) 2030 { 2031 int status, level, acquired; 2032 u32 dlm_flags; 2033 struct ocfs2_lock_res *lockres = NULL; 2034 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2035 struct buffer_head *local_bh = NULL; 2036 2037 BUG_ON(!inode); 2038 2039 mlog_entry_void(); 2040 2041 mlog(0, "inode %llu, take %s META lock\n", 2042 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2043 ex ? "EXMODE" : "PRMODE"); 2044 2045 status = 0; 2046 acquired = 0; 2047 /* We'll allow faking a readonly metadata lock for 2048 * rodevices. */ 2049 if (ocfs2_is_hard_readonly(osb)) { 2050 if (ex) 2051 status = -EROFS; 2052 goto bail; 2053 } 2054 2055 if (ocfs2_mount_local(osb)) 2056 goto local; 2057 2058 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2059 ocfs2_wait_for_recovery(osb); 2060 2061 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2062 level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2063 dlm_flags = 0; 2064 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 2065 dlm_flags |= DLM_LKF_NOQUEUE; 2066 2067 status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags); 2068 if (status < 0) { 2069 if (status != -EAGAIN && status != -EIOCBRETRY) 2070 mlog_errno(status); 2071 goto bail; 2072 } 2073 2074 /* Notify the error cleanup path to drop the cluster lock. */ 2075 acquired = 1; 2076 2077 /* We wait twice because a node may have died while we were in 2078 * the lower dlm layers. The second time though, we've 2079 * committed to owning this lock so we don't allow signals to 2080 * abort the operation. */ 2081 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2082 ocfs2_wait_for_recovery(osb); 2083 2084 local: 2085 /* 2086 * We only see this flag if we're being called from 2087 * ocfs2_read_locked_inode(). It means we're locking an inode 2088 * which hasn't been populated yet, so clear the refresh flag 2089 * and let the caller handle it. 
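 * (Passing a zero status to ocfs2_complete_lock_res_refresh() below
 * also clears OCFS2_LOCK_NEEDS_REFRESH, which is exactly the "clear
 * the refresh flag" step described above.)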
2090 */ 2091 if (inode->i_state & I_NEW) { 2092 status = 0; 2093 if (lockres) 2094 ocfs2_complete_lock_res_refresh(lockres, 0); 2095 goto bail; 2096 } 2097 2098 /* This is fun. The caller may want a bh back, or it may 2099 * not. ocfs2_inode_lock_update definitely wants one in, but 2100 * may or may not read one, depending on what's in the 2101 * LVB. The result of all of this is that we've *only* gone to 2102 * disk if we have to, so the complexity is worthwhile. */ 2103 status = ocfs2_inode_lock_update(inode, &local_bh); 2104 if (status < 0) { 2105 if (status != -ENOENT) 2106 mlog_errno(status); 2107 goto bail; 2108 } 2109 2110 if (ret_bh) { 2111 status = ocfs2_assign_bh(inode, ret_bh, local_bh); 2112 if (status < 0) { 2113 mlog_errno(status); 2114 goto bail; 2115 } 2116 } 2117 2118 bail: 2119 if (status < 0) { 2120 if (ret_bh && (*ret_bh)) { 2121 brelse(*ret_bh); 2122 *ret_bh = NULL; 2123 } 2124 if (acquired) 2125 ocfs2_inode_unlock(inode, ex); 2126 } 2127 2128 if (local_bh) 2129 brelse(local_bh); 2130 2131 mlog_exit(status); 2132 return status; 2133 } 2134 2135 /* 2136 * This is working around a lock inversion between tasks acquiring DLM 2137 * locks while holding a page lock and the downconvert thread which 2138 * blocks dlm lock acquisition while acquiring page locks. 2139 * 2140 * ** These _with_page variants are only intended to be called from aop 2141 * methods that hold page locks and return a very specific *positive* error 2142 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 2143 * 2144 * The DLM is called such that it returns -EAGAIN if it would have 2145 * blocked waiting for the downconvert thread. In that case we unlock 2146 * our page so the downconvert thread can make progress. Once we've 2147 * done this we have to return AOP_TRUNCATED_PAGE so the aop method 2148 * that called us can bubble that back up into the VFS who will then 2149 * immediately retry the aop call. 2150 * 2151 * We do a blocking lock and immediate unlock before returning, though, so that 2152 * the lock has a great chance of being cached on this node by the time the VFS 2153 * calls back to retry the aop. This has the potential to livelock as nodes 2154 * ping locks back and forth, but that's a risk we're willing to take in 2155 * exchange for avoiding the lock inversion so simply. 2156 */ 2157 int ocfs2_inode_lock_with_page(struct inode *inode, 2158 struct buffer_head **ret_bh, 2159 int ex, 2160 struct page *page) 2161 { 2162 int ret; 2163 2164 ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK); 2165 if (ret == -EAGAIN) { 2166 unlock_page(page); 2167 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0) 2168 ocfs2_inode_unlock(inode, ex); 2169 ret = AOP_TRUNCATED_PAGE; 2170 } 2171 2172 return ret; 2173 } 2174 2175 int ocfs2_inode_lock_atime(struct inode *inode, 2176 struct vfsmount *vfsmnt, 2177 int *level) 2178 { 2179 int ret; 2180 2181 mlog_entry_void(); 2182 ret = ocfs2_inode_lock(inode, NULL, 0); 2183 if (ret < 0) { 2184 mlog_errno(ret); 2185 return ret; 2186 } 2187 2188 /* 2189 * If we should update atime, we will get EX lock, 2190 * otherwise we just get PR lock.
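 *
 * Either way the lock is returned held at *level, and the caller is
 * expected to drop it with ocfs2_inode_unlock(inode, level) when
 * done. A sketch of the expected calling pattern (illustrative, not
 * lifted from a real caller):
 *
 *	ret = ocfs2_inode_lock_atime(inode, vfsmnt, &level);
 *	if (ret < 0)
 *		return ret;
 *	... do the atime-sensitive work ...
 *	ocfs2_inode_unlock(inode, level);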
2191 */ 2192 if (ocfs2_should_update_atime(inode, vfsmnt)) { 2193 struct buffer_head *bh = NULL; 2194 2195 ocfs2_inode_unlock(inode, 0); 2196 ret = ocfs2_inode_lock(inode, &bh, 1); 2197 if (ret < 0) { 2198 mlog_errno(ret); 2199 return ret; 2200 } 2201 *level = 1; 2202 if (ocfs2_should_update_atime(inode, vfsmnt)) 2203 ocfs2_update_inode_atime(inode, bh); 2204 if (bh) 2205 brelse(bh); 2206 } else 2207 *level = 0; 2208 2209 mlog_exit(ret); 2210 return ret; 2211 } 2212 2213 void ocfs2_inode_unlock(struct inode *inode, 2214 int ex) 2215 { 2216 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2217 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2218 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2219 2220 mlog_entry_void(); 2221 2222 mlog(0, "inode %llu drop %s META lock\n", 2223 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2224 ex ? "EXMODE" : "PRMODE"); 2225 2226 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) && 2227 !ocfs2_mount_local(osb)) 2228 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 2229 2230 mlog_exit_void(); 2231 } 2232 2233 int ocfs2_super_lock(struct ocfs2_super *osb, 2234 int ex) 2235 { 2236 int status = 0; 2237 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2238 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2239 2240 mlog_entry_void(); 2241 2242 if (ocfs2_is_hard_readonly(osb)) 2243 return -EROFS; 2244 2245 if (ocfs2_mount_local(osb)) 2246 goto bail; 2247 2248 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 2249 if (status < 0) { 2250 mlog_errno(status); 2251 goto bail; 2252 } 2253 2254 /* The super block lock path is really in the best position to 2255 * know when resources covered by the lock need to be 2256 * refreshed, so we do it here. Of course, making sense of 2257 * everything is up to the caller :) */ 2258 status = ocfs2_should_refresh_lock_res(lockres); 2259 if (status < 0) { 2260 mlog_errno(status); 2261 goto bail; 2262 } 2263 if (status) { 2264 status = ocfs2_refresh_slot_info(osb); 2265 2266 ocfs2_complete_lock_res_refresh(lockres, status); 2267 2268 if (status < 0) 2269 mlog_errno(status); 2270 } 2271 bail: 2272 mlog_exit(status); 2273 return status; 2274 } 2275 2276 void ocfs2_super_unlock(struct ocfs2_super *osb, 2277 int ex) 2278 { 2279 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2280 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2281 2282 if (!ocfs2_mount_local(osb)) 2283 ocfs2_cluster_unlock(osb, lockres, level); 2284 } 2285 2286 int ocfs2_rename_lock(struct ocfs2_super *osb) 2287 { 2288 int status; 2289 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2290 2291 if (ocfs2_is_hard_readonly(osb)) 2292 return -EROFS; 2293 2294 if (ocfs2_mount_local(osb)) 2295 return 0; 2296 2297 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2298 if (status < 0) 2299 mlog_errno(status); 2300 2301 return status; 2302 } 2303 2304 void ocfs2_rename_unlock(struct ocfs2_super *osb) 2305 { 2306 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2307 2308 if (!ocfs2_mount_local(osb)) 2309 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2310 } 2311 2312 int ocfs2_dentry_lock(struct dentry *dentry, int ex) 2313 { 2314 int ret; 2315 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2316 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2317 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2318 2319 BUG_ON(!dl); 2320 2321 if (ocfs2_is_hard_readonly(osb)) 2322 return -EROFS; 2323 2324 if (ocfs2_mount_local(osb)) 2325 return 0; 2326 2327 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 2328 if (ret < 0) 2329 mlog_errno(ret); 2330 2331 return ret; 2332 } 2333 2334 void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 2335 { 2336 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2337 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2338 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2339 2340 if (!ocfs2_mount_local(osb)) 2341 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 2342 } 2343 2344 /* Reference counting of the dlm debug structure. We want this because 2345 * open references on the debug inodes can outlive the mount, so 2346 * we can't rely on the ocfs2_super to always exist. */ 2347 static void ocfs2_dlm_debug_free(struct kref *kref) 2348 { 2349 struct ocfs2_dlm_debug *dlm_debug; 2350 2351 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2352 2353 kfree(dlm_debug); 2354 } 2355 2356 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2357 { 2358 if (dlm_debug) 2359 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2360 } 2361 2362 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2363 { 2364 kref_get(&debug->d_refcnt); 2365 } 2366 2367 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2368 { 2369 struct ocfs2_dlm_debug *dlm_debug; 2370 2371 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 2372 if (!dlm_debug) { 2373 mlog_errno(-ENOMEM); 2374 goto out; 2375 } 2376 2377 kref_init(&dlm_debug->d_refcnt); 2378 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 2379 dlm_debug->d_locking_state = NULL; 2380 out: 2381 return dlm_debug; 2382 } 2383 2384 /* Access to this is arbitrated for us via seq_file->sem. */ 2385 struct ocfs2_dlm_seq_priv { 2386 struct ocfs2_dlm_debug *p_dlm_debug; 2387 struct ocfs2_lock_res p_iter_res; 2388 struct ocfs2_lock_res p_tmp_res; 2389 }; 2390 2391 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 2392 struct ocfs2_dlm_seq_priv *priv) 2393 { 2394 struct ocfs2_lock_res *iter, *ret = NULL; 2395 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 2396 2397 assert_spin_locked(&ocfs2_dlm_tracking_lock); 2398 2399 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 2400 /* discover the head of the list */ 2401 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 2402 mlog(0, "End of list found, %p\n", ret); 2403 break; 2404 } 2405 2406 /* We track our "dummy" iteration lockres by a NULL 2407 * l_ops field. */ 2408 if (iter->l_ops != NULL) { 2409 ret = iter; 2410 break; 2411 } 2412 } 2413 2414 return ret; 2415 } 2416 2417 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 2418 { 2419 struct ocfs2_dlm_seq_priv *priv = m->private; 2420 struct ocfs2_lock_res *iter; 2421 2422 spin_lock(&ocfs2_dlm_tracking_lock); 2423 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 2424 if (iter) { 2425 /* Since lockres structures have the lifetime of their container 2426 * (which can be inodes, ocfs2_supers, etc) we want to 2427 * copy this out to a temporary lockres while still 2428 * under the spinlock. Obviously after this we can't 2429 * trust any pointers on the copy returned, but that's 2430 * ok as the information we want isn't typically held 2431 * in them.
*/ 2432 priv->p_tmp_res = *iter; 2433 iter = &priv->p_tmp_res; 2434 } 2435 spin_unlock(&ocfs2_dlm_tracking_lock); 2436 2437 return iter; 2438 } 2439 2440 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 2441 { 2442 } 2443 2444 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 2445 { 2446 struct ocfs2_dlm_seq_priv *priv = m->private; 2447 struct ocfs2_lock_res *iter = v; 2448 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 2449 2450 spin_lock(&ocfs2_dlm_tracking_lock); 2451 iter = ocfs2_dlm_next_res(iter, priv); 2452 list_del_init(&dummy->l_debug_list); 2453 if (iter) { 2454 list_add(&dummy->l_debug_list, &iter->l_debug_list); 2455 priv->p_tmp_res = *iter; 2456 iter = &priv->p_tmp_res; 2457 } 2458 spin_unlock(&ocfs2_dlm_tracking_lock); 2459 2460 return iter; 2461 } 2462 2463 /* So that debugfs.ocfs2 can determine which format is being used */ 2464 #define OCFS2_DLM_DEBUG_STR_VERSION 1 2465 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 2466 { 2467 int i; 2468 char *lvb; 2469 struct ocfs2_lock_res *lockres = v; 2470 2471 if (!lockres) 2472 return -EINVAL; 2473 2474 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 2475 2476 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 2477 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 2478 lockres->l_name, 2479 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 2480 else 2481 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 2482 2483 seq_printf(m, "%d\t" 2484 "0x%lx\t" 2485 "0x%x\t" 2486 "0x%x\t" 2487 "%u\t" 2488 "%u\t" 2489 "%d\t" 2490 "%d\t", 2491 lockres->l_level, 2492 lockres->l_flags, 2493 lockres->l_action, 2494 lockres->l_unlock_action, 2495 lockres->l_ro_holders, 2496 lockres->l_ex_holders, 2497 lockres->l_requested, 2498 lockres->l_blocking); 2499 2500 /* Dump the raw LVB */ 2501 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2502 for(i = 0; i < DLM_LVB_LEN; i++) 2503 seq_printf(m, "0x%x\t", lvb[i]); 2504 2505 /* End the line */ 2506 seq_printf(m, "\n"); 2507 return 0; 2508 } 2509 2510 static const struct seq_operations ocfs2_dlm_seq_ops = { 2511 .start = ocfs2_dlm_seq_start, 2512 .stop = ocfs2_dlm_seq_stop, 2513 .next = ocfs2_dlm_seq_next, 2514 .show = ocfs2_dlm_seq_show, 2515 }; 2516 2517 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 2518 { 2519 struct seq_file *seq = (struct seq_file *) file->private_data; 2520 struct ocfs2_dlm_seq_priv *priv = seq->private; 2521 struct ocfs2_lock_res *res = &priv->p_iter_res; 2522 2523 ocfs2_remove_lockres_tracking(res); 2524 ocfs2_put_dlm_debug(priv->p_dlm_debug); 2525 return seq_release_private(inode, file); 2526 } 2527 2528 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 2529 { 2530 int ret; 2531 struct ocfs2_dlm_seq_priv *priv; 2532 struct seq_file *seq; 2533 struct ocfs2_super *osb; 2534 2535 priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL); 2536 if (!priv) { 2537 ret = -ENOMEM; 2538 mlog_errno(ret); 2539 goto out; 2540 } 2541 osb = inode->i_private; 2542 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 2543 priv->p_dlm_debug = osb->osb_dlm_debug; 2544 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 2545 2546 ret = seq_open(file, &ocfs2_dlm_seq_ops); 2547 if (ret) { 2548 kfree(priv); 2549 mlog_errno(ret); 2550 goto out; 2551 } 2552 2553 seq = (struct seq_file *) file->private_data; 2554 seq->private = priv; 2555 2556 ocfs2_add_lockres_tracking(&priv->p_iter_res, 2557 priv->p_dlm_debug); 2558 2559 out: 2560 return ret; 2561 } 2562 2563 static const struct file_operations 
ocfs2_dlm_debug_fops = { 2564 .open = ocfs2_dlm_debug_open, 2565 .release = ocfs2_dlm_debug_release, 2566 .read = seq_read, 2567 .llseek = seq_lseek, 2568 }; 2569 2570 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 2571 { 2572 int ret = 0; 2573 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2574 2575 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 2576 S_IFREG|S_IRUSR, 2577 osb->osb_debug_root, 2578 osb, 2579 &ocfs2_dlm_debug_fops); 2580 if (!dlm_debug->d_locking_state) { 2581 ret = -EINVAL; 2582 mlog(ML_ERROR, 2583 "Unable to create locking state debugfs file.\n"); 2584 goto out; 2585 } 2586 2587 ocfs2_get_dlm_debug(dlm_debug); 2588 out: 2589 return ret; 2590 } 2591 2592 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 2593 { 2594 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2595 2596 if (dlm_debug) { 2597 debugfs_remove(dlm_debug->d_locking_state); 2598 ocfs2_put_dlm_debug(dlm_debug); 2599 } 2600 } 2601 2602 int ocfs2_dlm_init(struct ocfs2_super *osb) 2603 { 2604 int status = 0; 2605 struct ocfs2_cluster_connection *conn = NULL; 2606 2607 mlog_entry_void(); 2608 2609 if (ocfs2_mount_local(osb)) { 2610 osb->node_num = 0; 2611 goto local; 2612 } 2613 2614 status = ocfs2_dlm_init_debug(osb); 2615 if (status < 0) { 2616 mlog_errno(status); 2617 goto bail; 2618 } 2619 2620 /* launch downconvert thread */ 2621 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc"); 2622 if (IS_ERR(osb->dc_task)) { 2623 status = PTR_ERR(osb->dc_task); 2624 osb->dc_task = NULL; 2625 mlog_errno(status); 2626 goto bail; 2627 } 2628 2629 /* for now, uuid == domain */ 2630 status = ocfs2_cluster_connect(osb->osb_cluster_stack, 2631 osb->uuid_str, 2632 strlen(osb->uuid_str), 2633 ocfs2_do_node_down, osb, 2634 &conn); 2635 if (status) { 2636 mlog_errno(status); 2637 goto bail; 2638 } 2639 2640 status = ocfs2_cluster_this_node(&osb->node_num); 2641 if (status < 0) { 2642 mlog_errno(status); 2643 mlog(ML_ERROR, 2644 "could not find this host's node number\n"); 2645 ocfs2_cluster_disconnect(conn, 0); 2646 goto bail; 2647 } 2648 2649 local: 2650 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 2651 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 2652 2653 osb->cconn = conn; 2654 2655 status = 0; 2656 bail: 2657 if (status < 0) { 2658 ocfs2_dlm_shutdown_debug(osb); 2659 if (osb->dc_task) 2660 kthread_stop(osb->dc_task); 2661 } 2662 2663 mlog_exit(status); 2664 return status; 2665 } 2666 2667 void ocfs2_dlm_shutdown(struct ocfs2_super *osb, 2668 int hangup_pending) 2669 { 2670 mlog_entry_void(); 2671 2672 ocfs2_drop_osb_locks(osb); 2673 2674 /* 2675 * Now that we have dropped all locks and ocfs2_dismount_volume() 2676 * has disabled recovery, the DLM won't be talking to us. It's 2677 * safe to tear things down before disconnecting the cluster. 
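 *
 * (Teardown order below: stop the downconvert thread first so it can
 * no longer touch the osb lock resources, free those resources, then
 * disconnect, and only then release the debugfs state.)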
2678 */ 2679 2680 if (osb->dc_task) { 2681 kthread_stop(osb->dc_task); 2682 osb->dc_task = NULL; 2683 } 2684 2685 ocfs2_lock_res_free(&osb->osb_super_lockres); 2686 ocfs2_lock_res_free(&osb->osb_rename_lockres); 2687 2688 ocfs2_cluster_disconnect(osb->cconn, hangup_pending); 2689 osb->cconn = NULL; 2690 2691 ocfs2_dlm_shutdown_debug(osb); 2692 2693 mlog_exit_void(); 2694 } 2695 2696 static void ocfs2_unlock_ast(void *opaque, int error) 2697 { 2698 struct ocfs2_lock_res *lockres = opaque; 2699 unsigned long flags; 2700 2701 mlog_entry_void(); 2702 2703 mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name, 2704 lockres->l_unlock_action); 2705 2706 spin_lock_irqsave(&lockres->l_lock, flags); 2707 if (error) { 2708 mlog(ML_ERROR, "Dlm passes error %d for lock %s, " 2709 "unlock_action %d\n", error, lockres->l_name, 2710 lockres->l_unlock_action); 2711 spin_unlock_irqrestore(&lockres->l_lock, flags); 2712 return; 2713 } 2714 2715 switch(lockres->l_unlock_action) { 2716 case OCFS2_UNLOCK_CANCEL_CONVERT: 2717 mlog(0, "Cancel convert success for %s\n", lockres->l_name); 2718 lockres->l_action = OCFS2_AST_INVALID; 2719 break; 2720 case OCFS2_UNLOCK_DROP_LOCK: 2721 lockres->l_level = DLM_LOCK_IV; 2722 break; 2723 default: 2724 BUG(); 2725 } 2726 2727 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 2728 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 2729 spin_unlock_irqrestore(&lockres->l_lock, flags); 2730 2731 wake_up(&lockres->l_event); 2732 2733 mlog_exit_void(); 2734 } 2735 2736 static int ocfs2_drop_lock(struct ocfs2_super *osb, 2737 struct ocfs2_lock_res *lockres) 2738 { 2739 int ret; 2740 unsigned long flags; 2741 u32 lkm_flags = 0; 2742 2743 /* We didn't get anywhere near actually using this lockres. */ 2744 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 2745 goto out; 2746 2747 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 2748 lkm_flags |= DLM_LKF_VALBLK; 2749 2750 spin_lock_irqsave(&lockres->l_lock, flags); 2751 2752 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 2753 "lockres %s, flags 0x%lx\n", 2754 lockres->l_name, lockres->l_flags); 2755 2756 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 2757 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 2758 "%u, unlock_action = %u\n", 2759 lockres->l_name, lockres->l_flags, lockres->l_action, 2760 lockres->l_unlock_action); 2761 2762 spin_unlock_irqrestore(&lockres->l_lock, flags); 2763 2764 /* XXX: Today we just wait on any busy 2765 * locks... Perhaps we need to cancel converts in the 2766 * future? */ 2767 ocfs2_wait_on_busy_lock(lockres); 2768 2769 spin_lock_irqsave(&lockres->l_lock, flags); 2770 } 2771 2772 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 2773 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 2774 lockres->l_level == DLM_LOCK_EX && 2775 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 2776 lockres->l_ops->set_lvb(lockres); 2777 } 2778 2779 if (lockres->l_flags & OCFS2_LOCK_BUSY) 2780 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 2781 lockres->l_name); 2782 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 2783 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 2784 2785 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 2786 spin_unlock_irqrestore(&lockres->l_lock, flags); 2787 goto out; 2788 } 2789 2790 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 2791 2792 /* make sure we never get here while waiting for an ast to 2793 * fire. */ 2794 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 2795 2796 /* is this necessary? 
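 * At minimum, OCFS2_LOCK_BUSY is what ocfs2_wait_on_busy_lock()
 * below keys on, and ocfs2_unlock_ast() only clears it once the DLM
 * has processed our unlock.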
*/ 2797 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2798 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 2799 spin_unlock_irqrestore(&lockres->l_lock, flags); 2800 2801 mlog(0, "lock %s\n", lockres->l_name); 2802 2803 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags, 2804 lockres); 2805 if (ret) { 2806 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 2807 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 2808 ocfs2_dlm_dump_lksb(&lockres->l_lksb); 2809 BUG(); 2810 } 2811 mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n", 2812 lockres->l_name); 2813 2814 ocfs2_wait_on_busy_lock(lockres); 2815 out: 2816 mlog_exit(0); 2817 return 0; 2818 } 2819 2820 /* Mark the lockres as being dropped. It will no longer be 2821 * queued if blocking, but we may still have to wait on it 2822 * being dequeued from the downconvert thread before we can consider 2823 * it safe to drop. 2824 * 2825 * You can *not* attempt to call cluster_lock on this lockres anymore. */ 2826 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres) 2827 { 2828 int status; 2829 struct ocfs2_mask_waiter mw; 2830 unsigned long flags; 2831 2832 ocfs2_init_mask_waiter(&mw); 2833 2834 spin_lock_irqsave(&lockres->l_lock, flags); 2835 lockres->l_flags |= OCFS2_LOCK_FREEING; 2836 while (lockres->l_flags & OCFS2_LOCK_QUEUED) { 2837 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); 2838 spin_unlock_irqrestore(&lockres->l_lock, flags); 2839 2840 mlog(0, "Waiting on lockres %s\n", lockres->l_name); 2841 2842 status = ocfs2_wait_for_mask(&mw); 2843 if (status) 2844 mlog_errno(status); 2845 2846 spin_lock_irqsave(&lockres->l_lock, flags); 2847 } 2848 spin_unlock_irqrestore(&lockres->l_lock, flags); 2849 } 2850 2851 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, 2852 struct ocfs2_lock_res *lockres) 2853 { 2854 int ret; 2855 2856 ocfs2_mark_lockres_freeing(lockres); 2857 ret = ocfs2_drop_lock(osb, lockres); 2858 if (ret) 2859 mlog_errno(ret); 2860 } 2861 2862 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 2863 { 2864 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 2865 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 2866 } 2867 2868 int ocfs2_drop_inode_locks(struct inode *inode) 2869 { 2870 int status, err; 2871 2872 mlog_entry_void(); 2873 2874 /* No need to call ocfs2_mark_lockres_freeing here - 2875 * ocfs2_clear_inode has done it for us.
*/ 2876 2877 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 2878 &OCFS2_I(inode)->ip_open_lockres); 2879 if (err < 0) 2880 mlog_errno(err); 2881 2882 status = err; 2883 2884 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 2885 &OCFS2_I(inode)->ip_inode_lockres); 2886 if (err < 0) 2887 mlog_errno(err); 2888 if (err < 0 && !status) 2889 status = err; 2890 2891 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 2892 &OCFS2_I(inode)->ip_rw_lockres); 2893 if (err < 0) 2894 mlog_errno(err); 2895 if (err < 0 && !status) 2896 status = err; 2897 2898 mlog_exit(status); 2899 return status; 2900 } 2901 2902 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 2903 int new_level) 2904 { 2905 assert_spin_locked(&lockres->l_lock); 2906 2907 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 2908 2909 if (lockres->l_level <= new_level) { 2910 mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n", 2911 lockres->l_level, new_level); 2912 BUG(); 2913 } 2914 2915 mlog(0, "lock %s, new_level = %d, l_blocking = %d\n", 2916 lockres->l_name, new_level, lockres->l_blocking); 2917 2918 lockres->l_action = OCFS2_AST_DOWNCONVERT; 2919 lockres->l_requested = new_level; 2920 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2921 return lockres_set_pending(lockres); 2922 } 2923 2924 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 2925 struct ocfs2_lock_res *lockres, 2926 int new_level, 2927 int lvb, 2928 unsigned int generation) 2929 { 2930 int ret; 2931 u32 dlm_flags = DLM_LKF_CONVERT; 2932 2933 mlog_entry_void(); 2934 2935 if (lvb) 2936 dlm_flags |= DLM_LKF_VALBLK; 2937 2938 ret = ocfs2_dlm_lock(osb->cconn, 2939 new_level, 2940 &lockres->l_lksb, 2941 dlm_flags, 2942 lockres->l_name, 2943 OCFS2_LOCK_ID_MAX_LEN - 1, 2944 lockres); 2945 lockres_clear_pending(lockres, generation, osb); 2946 if (ret) { 2947 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 2948 ocfs2_recover_from_dlm_error(lockres, 1); 2949 goto bail; 2950 } 2951 2952 ret = 0; 2953 bail: 2954 mlog_exit(ret); 2955 return ret; 2956 } 2957 2958 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */ 2959 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 2960 struct ocfs2_lock_res *lockres) 2961 { 2962 assert_spin_locked(&lockres->l_lock); 2963 2964 mlog_entry_void(); 2965 mlog(0, "lock %s\n", lockres->l_name); 2966 2967 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 2968 /* If we're already trying to cancel a lock conversion 2969 * then just drop the spinlock and allow the caller to 2970 * requeue this lock. */ 2971 2972 mlog(0, "Lockres %s, skip convert\n", lockres->l_name); 2973 return 0; 2974 } 2975 2976 /* were we in a convert when we got the bast fire? */ 2977 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT && 2978 lockres->l_action != OCFS2_AST_DOWNCONVERT); 2979 /* set things up for the unlockast to know to just 2980 * clear out the ast_action and unset busy, etc. 
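 * (See the OCFS2_UNLOCK_CANCEL_CONVERT case in ocfs2_unlock_ast()
 * above, which resets l_action to OCFS2_AST_INVALID and clears
 * OCFS2_LOCK_BUSY.)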
*/ 2981 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT; 2982 2983 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY), 2984 "lock %s, invalid flags: 0x%lx\n", 2985 lockres->l_name, lockres->l_flags); 2986 2987 return 1; 2988 } 2989 2990 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 2991 struct ocfs2_lock_res *lockres) 2992 { 2993 int ret; 2994 2995 mlog_entry_void(); 2996 mlog(0, "lock %s\n", lockres->l_name); 2997 2998 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, 2999 DLM_LKF_CANCEL, lockres); 3000 if (ret) { 3001 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3002 ocfs2_recover_from_dlm_error(lockres, 0); 3003 } 3004 3005 mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name); 3006 3007 mlog_exit(ret); 3008 return ret; 3009 } 3010 3011 static int ocfs2_unblock_lock(struct ocfs2_super *osb, 3012 struct ocfs2_lock_res *lockres, 3013 struct ocfs2_unblock_ctl *ctl) 3014 { 3015 unsigned long flags; 3016 int blocking; 3017 int new_level; 3018 int ret = 0; 3019 int set_lvb = 0; 3020 unsigned int gen; 3021 3022 mlog_entry_void(); 3023 3024 spin_lock_irqsave(&lockres->l_lock, flags); 3025 3026 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 3027 3028 recheck: 3029 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 3030 /* XXX 3031 * This is a *big* race. The OCFS2_LOCK_PENDING flag 3032 * exists entirely for one reason - another thread has set 3033 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock(). 3034 * 3035 * If we do ocfs2_cancel_convert() before the other thread 3036 * calls dlm_lock(), our cancel will do nothing. We will 3037 * get no ast, and we will have no way of knowing the 3038 * cancel failed. Meanwhile, the other thread will call 3039 * into dlm_lock() and wait...forever. 3040 * 3041 * Why forever? Because another node has asked for the 3042 * lock first; that's why we're here in unblock_lock(). 3043 * 3044 * The solution is OCFS2_LOCK_PENDING. When PENDING is 3045 * set, we just requeue the unblock. Only when the other 3046 * thread has called dlm_lock() and cleared PENDING will 3047 * we then cancel their request. 3048 * 3049 * All callers of dlm_lock() must set OCFS2_LOCK_PENDING 3050 * at the same time they set OCFS2_LOCK_BUSY. They must 3051 * clear OCFS2_LOCK_PENDING after dlm_lock() returns. 3052 */ 3053 if (lockres->l_flags & OCFS2_LOCK_PENDING) 3054 goto leave_requeue; 3055 3056 ctl->requeue = 1; 3057 ret = ocfs2_prepare_cancel_convert(osb, lockres); 3058 spin_unlock_irqrestore(&lockres->l_lock, flags); 3059 if (ret) { 3060 ret = ocfs2_cancel_convert(osb, lockres); 3061 if (ret < 0) 3062 mlog_errno(ret); 3063 } 3064 goto leave; 3065 } 3066 3067 /* if we're blocking an exclusive and we have *any* holders, 3068 * then requeue. */ 3069 if ((lockres->l_blocking == DLM_LOCK_EX) 3070 && (lockres->l_ex_holders || lockres->l_ro_holders)) 3071 goto leave_requeue; 3072 3073 /* If it's a PR we're blocking, then only 3074 * requeue if we've got any EX holders */ 3075 if (lockres->l_blocking == DLM_LOCK_PR && 3076 lockres->l_ex_holders) 3077 goto leave_requeue; 3078 3079 /* 3080 * Can we get a lock in this state if the holder counts are 3081 * zero? The metadata unblock code used to check this.
3082 */ 3083 if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 3084 && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) 3085 goto leave_requeue; 3086 3087 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 3088 3089 if (lockres->l_ops->check_downconvert 3090 && !lockres->l_ops->check_downconvert(lockres, new_level)) 3091 goto leave_requeue; 3092 3093 /* If we get here, then we know that there are no more 3094 * incompatible holders (and anyone asking for an incompatible 3095 * lock is blocked). We can now downconvert the lock */ 3096 if (!lockres->l_ops->downconvert_worker) 3097 goto downconvert; 3098 3099 /* Some lockres types want to do a bit of work before 3100 * downconverting a lock. Allow that here. The worker function 3101 * may sleep, so we save off a copy of what we're blocking as 3102 * it may change while we're not holding the spin lock. */ 3103 blocking = lockres->l_blocking; 3104 spin_unlock_irqrestore(&lockres->l_lock, flags); 3105 3106 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); 3107 3108 if (ctl->unblock_action == UNBLOCK_STOP_POST) 3109 goto leave; 3110 3111 spin_lock_irqsave(&lockres->l_lock, flags); 3112 if (blocking != lockres->l_blocking) { 3113 /* If this changed underneath us, then we can't drop 3114 * it just yet. */ 3115 goto recheck; 3116 } 3117 3118 downconvert: 3119 ctl->requeue = 0; 3120 3121 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3122 if (lockres->l_level == DLM_LOCK_EX) 3123 set_lvb = 1; 3124 3125 /* 3126 * We only set the lvb if the lock has been fully 3127 * refreshed - otherwise we risk writing back stale 3128 * data. If the lock was never refreshed, there's no need 3129 * to clear out the lvb here, as its value is still valid. 3130 */ 3131 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3132 lockres->l_ops->set_lvb(lockres); 3133 } 3134 3135 gen = ocfs2_prepare_downconvert(lockres, new_level); 3136 spin_unlock_irqrestore(&lockres->l_lock, flags); 3137 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb, 3138 gen); 3139 3140 leave: 3141 mlog_exit(ret); 3142 return ret; 3143 3144 leave_requeue: 3145 spin_unlock_irqrestore(&lockres->l_lock, flags); 3146 ctl->requeue = 1; 3147 3148 mlog_exit(0); 3149 return 0; 3150 } 3151 3152 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 3153 int blocking) 3154 { 3155 struct inode *inode; 3156 struct address_space *mapping; 3157 3158 inode = ocfs2_lock_res_inode(lockres); 3159 mapping = inode->i_mapping; 3160 3161 if (!S_ISREG(inode->i_mode)) 3162 goto out; 3163 3164 /* 3165 * We need this before the filemap_fdatawrite() so that it can 3166 * transfer the dirty bit from the PTE to the 3167 * page. Unfortunately this means that even for EX->PR 3168 * downconverts, we'll lose our mappings and have to build 3169 * them up again. 3170 */ 3171 unmap_mapping_range(mapping, 0, 0, 0); 3172 3173 if (filemap_fdatawrite(mapping)) { 3174 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!", 3175 (unsigned long long)OCFS2_I(inode)->ip_blkno); 3176 } 3177 sync_mapping_buffers(mapping); 3178 if (blocking == DLM_LOCK_EX) { 3179 truncate_inode_pages(mapping, 0); 3180 } else { 3181 /* We only need to wait on the I/O if we're not also 3182 * truncating pages because truncate_inode_pages waits 3183 * for us above. We don't truncate pages if we're 3184 * blocking anything < EXMODE because we want to keep 3185 * them around in that case.
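 *
 * In short: if another node wants EX (blocking == DLM_LOCK_EX) we
 * write back and truncate everything; if it only wants PR we keep
 * the now-clean pages around and just wait for the writeback started
 * above to finish.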
*/ 3186 filemap_fdatawait(mapping); 3187 } 3188 3189 out: 3190 return UNBLOCK_CONTINUE; 3191 } 3192 3193 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 3194 int new_level) 3195 { 3196 struct inode *inode = ocfs2_lock_res_inode(lockres); 3197 int checkpointed = ocfs2_inode_fully_checkpointed(inode); 3198 3199 BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR); 3200 BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed); 3201 3202 if (checkpointed) 3203 return 1; 3204 3205 ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb)); 3206 return 0; 3207 } 3208 3209 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres) 3210 { 3211 struct inode *inode = ocfs2_lock_res_inode(lockres); 3212 3213 __ocfs2_stuff_meta_lvb(inode); 3214 } 3215 3216 /* 3217 * Does the final reference drop on our dentry lock. Right now this 3218 * happens in the downconvert thread, but we could choose to simplify the 3219 * dlmglue API and push these off to the ocfs2_wq in the future. 3220 */ 3221 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 3222 struct ocfs2_lock_res *lockres) 3223 { 3224 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3225 ocfs2_dentry_lock_put(osb, dl); 3226 } 3227 3228 /* 3229 * d_delete() matching dentries before the lock downconvert. 3230 * 3231 * At this point, any process waiting to destroy the 3232 * dentry_lock due to last ref count is stopped by the 3233 * OCFS2_LOCK_QUEUED flag. 3234 * 3235 * We have two potential problems: 3236 * 3237 * 1) If we do the last reference drop on our dentry_lock (via dput) 3238 * we'll wind up in ocfs2_release_dentry_lock(), waiting on 3239 * the downconvert to finish. Instead we take an elevated 3240 * reference and push the drop until after we've completed our 3241 * unblock processing. 3242 * 3243 * 2) There might be another process with a final reference, 3244 * waiting on us to finish processing. If this is the case, we 3245 * detect it and exit out - there are no dentries left anyway. 3246 */ 3247 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 3248 int blocking) 3249 { 3250 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3251 struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode); 3252 struct dentry *dentry; 3253 unsigned long flags; 3254 int extra_ref = 0; 3255 3256 /* 3257 * This node is blocking another node from getting a read 3258 * lock. This happens when we've renamed within a 3259 * directory. We've forced the other nodes to d_delete(), but 3260 * we never actually dropped our lock because it's still 3261 * valid. The downconvert code will retain a PR for this node, 3262 * so there's no further work to do. 3263 */ 3264 if (blocking == DLM_LOCK_PR) 3265 return UNBLOCK_CONTINUE; 3266 3267 /* 3268 * Mark this inode as potentially orphaned. The code in 3269 * ocfs2_delete_inode() will figure out whether it actually 3270 * needs to be freed or not. 3271 */ 3272 spin_lock(&oi->ip_lock); 3273 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; 3274 spin_unlock(&oi->ip_lock); 3275 3276 /* 3277 * Yuck. We need to make sure however that the check of 3278 * OCFS2_LOCK_FREEING and the extra reference are atomic with 3279 * respect to a reference decrement or the setting of that 3280 * flag.
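 * That is why both lockres->l_lock and dentry_attach_lock are held
 * across the check-and-increment just below.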
3281 */ 3282 spin_lock_irqsave(&lockres->l_lock, flags); 3283 spin_lock(&dentry_attach_lock); 3284 if (!(lockres->l_flags & OCFS2_LOCK_FREEING) 3285 && dl->dl_count) { 3286 dl->dl_count++; 3287 extra_ref = 1; 3288 } 3289 spin_unlock(&dentry_attach_lock); 3290 spin_unlock_irqrestore(&lockres->l_lock, flags); 3291 3292 mlog(0, "extra_ref = %d\n", extra_ref); 3293 3294 /* 3295 * We have a process waiting on us in ocfs2_dentry_iput(), 3296 * which means we can't have any more outstanding 3297 * aliases. There's no need to do any more work. 3298 */ 3299 if (!extra_ref) 3300 return UNBLOCK_CONTINUE; 3301 3302 spin_lock(&dentry_attach_lock); 3303 while (1) { 3304 dentry = ocfs2_find_local_alias(dl->dl_inode, 3305 dl->dl_parent_blkno, 1); 3306 if (!dentry) 3307 break; 3308 spin_unlock(&dentry_attach_lock); 3309 3310 mlog(0, "d_delete(%.*s);\n", dentry->d_name.len, 3311 dentry->d_name.name); 3312 3313 /* 3314 * The following dcache calls may do an 3315 * iput(). Normally we don't want that from the 3316 * downconverting thread, but in this case it's ok 3317 * because the requesting node already has an 3318 * exclusive lock on the inode, so it can't be queued 3319 * for a downconvert. 3320 */ 3321 d_delete(dentry); 3322 dput(dentry); 3323 3324 spin_lock(&dentry_attach_lock); 3325 } 3326 spin_unlock(&dentry_attach_lock); 3327 3328 /* 3329 * If we are the last holder of this dentry lock, there is no 3330 * reason to downconvert so skip straight to the unlock. 3331 */ 3332 if (dl->dl_count == 1) 3333 return UNBLOCK_STOP_POST; 3334 3335 return UNBLOCK_CONTINUE_POST; 3336 } 3337 3338 /* 3339 * This is the filesystem locking protocol. It provides the lock handling 3340 * hooks for the underlying DLM. It has a maximum version number. 3341 * The version number allows interoperability with systems running at 3342 * the same major number and an equal or smaller minor number. 3343 * 3344 * Whenever the filesystem does new things with locks (adds or removes a 3345 * lock, orders them differently, does different things underneath a lock), 3346 * the version must be changed. The protocol is negotiated when joining 3347 * the dlm domain. A node may join the domain if its major version is 3348 * identical to all other nodes and its minor version is greater than 3349 * or equal to all other nodes. When its minor version is greater than 3350 * the other nodes, it will run at the minor version specified by the 3351 * other nodes. 3352 * 3353 * If a locking change is made that will not be compatible with older 3354 * versions, the major number must be increased and the minor version set 3355 * to zero. If a change merely adds a behavior that can be disabled when 3356 * speaking to older versions, the minor version must be increased. If a 3357 * change adds a fully backwards compatible change (eg, LVB changes that 3358 * are just ignored by older versions), the version does not need to be 3359 * updated. 
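 *
 * A worked example with made-up version numbers: a node at 1.4 may
 * join a domain of 1.2 nodes and will then run at 1.2; a node at 1.2
 * may not join a domain already running at 1.4; and a node at 2.0
 * can never join a 1.x domain, whatever the minor numbers.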
3360 */ 3361 static struct ocfs2_locking_protocol lproto = { 3362 .lp_max_version = { 3363 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, 3364 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, 3365 }, 3366 .lp_lock_ast = ocfs2_locking_ast, 3367 .lp_blocking_ast = ocfs2_blocking_ast, 3368 .lp_unlock_ast = ocfs2_unlock_ast, 3369 }; 3370 3371 void ocfs2_set_locking_protocol(void) 3372 { 3373 ocfs2_stack_glue_set_locking_protocol(&lproto); 3374 } 3375 3376 3377 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3378 struct ocfs2_lock_res *lockres) 3379 { 3380 int status; 3381 struct ocfs2_unblock_ctl ctl = {0, 0,}; 3382 unsigned long flags; 3383 3384 /* Our reference to the lockres in this function can be 3385 * considered valid until we remove the OCFS2_LOCK_QUEUED 3386 * flag. */ 3387 3388 mlog_entry_void(); 3389 3390 BUG_ON(!lockres); 3391 BUG_ON(!lockres->l_ops); 3392 3393 mlog(0, "lockres %s blocked.\n", lockres->l_name); 3394 3395 /* Detect whether a lock has been marked as going away while 3396 * the downconvert thread was processing other things. A lock can 3397 * still be marked with OCFS2_LOCK_FREEING after this check, 3398 * but short circuiting here will still save us some 3399 * performance. */ 3400 spin_lock_irqsave(&lockres->l_lock, flags); 3401 if (lockres->l_flags & OCFS2_LOCK_FREEING) 3402 goto unqueue; 3403 spin_unlock_irqrestore(&lockres->l_lock, flags); 3404 3405 status = ocfs2_unblock_lock(osb, lockres, &ctl); 3406 if (status < 0) 3407 mlog_errno(status); 3408 3409 spin_lock_irqsave(&lockres->l_lock, flags); 3410 unqueue: 3411 if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) { 3412 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED); 3413 } else 3414 ocfs2_schedule_blocked_lock(osb, lockres); 3415 3416 mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name, 3417 ctl.requeue ? "yes" : "no"); 3418 spin_unlock_irqrestore(&lockres->l_lock, flags); 3419 3420 if (ctl.unblock_action != UNBLOCK_CONTINUE 3421 && lockres->l_ops->post_unlock) 3422 lockres->l_ops->post_unlock(osb, lockres); 3423 3424 mlog_exit_void(); 3425 } 3426 3427 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 3428 struct ocfs2_lock_res *lockres) 3429 { 3430 mlog_entry_void(); 3431 3432 assert_spin_locked(&lockres->l_lock); 3433 3434 if (lockres->l_flags & OCFS2_LOCK_FREEING) { 3435 /* Do not schedule a lock for downconvert when it's on 3436 * the way to destruction - any nodes wanting access 3437 * to the resource will get it soon. 
*/ 3438 mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n", 3439 lockres->l_name, lockres->l_flags); 3440 return; 3441 } 3442 3443 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); 3444 3445 spin_lock(&osb->dc_task_lock); 3446 if (list_empty(&lockres->l_blocked_list)) { 3447 list_add_tail(&lockres->l_blocked_list, 3448 &osb->blocked_lock_list); 3449 osb->blocked_lock_count++; 3450 } 3451 spin_unlock(&osb->dc_task_lock); 3452 3453 mlog_exit_void(); 3454 } 3455 3456 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb) 3457 { 3458 unsigned long processed; 3459 struct ocfs2_lock_res *lockres; 3460 3461 mlog_entry_void(); 3462 3463 spin_lock(&osb->dc_task_lock); 3464 /* grab this early so we know to try again if a state change and 3465 * wake happen part-way through our work */ 3466 osb->dc_work_sequence = osb->dc_wake_sequence; 3467 3468 processed = osb->blocked_lock_count; 3469 while (processed) { 3470 BUG_ON(list_empty(&osb->blocked_lock_list)); 3471 3472 lockres = list_entry(osb->blocked_lock_list.next, 3473 struct ocfs2_lock_res, l_blocked_list); 3474 list_del_init(&lockres->l_blocked_list); 3475 osb->blocked_lock_count--; 3476 spin_unlock(&osb->dc_task_lock); 3477 3478 BUG_ON(!processed); 3479 processed--; 3480 3481 ocfs2_process_blocked_lock(osb, lockres); 3482 3483 spin_lock(&osb->dc_task_lock); 3484 } 3485 spin_unlock(&osb->dc_task_lock); 3486 3487 mlog_exit_void(); 3488 } 3489 3490 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb) 3491 { 3492 int empty = 0; 3493 3494 spin_lock(&osb->dc_task_lock); 3495 if (list_empty(&osb->blocked_lock_list)) 3496 empty = 1; 3497 3498 spin_unlock(&osb->dc_task_lock); 3499 return empty; 3500 } 3501 3502 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb) 3503 { 3504 int should_wake = 0; 3505 3506 spin_lock(&osb->dc_task_lock); 3507 if (osb->dc_work_sequence != osb->dc_wake_sequence) 3508 should_wake = 1; 3509 spin_unlock(&osb->dc_task_lock); 3510 3511 return should_wake; 3512 } 3513 3514 static int ocfs2_downconvert_thread(void *arg) 3515 { 3516 int status = 0; 3517 struct ocfs2_super *osb = arg; 3518 3519 /* only quit once we've been asked to stop and there is no more 3520 * work available */ 3521 while (!(kthread_should_stop() && 3522 ocfs2_downconvert_thread_lists_empty(osb))) { 3523 3524 wait_event_interruptible(osb->dc_event, 3525 ocfs2_downconvert_thread_should_wake(osb) || 3526 kthread_should_stop()); 3527 3528 mlog(0, "downconvert_thread: awoken\n"); 3529 3530 ocfs2_downconvert_thread_do_work(osb); 3531 } 3532 3533 osb->dc_task = NULL; 3534 return status; 3535 } 3536 3537 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb) 3538 { 3539 spin_lock(&osb->dc_task_lock); 3540 /* make sure the downconvert thread gets a swipe at whatever changes 3541 * the caller may have made to the lock state */ 3542 osb->dc_wake_sequence++; 3543 spin_unlock(&osb->dc_task_lock); 3544 wake_up(&osb->dc_event); 3545 } 3546