1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmglue.c 5 * 6 * Code which implements an OCFS2 specific interface to our DLM. 7 * 8 * Copyright (C) 2003, 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 */ 25 26 #include <linux/types.h> 27 #include <linux/slab.h> 28 #include <linux/highmem.h> 29 #include <linux/mm.h> 30 #include <linux/kthread.h> 31 #include <linux/pagemap.h> 32 #include <linux/debugfs.h> 33 #include <linux/seq_file.h> 34 #include <linux/time.h> 35 36 #define MLOG_MASK_PREFIX ML_DLM_GLUE 37 #include <cluster/masklog.h> 38 39 #include "ocfs2.h" 40 #include "ocfs2_lockingver.h" 41 42 #include "alloc.h" 43 #include "dcache.h" 44 #include "dlmglue.h" 45 #include "extent_map.h" 46 #include "file.h" 47 #include "heartbeat.h" 48 #include "inode.h" 49 #include "journal.h" 50 #include "stackglue.h" 51 #include "slot_map.h" 52 #include "super.h" 53 #include "uptodate.h" 54 55 #include "buffer_head_io.h" 56 57 struct ocfs2_mask_waiter { 58 struct list_head mw_item; 59 int mw_status; 60 struct completion mw_complete; 61 unsigned long mw_mask; 62 unsigned long mw_goal; 63 #ifdef CONFIG_OCFS2_FS_STATS 64 unsigned long long mw_lock_start; 65 #endif 66 }; 67 68 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres); 69 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres); 70 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres); 71 72 /* 73 * Return value from ->downconvert_worker functions. 74 * 75 * These control the precise actions of ocfs2_unblock_lock() 76 * and ocfs2_process_blocked_lock() 77 * 78 */ 79 enum ocfs2_unblock_action { 80 UNBLOCK_CONTINUE = 0, /* Continue downconvert */ 81 UNBLOCK_CONTINUE_POST = 1, /* Continue downconvert, fire 82 * ->post_unlock callback */ 83 UNBLOCK_STOP_POST = 2, /* Do not downconvert, fire 84 * ->post_unlock() callback. */ 85 }; 86 87 struct ocfs2_unblock_ctl { 88 int requeue; 89 enum ocfs2_unblock_action unblock_action; 90 }; 91 92 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 93 int new_level); 94 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres); 95 96 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 97 int blocking); 98 99 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 100 int blocking); 101 102 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 103 struct ocfs2_lock_res *lockres); 104 105 106 #define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres) 107 108 /* This aids in debugging situations where a bad LVB might be involved. */ 109 static void ocfs2_dump_meta_lvb_info(u64 level, 110 const char *function, 111 unsigned int line, 112 struct ocfs2_lock_res *lockres) 113 { 114 struct ocfs2_meta_lvb *lvb = 115 (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb); 116 117 mlog(level, "LVB information for %s (called from %s:%u):\n", 118 lockres->l_name, function, line); 119 mlog(level, "version: %u, clusters: %u, generation: 0x%x\n", 120 lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters), 121 be32_to_cpu(lvb->lvb_igeneration)); 122 mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n", 123 (unsigned long long)be64_to_cpu(lvb->lvb_isize), 124 be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid), 125 be16_to_cpu(lvb->lvb_imode)); 126 mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, " 127 "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink), 128 (long long)be64_to_cpu(lvb->lvb_iatime_packed), 129 (long long)be64_to_cpu(lvb->lvb_ictime_packed), 130 (long long)be64_to_cpu(lvb->lvb_imtime_packed), 131 be32_to_cpu(lvb->lvb_iattr)); 132 } 133 134 135 /* 136 * OCFS2 Lock Resource Operations 137 * 138 * These fine tune the behavior of the generic dlmglue locking infrastructure. 139 * 140 * The most basic of lock types can point ->l_priv to their respective 141 * struct ocfs2_super and allow the default actions to manage things. 142 * 143 * Right now, each lock type also needs to implement an init function, 144 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres() 145 * should be called when the lock is no longer needed (i.e., object 146 * destruction time). 147 */ 148 struct ocfs2_lock_res_ops { 149 /* 150 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define 151 * this callback if ->l_priv is not an ocfs2_super pointer 152 */ 153 struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *); 154 155 /* 156 * Optionally called in the downconvert thread after a 157 * successful downconvert. The lockres will not be referenced 158 * after this callback is called, so it is safe to free 159 * memory, etc. 160 * 161 * The exact semantics of when this is called are controlled 162 * by ->downconvert_worker() 163 */ 164 void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *); 165 166 /* 167 * Allow a lock type to add checks to determine whether it is 168 * safe to downconvert a lock. Return 0 to re-queue the 169 * downconvert at a later time, nonzero to continue. 170 * 171 * For most locks, the default checks that there are no 172 * incompatible holders are sufficient. 173 * 174 * Called with the lockres spinlock held. 175 */ 176 int (*check_downconvert)(struct ocfs2_lock_res *, int); 177 178 /* 179 * Allows a lock type to populate the lock value block. This 180 * is called on downconvert, and when we drop a lock. 181 * 182 * Locks that want to use this should set LOCK_TYPE_USES_LVB 183 * in the flags field. 184 * 185 * Called with the lockres spinlock held. 186 */ 187 void (*set_lvb)(struct ocfs2_lock_res *); 188 189 /* 190 * Called from the downconvert thread when it is determined 191 * that a lock will be downconverted. This is called without 192 * any locks held so the function can do work that might 193 * schedule (syncing out data, etc). 194 * 195 * This should return any one of the ocfs2_unblock_action 196 * values, depending on what it wants the thread to do. 197 */ 198 int (*downconvert_worker)(struct ocfs2_lock_res *, int); 199 200 /* 201 * LOCK_TYPE_* flags which describe the specific requirements 202 * of a lock type. Descriptions of each individual flag follow. 203 */ 204 int flags; 205 }; 206 207 /* 208 * Some locks want to "refresh" potentially stale data when a 209 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this 210 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the 211 * individual lockres l_flags member from the ast function. It is 212 * expected that the locking wrapper will clear the 213 * OCFS2_LOCK_NEEDS_REFRESH flag when done. 214 */ 215 #define LOCK_TYPE_REQUIRES_REFRESH 0x1 216 217 /* 218 * Indicate that a lock type makes use of the lock value block. The 219 * ->set_lvb lock type callback must be defined. 220 */ 221 #define LOCK_TYPE_USES_LVB 0x2 222 223 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { 224 .get_osb = ocfs2_get_inode_osb, 225 .flags = 0, 226 }; 227 228 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = { 229 .get_osb = ocfs2_get_inode_osb, 230 .check_downconvert = ocfs2_check_meta_downconvert, 231 .set_lvb = ocfs2_set_meta_lvb, 232 .downconvert_worker = ocfs2_data_convert_worker, 233 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 234 }; 235 236 static struct ocfs2_lock_res_ops ocfs2_super_lops = { 237 .flags = LOCK_TYPE_REQUIRES_REFRESH, 238 }; 239 240 static struct ocfs2_lock_res_ops ocfs2_rename_lops = { 241 .flags = 0, 242 }; 243 244 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { 245 .get_osb = ocfs2_get_dentry_osb, 246 .post_unlock = ocfs2_dentry_post_unlock, 247 .downconvert_worker = ocfs2_dentry_convert_worker, 248 .flags = 0, 249 }; 250 251 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = { 252 .get_osb = ocfs2_get_inode_osb, 253 .flags = 0, 254 }; 255 256 static struct ocfs2_lock_res_ops ocfs2_flock_lops = { 257 .get_osb = ocfs2_get_file_osb, 258 .flags = 0, 259 }; 260 261 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 262 { 263 return lockres->l_type == OCFS2_LOCK_TYPE_META || 264 lockres->l_type == OCFS2_LOCK_TYPE_RW || 265 lockres->l_type == OCFS2_LOCK_TYPE_OPEN; 266 } 267 268 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) 269 { 270 BUG_ON(!ocfs2_is_inode_lock(lockres)); 271 272 return (struct inode *) lockres->l_priv; 273 } 274 275 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres) 276 { 277 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY); 278 279 return (struct ocfs2_dentry_lock *)lockres->l_priv; 280 } 281 282 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres) 283 { 284 if (lockres->l_ops->get_osb) 285 return lockres->l_ops->get_osb(lockres); 286 287 return (struct ocfs2_super *)lockres->l_priv; 288 } 289 290 static int ocfs2_lock_create(struct ocfs2_super *osb, 291 struct ocfs2_lock_res *lockres, 292 int level, 293 u32 dlm_flags); 294 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 295 int wanted); 296 static void ocfs2_cluster_unlock(struct ocfs2_super *osb, 297 struct ocfs2_lock_res *lockres, 298 int level); 299 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres); 300 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres); 301 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres); 302 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level); 303 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 304 struct ocfs2_lock_res *lockres); 305 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 306 int convert); 307 #define ocfs2_log_dlm_error(_func, _err, _lockres) do { \ 308 mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \ 309 _err, _func, _lockres->l_name); \ 310 } while (0) 311 static int ocfs2_downconvert_thread(void *arg); 312 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 313 struct ocfs2_lock_res *lockres); 314 static int ocfs2_inode_lock_update(struct inode *inode, 315 struct buffer_head **bh); 316 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 317 static inline int ocfs2_highest_compat_lock_level(int level); 318 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 319 int new_level); 320 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 321 struct ocfs2_lock_res *lockres, 322 int new_level, 323 int lvb, 324 unsigned int generation); 325 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 326 struct ocfs2_lock_res *lockres); 327 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 328 struct ocfs2_lock_res *lockres); 329 330 331 static void ocfs2_build_lock_name(enum ocfs2_lock_type type, 332 u64 blkno, 333 u32 generation, 334 char *name) 335 { 336 int len; 337 338 mlog_entry_void(); 339 340 BUG_ON(type >= OCFS2_NUM_LOCK_TYPES); 341 342 len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", 343 ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD, 344 (long long)blkno, generation); 345 346 BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1)); 347 348 mlog(0, "built lock resource with name: %s\n", name); 349 350 mlog_exit_void(); 351 } 352 353 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock); 354 355 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res, 356 struct ocfs2_dlm_debug *dlm_debug) 357 { 358 mlog(0, "Add tracking for lockres %s\n", res->l_name); 359 360 spin_lock(&ocfs2_dlm_tracking_lock); 361 list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking); 362 spin_unlock(&ocfs2_dlm_tracking_lock); 363 } 364 365 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) 366 { 367 spin_lock(&ocfs2_dlm_tracking_lock); 368 if (!list_empty(&res->l_debug_list)) 369 list_del_init(&res->l_debug_list); 370 spin_unlock(&ocfs2_dlm_tracking_lock); 371 } 372 373 #ifdef CONFIG_OCFS2_FS_STATS 374 static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 375 { 376 res->l_lock_num_prmode = 0; 377 res->l_lock_num_prmode_failed = 0; 378 res->l_lock_total_prmode = 0; 379 res->l_lock_max_prmode = 0; 380 res->l_lock_num_exmode = 0; 381 res->l_lock_num_exmode_failed = 0; 382 res->l_lock_total_exmode = 0; 383 res->l_lock_max_exmode = 0; 384 res->l_lock_refresh = 0; 385 } 386 387 static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level, 388 struct ocfs2_mask_waiter *mw, int ret) 389 { 390 unsigned long long *num, *sum; 391 unsigned int *max, *failed; 392 struct timespec ts = current_kernel_time(); 393 unsigned long long time = timespec_to_ns(&ts) - mw->mw_lock_start; 394 395 if (level == LKM_PRMODE) { 396 num = &res->l_lock_num_prmode; 397 sum = &res->l_lock_total_prmode; 398 max = &res->l_lock_max_prmode; 399 failed = &res->l_lock_num_prmode_failed; 400 } else if (level == LKM_EXMODE) { 401 num = &res->l_lock_num_exmode; 402 sum = &res->l_lock_total_exmode; 403 max = &res->l_lock_max_exmode; 404 failed = &res->l_lock_num_exmode_failed; 405 } else 406 return; 407 408 (*num)++; 409 (*sum) += time; 410 if (time > *max) 411 *max = time; 412 if (ret) 413 (*failed)++; 414 } 415 416 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 417 { 418 lockres->l_lock_refresh++; 419 } 420 421 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 422 { 423 struct timespec ts = current_kernel_time(); 424 mw->mw_lock_start = timespec_to_ns(&ts); 425 } 426 #else 427 static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 428 { 429 } 430 static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, 431 int level, struct ocfs2_mask_waiter *mw, int ret) 432 { 433 } 434 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 435 { 436 } 437 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 438 { 439 } 440 #endif 441 442 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, 443 struct ocfs2_lock_res *res, 444 enum ocfs2_lock_type type, 445 struct ocfs2_lock_res_ops *ops, 446 void *priv) 447 { 448 res->l_type = type; 449 res->l_ops = ops; 450 res->l_priv = priv; 451 452 res->l_level = DLM_LOCK_IV; 453 res->l_requested = DLM_LOCK_IV; 454 res->l_blocking = DLM_LOCK_IV; 455 res->l_action = OCFS2_AST_INVALID; 456 res->l_unlock_action = OCFS2_UNLOCK_INVALID; 457 458 res->l_flags = OCFS2_LOCK_INITIALIZED; 459 460 ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug); 461 462 ocfs2_init_lock_stats(res); 463 } 464 465 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res) 466 { 467 /* This also clears out the lock status block */ 468 memset(res, 0, sizeof(struct ocfs2_lock_res)); 469 spin_lock_init(&res->l_lock); 470 init_waitqueue_head(&res->l_event); 471 INIT_LIST_HEAD(&res->l_blocked_list); 472 INIT_LIST_HEAD(&res->l_mask_waiters); 473 } 474 475 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, 476 enum ocfs2_lock_type type, 477 unsigned int generation, 478 struct inode *inode) 479 { 480 struct ocfs2_lock_res_ops *ops; 481 482 switch(type) { 483 case OCFS2_LOCK_TYPE_RW: 484 ops = &ocfs2_inode_rw_lops; 485 break; 486 case OCFS2_LOCK_TYPE_META: 487 ops = &ocfs2_inode_inode_lops; 488 break; 489 case OCFS2_LOCK_TYPE_OPEN: 490 ops = &ocfs2_inode_open_lops; 491 break; 492 default: 493 mlog_bug_on_msg(1, "type: %d\n", type); 494 ops = NULL; /* thanks, gcc */ 495 break; 496 }; 497 498 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, 499 generation, res->l_name); 500 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); 501 } 502 503 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres) 504 { 505 struct inode *inode = ocfs2_lock_res_inode(lockres); 506 507 return OCFS2_SB(inode->i_sb); 508 } 509 510 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres) 511 { 512 struct ocfs2_file_private *fp = lockres->l_priv; 513 514 return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb); 515 } 516 517 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) 518 { 519 __be64 inode_blkno_be; 520 521 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], 522 sizeof(__be64)); 523 524 return be64_to_cpu(inode_blkno_be); 525 } 526 527 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres) 528 { 529 struct ocfs2_dentry_lock *dl = lockres->l_priv; 530 531 return OCFS2_SB(dl->dl_inode->i_sb); 532 } 533 534 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, 535 u64 parent, struct inode *inode) 536 { 537 int len; 538 u64 inode_blkno = OCFS2_I(inode)->ip_blkno; 539 __be64 inode_blkno_be = cpu_to_be64(inode_blkno); 540 struct ocfs2_lock_res *lockres = &dl->dl_lockres; 541 542 ocfs2_lock_res_init_once(lockres); 543 544 /* 545 * Unfortunately, the standard lock naming scheme won't work 546 * here because we have two 16 byte values to use. Instead, 547 * we'll stuff the inode number as a binary value. We still 548 * want error prints to show something without garbling the 549 * display, so drop a null byte in there before the inode 550 * number. A future version of OCFS2 will likely use all 551 * binary lock names. The stringified names have been a 552 * tremendous aid in debugging, but now that the debugfs 553 * interface exists, we can mangle things there if need be. 554 * 555 * NOTE: We also drop the standard "pad" value (the total lock 556 * name size stays the same though - the last part is all 557 * zeros due to the memset in ocfs2_lock_res_init_once() 558 */ 559 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, 560 "%c%016llx", 561 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), 562 (long long)parent); 563 564 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); 565 566 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, 567 sizeof(__be64)); 568 569 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 570 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, 571 dl); 572 } 573 574 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 575 struct ocfs2_super *osb) 576 { 577 /* Superblock lockres doesn't come from a slab so we call init 578 * once on it manually. */ 579 ocfs2_lock_res_init_once(res); 580 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, 581 0, res->l_name); 582 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 583 &ocfs2_super_lops, osb); 584 } 585 586 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, 587 struct ocfs2_super *osb) 588 { 589 /* Rename lockres doesn't come from a slab so we call init 590 * once on it manually. */ 591 ocfs2_lock_res_init_once(res); 592 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); 593 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 594 &ocfs2_rename_lops, osb); 595 } 596 597 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, 598 struct ocfs2_file_private *fp) 599 { 600 struct inode *inode = fp->fp_file->f_mapping->host; 601 struct ocfs2_inode_info *oi = OCFS2_I(inode); 602 603 ocfs2_lock_res_init_once(lockres); 604 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno, 605 inode->i_generation, lockres->l_name); 606 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 607 OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops, 608 fp); 609 lockres->l_flags |= OCFS2_LOCK_NOCACHE; 610 } 611 612 void ocfs2_lock_res_free(struct ocfs2_lock_res *res) 613 { 614 mlog_entry_void(); 615 616 if (!(res->l_flags & OCFS2_LOCK_INITIALIZED)) 617 return; 618 619 ocfs2_remove_lockres_tracking(res); 620 621 mlog_bug_on_msg(!list_empty(&res->l_blocked_list), 622 "Lockres %s is on the blocked list\n", 623 res->l_name); 624 mlog_bug_on_msg(!list_empty(&res->l_mask_waiters), 625 "Lockres %s has mask waiters pending\n", 626 res->l_name); 627 mlog_bug_on_msg(spin_is_locked(&res->l_lock), 628 "Lockres %s is locked\n", 629 res->l_name); 630 mlog_bug_on_msg(res->l_ro_holders, 631 "Lockres %s has %u ro holders\n", 632 res->l_name, res->l_ro_holders); 633 mlog_bug_on_msg(res->l_ex_holders, 634 "Lockres %s has %u ex holders\n", 635 res->l_name, res->l_ex_holders); 636 637 /* Need to clear out the lock status block for the dlm */ 638 memset(&res->l_lksb, 0, sizeof(res->l_lksb)); 639 640 res->l_flags = 0UL; 641 mlog_exit_void(); 642 } 643 644 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, 645 int level) 646 { 647 mlog_entry_void(); 648 649 BUG_ON(!lockres); 650 651 switch(level) { 652 case DLM_LOCK_EX: 653 lockres->l_ex_holders++; 654 break; 655 case DLM_LOCK_PR: 656 lockres->l_ro_holders++; 657 break; 658 default: 659 BUG(); 660 } 661 662 mlog_exit_void(); 663 } 664 665 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, 666 int level) 667 { 668 mlog_entry_void(); 669 670 BUG_ON(!lockres); 671 672 switch(level) { 673 case DLM_LOCK_EX: 674 BUG_ON(!lockres->l_ex_holders); 675 lockres->l_ex_holders--; 676 break; 677 case DLM_LOCK_PR: 678 BUG_ON(!lockres->l_ro_holders); 679 lockres->l_ro_holders--; 680 break; 681 default: 682 BUG(); 683 } 684 mlog_exit_void(); 685 } 686 687 /* WARNING: This function lives in a world where the only three lock 688 * levels are EX, PR, and NL. It *will* have to be adjusted when more 689 * lock types are added. */ 690 static inline int ocfs2_highest_compat_lock_level(int level) 691 { 692 int new_level = DLM_LOCK_EX; 693 694 if (level == DLM_LOCK_EX) 695 new_level = DLM_LOCK_NL; 696 else if (level == DLM_LOCK_PR) 697 new_level = DLM_LOCK_PR; 698 return new_level; 699 } 700 701 static void lockres_set_flags(struct ocfs2_lock_res *lockres, 702 unsigned long newflags) 703 { 704 struct ocfs2_mask_waiter *mw, *tmp; 705 706 assert_spin_locked(&lockres->l_lock); 707 708 lockres->l_flags = newflags; 709 710 list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) { 711 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 712 continue; 713 714 list_del_init(&mw->mw_item); 715 mw->mw_status = 0; 716 complete(&mw->mw_complete); 717 } 718 } 719 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or) 720 { 721 lockres_set_flags(lockres, lockres->l_flags | or); 722 } 723 static void lockres_clear_flags(struct ocfs2_lock_res *lockres, 724 unsigned long clear) 725 { 726 lockres_set_flags(lockres, lockres->l_flags & ~clear); 727 } 728 729 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres) 730 { 731 mlog_entry_void(); 732 733 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 734 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 735 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 736 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 737 738 lockres->l_level = lockres->l_requested; 739 if (lockres->l_level <= 740 ocfs2_highest_compat_lock_level(lockres->l_blocking)) { 741 lockres->l_blocking = DLM_LOCK_NL; 742 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 743 } 744 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 745 746 mlog_exit_void(); 747 } 748 749 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres) 750 { 751 mlog_entry_void(); 752 753 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 754 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 755 756 /* Convert from RO to EX doesn't really need anything as our 757 * information is already up to data. Convert from NL to 758 * *anything* however should mark ourselves as needing an 759 * update */ 760 if (lockres->l_level == DLM_LOCK_NL && 761 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 762 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 763 764 lockres->l_level = lockres->l_requested; 765 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 766 767 mlog_exit_void(); 768 } 769 770 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres) 771 { 772 mlog_entry_void(); 773 774 BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY))); 775 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 776 777 if (lockres->l_requested > DLM_LOCK_NL && 778 !(lockres->l_flags & OCFS2_LOCK_LOCAL) && 779 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 780 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 781 782 lockres->l_level = lockres->l_requested; 783 lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED); 784 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 785 786 mlog_exit_void(); 787 } 788 789 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, 790 int level) 791 { 792 int needs_downconvert = 0; 793 mlog_entry_void(); 794 795 assert_spin_locked(&lockres->l_lock); 796 797 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 798 799 if (level > lockres->l_blocking) { 800 /* only schedule a downconvert if we haven't already scheduled 801 * one that goes low enough to satisfy the level we're 802 * blocking. this also catches the case where we get 803 * duplicate BASTs */ 804 if (ocfs2_highest_compat_lock_level(level) < 805 ocfs2_highest_compat_lock_level(lockres->l_blocking)) 806 needs_downconvert = 1; 807 808 lockres->l_blocking = level; 809 } 810 811 mlog_exit(needs_downconvert); 812 return needs_downconvert; 813 } 814 815 /* 816 * OCFS2_LOCK_PENDING and l_pending_gen. 817 * 818 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting 819 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock() 820 * for more details on the race. 821 * 822 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces 823 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock() 824 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear 825 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns, 826 * the caller is going to try to clear PENDING again. If nothing else is 827 * happening, __lockres_clear_pending() sees PENDING is unset and does 828 * nothing. 829 * 830 * But what if another path (eg downconvert thread) has just started a 831 * new locking action? The other path has re-set PENDING. Our path 832 * cannot clear PENDING, because that will re-open the original race 833 * window. 834 * 835 * [Example] 836 * 837 * ocfs2_meta_lock() 838 * ocfs2_cluster_lock() 839 * set BUSY 840 * set PENDING 841 * drop l_lock 842 * ocfs2_dlm_lock() 843 * ocfs2_locking_ast() ocfs2_downconvert_thread() 844 * clear PENDING ocfs2_unblock_lock() 845 * take_l_lock 846 * !BUSY 847 * ocfs2_prepare_downconvert() 848 * set BUSY 849 * set PENDING 850 * drop l_lock 851 * take l_lock 852 * clear PENDING 853 * drop l_lock 854 * <window> 855 * ocfs2_dlm_lock() 856 * 857 * So as you can see, we now have a window where l_lock is not held, 858 * PENDING is not set, and ocfs2_dlm_lock() has not been called. 859 * 860 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING 861 * set by ocfs2_prepare_downconvert(). That wasn't nice. 862 * 863 * To solve this we introduce l_pending_gen. A call to 864 * lockres_clear_pending() will only do so when it is passed a generation 865 * number that matches the lockres. lockres_set_pending() will return the 866 * current generation number. When ocfs2_cluster_lock() goes to clear 867 * PENDING, it passes the generation it got from set_pending(). In our 868 * example above, the generation numbers will *not* match. Thus, 869 * ocfs2_cluster_lock() will not clear the PENDING set by 870 * ocfs2_prepare_downconvert(). 871 */ 872 873 /* Unlocked version for ocfs2_locking_ast() */ 874 static void __lockres_clear_pending(struct ocfs2_lock_res *lockres, 875 unsigned int generation, 876 struct ocfs2_super *osb) 877 { 878 assert_spin_locked(&lockres->l_lock); 879 880 /* 881 * The ast and locking functions can race us here. The winner 882 * will clear pending, the loser will not. 883 */ 884 if (!(lockres->l_flags & OCFS2_LOCK_PENDING) || 885 (lockres->l_pending_gen != generation)) 886 return; 887 888 lockres_clear_flags(lockres, OCFS2_LOCK_PENDING); 889 lockres->l_pending_gen++; 890 891 /* 892 * The downconvert thread may have skipped us because we 893 * were PENDING. Wake it up. 894 */ 895 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 896 ocfs2_wake_downconvert_thread(osb); 897 } 898 899 /* Locked version for callers of ocfs2_dlm_lock() */ 900 static void lockres_clear_pending(struct ocfs2_lock_res *lockres, 901 unsigned int generation, 902 struct ocfs2_super *osb) 903 { 904 unsigned long flags; 905 906 spin_lock_irqsave(&lockres->l_lock, flags); 907 __lockres_clear_pending(lockres, generation, osb); 908 spin_unlock_irqrestore(&lockres->l_lock, flags); 909 } 910 911 static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres) 912 { 913 assert_spin_locked(&lockres->l_lock); 914 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 915 916 lockres_or_flags(lockres, OCFS2_LOCK_PENDING); 917 918 return lockres->l_pending_gen; 919 } 920 921 922 static void ocfs2_blocking_ast(void *opaque, int level) 923 { 924 struct ocfs2_lock_res *lockres = opaque; 925 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 926 int needs_downconvert; 927 unsigned long flags; 928 929 BUG_ON(level <= DLM_LOCK_NL); 930 931 mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n", 932 lockres->l_name, level, lockres->l_level, 933 ocfs2_lock_type_string(lockres->l_type)); 934 935 /* 936 * We can skip the bast for locks which don't enable caching - 937 * they'll be dropped at the earliest possible time anyway. 938 */ 939 if (lockres->l_flags & OCFS2_LOCK_NOCACHE) 940 return; 941 942 spin_lock_irqsave(&lockres->l_lock, flags); 943 needs_downconvert = ocfs2_generic_handle_bast(lockres, level); 944 if (needs_downconvert) 945 ocfs2_schedule_blocked_lock(osb, lockres); 946 spin_unlock_irqrestore(&lockres->l_lock, flags); 947 948 wake_up(&lockres->l_event); 949 950 ocfs2_wake_downconvert_thread(osb); 951 } 952 953 static void ocfs2_locking_ast(void *opaque) 954 { 955 struct ocfs2_lock_res *lockres = opaque; 956 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 957 unsigned long flags; 958 int status; 959 960 spin_lock_irqsave(&lockres->l_lock, flags); 961 962 status = ocfs2_dlm_lock_status(&lockres->l_lksb); 963 964 if (status == -EAGAIN) { 965 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 966 goto out; 967 } 968 969 if (status) { 970 mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n", 971 lockres->l_name, status); 972 spin_unlock_irqrestore(&lockres->l_lock, flags); 973 return; 974 } 975 976 switch(lockres->l_action) { 977 case OCFS2_AST_ATTACH: 978 ocfs2_generic_handle_attach_action(lockres); 979 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL); 980 break; 981 case OCFS2_AST_CONVERT: 982 ocfs2_generic_handle_convert_action(lockres); 983 break; 984 case OCFS2_AST_DOWNCONVERT: 985 ocfs2_generic_handle_downconvert_action(lockres); 986 break; 987 default: 988 mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u " 989 "lockres flags = 0x%lx, unlock action: %u\n", 990 lockres->l_name, lockres->l_action, lockres->l_flags, 991 lockres->l_unlock_action); 992 BUG(); 993 } 994 out: 995 /* set it to something invalid so if we get called again we 996 * can catch it. */ 997 lockres->l_action = OCFS2_AST_INVALID; 998 999 /* Did we try to cancel this lock? Clear that state */ 1000 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) 1001 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1002 1003 /* 1004 * We may have beaten the locking functions here. We certainly 1005 * know that dlm_lock() has been called :-) 1006 * Because we can't have two lock calls in flight at once, we 1007 * can use lockres->l_pending_gen. 1008 */ 1009 __lockres_clear_pending(lockres, lockres->l_pending_gen, osb); 1010 1011 wake_up(&lockres->l_event); 1012 spin_unlock_irqrestore(&lockres->l_lock, flags); 1013 } 1014 1015 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 1016 int convert) 1017 { 1018 unsigned long flags; 1019 1020 mlog_entry_void(); 1021 spin_lock_irqsave(&lockres->l_lock, flags); 1022 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1023 if (convert) 1024 lockres->l_action = OCFS2_AST_INVALID; 1025 else 1026 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1027 spin_unlock_irqrestore(&lockres->l_lock, flags); 1028 1029 wake_up(&lockres->l_event); 1030 mlog_exit_void(); 1031 } 1032 1033 /* Note: If we detect another process working on the lock (i.e., 1034 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller 1035 * to do the right thing in that case. 1036 */ 1037 static int ocfs2_lock_create(struct ocfs2_super *osb, 1038 struct ocfs2_lock_res *lockres, 1039 int level, 1040 u32 dlm_flags) 1041 { 1042 int ret = 0; 1043 unsigned long flags; 1044 unsigned int gen; 1045 1046 mlog_entry_void(); 1047 1048 mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level, 1049 dlm_flags); 1050 1051 spin_lock_irqsave(&lockres->l_lock, flags); 1052 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 1053 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 1054 spin_unlock_irqrestore(&lockres->l_lock, flags); 1055 goto bail; 1056 } 1057 1058 lockres->l_action = OCFS2_AST_ATTACH; 1059 lockres->l_requested = level; 1060 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1061 gen = lockres_set_pending(lockres); 1062 spin_unlock_irqrestore(&lockres->l_lock, flags); 1063 1064 ret = ocfs2_dlm_lock(osb->cconn, 1065 level, 1066 &lockres->l_lksb, 1067 dlm_flags, 1068 lockres->l_name, 1069 OCFS2_LOCK_ID_MAX_LEN - 1, 1070 lockres); 1071 lockres_clear_pending(lockres, gen, osb); 1072 if (ret) { 1073 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1074 ocfs2_recover_from_dlm_error(lockres, 1); 1075 } 1076 1077 mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name); 1078 1079 bail: 1080 mlog_exit(ret); 1081 return ret; 1082 } 1083 1084 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, 1085 int flag) 1086 { 1087 unsigned long flags; 1088 int ret; 1089 1090 spin_lock_irqsave(&lockres->l_lock, flags); 1091 ret = lockres->l_flags & flag; 1092 spin_unlock_irqrestore(&lockres->l_lock, flags); 1093 1094 return ret; 1095 } 1096 1097 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 1098 1099 { 1100 wait_event(lockres->l_event, 1101 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 1102 } 1103 1104 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 1105 1106 { 1107 wait_event(lockres->l_event, 1108 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 1109 } 1110 1111 /* predict what lock level we'll be dropping down to on behalf 1112 * of another node, and return true if the currently wanted 1113 * level will be compatible with it. */ 1114 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 1115 int wanted) 1116 { 1117 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 1118 1119 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking); 1120 } 1121 1122 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw) 1123 { 1124 INIT_LIST_HEAD(&mw->mw_item); 1125 init_completion(&mw->mw_complete); 1126 ocfs2_init_start_time(mw); 1127 } 1128 1129 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) 1130 { 1131 wait_for_completion(&mw->mw_complete); 1132 /* Re-arm the completion in case we want to wait on it again */ 1133 INIT_COMPLETION(mw->mw_complete); 1134 return mw->mw_status; 1135 } 1136 1137 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres, 1138 struct ocfs2_mask_waiter *mw, 1139 unsigned long mask, 1140 unsigned long goal) 1141 { 1142 BUG_ON(!list_empty(&mw->mw_item)); 1143 1144 assert_spin_locked(&lockres->l_lock); 1145 1146 list_add_tail(&mw->mw_item, &lockres->l_mask_waiters); 1147 mw->mw_mask = mask; 1148 mw->mw_goal = goal; 1149 } 1150 1151 /* returns 0 if the mw that was removed was already satisfied, -EBUSY 1152 * if the mask still hadn't reached its goal */ 1153 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1154 struct ocfs2_mask_waiter *mw) 1155 { 1156 unsigned long flags; 1157 int ret = 0; 1158 1159 spin_lock_irqsave(&lockres->l_lock, flags); 1160 if (!list_empty(&mw->mw_item)) { 1161 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 1162 ret = -EBUSY; 1163 1164 list_del_init(&mw->mw_item); 1165 init_completion(&mw->mw_complete); 1166 } 1167 spin_unlock_irqrestore(&lockres->l_lock, flags); 1168 1169 return ret; 1170 1171 } 1172 1173 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw, 1174 struct ocfs2_lock_res *lockres) 1175 { 1176 int ret; 1177 1178 ret = wait_for_completion_interruptible(&mw->mw_complete); 1179 if (ret) 1180 lockres_remove_mask_waiter(lockres, mw); 1181 else 1182 ret = mw->mw_status; 1183 /* Re-arm the completion in case we want to wait on it again */ 1184 INIT_COMPLETION(mw->mw_complete); 1185 return ret; 1186 } 1187 1188 static int ocfs2_cluster_lock(struct ocfs2_super *osb, 1189 struct ocfs2_lock_res *lockres, 1190 int level, 1191 u32 lkm_flags, 1192 int arg_flags) 1193 { 1194 struct ocfs2_mask_waiter mw; 1195 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); 1196 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 1197 unsigned long flags; 1198 unsigned int gen; 1199 int noqueue_attempted = 0; 1200 1201 mlog_entry_void(); 1202 1203 ocfs2_init_mask_waiter(&mw); 1204 1205 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 1206 lkm_flags |= DLM_LKF_VALBLK; 1207 1208 again: 1209 wait = 0; 1210 1211 if (catch_signals && signal_pending(current)) { 1212 ret = -ERESTARTSYS; 1213 goto out; 1214 } 1215 1216 spin_lock_irqsave(&lockres->l_lock, flags); 1217 1218 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, 1219 "Cluster lock called on freeing lockres %s! flags " 1220 "0x%lx\n", lockres->l_name, lockres->l_flags); 1221 1222 /* We only compare against the currently granted level 1223 * here. If the lock is blocked waiting on a downconvert, 1224 * we'll get caught below. */ 1225 if (lockres->l_flags & OCFS2_LOCK_BUSY && 1226 level > lockres->l_level) { 1227 /* is someone sitting in dlm_lock? If so, wait on 1228 * them. */ 1229 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1230 wait = 1; 1231 goto unlock; 1232 } 1233 1234 if (lockres->l_flags & OCFS2_LOCK_BLOCKED && 1235 !ocfs2_may_continue_on_blocked_lock(lockres, level)) { 1236 /* is the lock is currently blocked on behalf of 1237 * another node */ 1238 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0); 1239 wait = 1; 1240 goto unlock; 1241 } 1242 1243 if (level > lockres->l_level) { 1244 if (noqueue_attempted > 0) { 1245 ret = -EAGAIN; 1246 goto unlock; 1247 } 1248 if (lkm_flags & DLM_LKF_NOQUEUE) 1249 noqueue_attempted = 1; 1250 1251 if (lockres->l_action != OCFS2_AST_INVALID) 1252 mlog(ML_ERROR, "lockres %s has action %u pending\n", 1253 lockres->l_name, lockres->l_action); 1254 1255 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1256 lockres->l_action = OCFS2_AST_ATTACH; 1257 lkm_flags &= ~DLM_LKF_CONVERT; 1258 } else { 1259 lockres->l_action = OCFS2_AST_CONVERT; 1260 lkm_flags |= DLM_LKF_CONVERT; 1261 } 1262 1263 lockres->l_requested = level; 1264 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1265 gen = lockres_set_pending(lockres); 1266 spin_unlock_irqrestore(&lockres->l_lock, flags); 1267 1268 BUG_ON(level == DLM_LOCK_IV); 1269 BUG_ON(level == DLM_LOCK_NL); 1270 1271 mlog(0, "lock %s, convert from %d to level = %d\n", 1272 lockres->l_name, lockres->l_level, level); 1273 1274 /* call dlm_lock to upgrade lock now */ 1275 ret = ocfs2_dlm_lock(osb->cconn, 1276 level, 1277 &lockres->l_lksb, 1278 lkm_flags, 1279 lockres->l_name, 1280 OCFS2_LOCK_ID_MAX_LEN - 1, 1281 lockres); 1282 lockres_clear_pending(lockres, gen, osb); 1283 if (ret) { 1284 if (!(lkm_flags & DLM_LKF_NOQUEUE) || 1285 (ret != -EAGAIN)) { 1286 ocfs2_log_dlm_error("ocfs2_dlm_lock", 1287 ret, lockres); 1288 } 1289 ocfs2_recover_from_dlm_error(lockres, 1); 1290 goto out; 1291 } 1292 1293 mlog(0, "lock %s, successfull return from ocfs2_dlm_lock\n", 1294 lockres->l_name); 1295 1296 /* At this point we've gone inside the dlm and need to 1297 * complete our work regardless. */ 1298 catch_signals = 0; 1299 1300 /* wait for busy to clear and carry on */ 1301 goto again; 1302 } 1303 1304 /* Ok, if we get here then we're good to go. */ 1305 ocfs2_inc_holders(lockres, level); 1306 1307 ret = 0; 1308 unlock: 1309 spin_unlock_irqrestore(&lockres->l_lock, flags); 1310 out: 1311 /* 1312 * This is helping work around a lock inversion between the page lock 1313 * and dlm locks. One path holds the page lock while calling aops 1314 * which block acquiring dlm locks. The voting thread holds dlm 1315 * locks while acquiring page locks while down converting data locks. 1316 * This block is helping an aop path notice the inversion and back 1317 * off to unlock its page lock before trying the dlm lock again. 1318 */ 1319 if (wait && arg_flags & OCFS2_LOCK_NONBLOCK && 1320 mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) { 1321 wait = 0; 1322 if (lockres_remove_mask_waiter(lockres, &mw)) 1323 ret = -EAGAIN; 1324 else 1325 goto again; 1326 } 1327 if (wait) { 1328 ret = ocfs2_wait_for_mask(&mw); 1329 if (ret == 0) 1330 goto again; 1331 mlog_errno(ret); 1332 } 1333 ocfs2_update_lock_stats(lockres, level, &mw, ret); 1334 1335 mlog_exit(ret); 1336 return ret; 1337 } 1338 1339 static void ocfs2_cluster_unlock(struct ocfs2_super *osb, 1340 struct ocfs2_lock_res *lockres, 1341 int level) 1342 { 1343 unsigned long flags; 1344 1345 mlog_entry_void(); 1346 spin_lock_irqsave(&lockres->l_lock, flags); 1347 ocfs2_dec_holders(lockres, level); 1348 ocfs2_downconvert_on_unlock(osb, lockres); 1349 spin_unlock_irqrestore(&lockres->l_lock, flags); 1350 mlog_exit_void(); 1351 } 1352 1353 static int ocfs2_create_new_lock(struct ocfs2_super *osb, 1354 struct ocfs2_lock_res *lockres, 1355 int ex, 1356 int local) 1357 { 1358 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1359 unsigned long flags; 1360 u32 lkm_flags = local ? DLM_LKF_LOCAL : 0; 1361 1362 spin_lock_irqsave(&lockres->l_lock, flags); 1363 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 1364 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL); 1365 spin_unlock_irqrestore(&lockres->l_lock, flags); 1366 1367 return ocfs2_lock_create(osb, lockres, level, lkm_flags); 1368 } 1369 1370 /* Grants us an EX lock on the data and metadata resources, skipping 1371 * the normal cluster directory lookup. Use this ONLY on newly created 1372 * inodes which other nodes can't possibly see, and which haven't been 1373 * hashed in the inode hash yet. This can give us a good performance 1374 * increase as it'll skip the network broadcast normally associated 1375 * with creating a new lock resource. */ 1376 int ocfs2_create_new_inode_locks(struct inode *inode) 1377 { 1378 int ret; 1379 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1380 1381 BUG_ON(!inode); 1382 BUG_ON(!ocfs2_inode_is_new(inode)); 1383 1384 mlog_entry_void(); 1385 1386 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); 1387 1388 /* NOTE: That we don't increment any of the holder counts, nor 1389 * do we add anything to a journal handle. Since this is 1390 * supposed to be a new inode which the cluster doesn't know 1391 * about yet, there is no need to. As far as the LVB handling 1392 * is concerned, this is basically like acquiring an EX lock 1393 * on a resource which has an invalid one -- we'll set it 1394 * valid when we release the EX. */ 1395 1396 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1); 1397 if (ret) { 1398 mlog_errno(ret); 1399 goto bail; 1400 } 1401 1402 /* 1403 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they 1404 * don't use a generation in their lock names. 1405 */ 1406 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); 1407 if (ret) { 1408 mlog_errno(ret); 1409 goto bail; 1410 } 1411 1412 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0); 1413 if (ret) { 1414 mlog_errno(ret); 1415 goto bail; 1416 } 1417 1418 bail: 1419 mlog_exit(ret); 1420 return ret; 1421 } 1422 1423 int ocfs2_rw_lock(struct inode *inode, int write) 1424 { 1425 int status, level; 1426 struct ocfs2_lock_res *lockres; 1427 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1428 1429 BUG_ON(!inode); 1430 1431 mlog_entry_void(); 1432 1433 mlog(0, "inode %llu take %s RW lock\n", 1434 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1435 write ? "EXMODE" : "PRMODE"); 1436 1437 if (ocfs2_mount_local(osb)) 1438 return 0; 1439 1440 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1441 1442 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1443 1444 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0, 1445 0); 1446 if (status < 0) 1447 mlog_errno(status); 1448 1449 mlog_exit(status); 1450 return status; 1451 } 1452 1453 void ocfs2_rw_unlock(struct inode *inode, int write) 1454 { 1455 int level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1456 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1457 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1458 1459 mlog_entry_void(); 1460 1461 mlog(0, "inode %llu drop %s RW lock\n", 1462 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1463 write ? "EXMODE" : "PRMODE"); 1464 1465 if (!ocfs2_mount_local(osb)) 1466 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 1467 1468 mlog_exit_void(); 1469 } 1470 1471 /* 1472 * ocfs2_open_lock always get PR mode lock. 1473 */ 1474 int ocfs2_open_lock(struct inode *inode) 1475 { 1476 int status = 0; 1477 struct ocfs2_lock_res *lockres; 1478 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1479 1480 BUG_ON(!inode); 1481 1482 mlog_entry_void(); 1483 1484 mlog(0, "inode %llu take PRMODE open lock\n", 1485 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1486 1487 if (ocfs2_mount_local(osb)) 1488 goto out; 1489 1490 lockres = &OCFS2_I(inode)->ip_open_lockres; 1491 1492 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1493 DLM_LOCK_PR, 0, 0); 1494 if (status < 0) 1495 mlog_errno(status); 1496 1497 out: 1498 mlog_exit(status); 1499 return status; 1500 } 1501 1502 int ocfs2_try_open_lock(struct inode *inode, int write) 1503 { 1504 int status = 0, level; 1505 struct ocfs2_lock_res *lockres; 1506 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1507 1508 BUG_ON(!inode); 1509 1510 mlog_entry_void(); 1511 1512 mlog(0, "inode %llu try to take %s open lock\n", 1513 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1514 write ? "EXMODE" : "PRMODE"); 1515 1516 if (ocfs2_mount_local(osb)) 1517 goto out; 1518 1519 lockres = &OCFS2_I(inode)->ip_open_lockres; 1520 1521 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1522 1523 /* 1524 * The file system may already holding a PRMODE/EXMODE open lock. 1525 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on 1526 * other nodes and the -EAGAIN will indicate to the caller that 1527 * this inode is still in use. 1528 */ 1529 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1530 level, DLM_LKF_NOQUEUE, 0); 1531 1532 out: 1533 mlog_exit(status); 1534 return status; 1535 } 1536 1537 /* 1538 * ocfs2_open_unlock unlock PR and EX mode open locks. 1539 */ 1540 void ocfs2_open_unlock(struct inode *inode) 1541 { 1542 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres; 1543 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1544 1545 mlog_entry_void(); 1546 1547 mlog(0, "inode %llu drop open lock\n", 1548 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1549 1550 if (ocfs2_mount_local(osb)) 1551 goto out; 1552 1553 if(lockres->l_ro_holders) 1554 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1555 DLM_LOCK_PR); 1556 if(lockres->l_ex_holders) 1557 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1558 DLM_LOCK_EX); 1559 1560 out: 1561 mlog_exit_void(); 1562 } 1563 1564 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres, 1565 int level) 1566 { 1567 int ret; 1568 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1569 unsigned long flags; 1570 struct ocfs2_mask_waiter mw; 1571 1572 ocfs2_init_mask_waiter(&mw); 1573 1574 retry_cancel: 1575 spin_lock_irqsave(&lockres->l_lock, flags); 1576 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 1577 ret = ocfs2_prepare_cancel_convert(osb, lockres); 1578 if (ret) { 1579 spin_unlock_irqrestore(&lockres->l_lock, flags); 1580 ret = ocfs2_cancel_convert(osb, lockres); 1581 if (ret < 0) { 1582 mlog_errno(ret); 1583 goto out; 1584 } 1585 goto retry_cancel; 1586 } 1587 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1588 spin_unlock_irqrestore(&lockres->l_lock, flags); 1589 1590 ocfs2_wait_for_mask(&mw); 1591 goto retry_cancel; 1592 } 1593 1594 ret = -ERESTARTSYS; 1595 /* 1596 * We may still have gotten the lock, in which case there's no 1597 * point to restarting the syscall. 1598 */ 1599 if (lockres->l_level == level) 1600 ret = 0; 1601 1602 mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret, 1603 lockres->l_flags, lockres->l_level, lockres->l_action); 1604 1605 spin_unlock_irqrestore(&lockres->l_lock, flags); 1606 1607 out: 1608 return ret; 1609 } 1610 1611 /* 1612 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of 1613 * flock() calls. The locking approach this requires is sufficiently 1614 * different from all other cluster lock types that we implement a 1615 * seperate path to the "low-level" dlm calls. In particular: 1616 * 1617 * - No optimization of lock levels is done - we take at exactly 1618 * what's been requested. 1619 * 1620 * - No lock caching is employed. We immediately downconvert to 1621 * no-lock at unlock time. This also means flock locks never go on 1622 * the blocking list). 1623 * 1624 * - Since userspace can trivially deadlock itself with flock, we make 1625 * sure to allow cancellation of a misbehaving applications flock() 1626 * request. 1627 * 1628 * - Access to any flock lockres doesn't require concurrency, so we 1629 * can simplify the code by requiring the caller to guarantee 1630 * serialization of dlmglue flock calls. 1631 */ 1632 int ocfs2_file_lock(struct file *file, int ex, int trylock) 1633 { 1634 int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1635 unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0; 1636 unsigned long flags; 1637 struct ocfs2_file_private *fp = file->private_data; 1638 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1639 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1640 struct ocfs2_mask_waiter mw; 1641 1642 ocfs2_init_mask_waiter(&mw); 1643 1644 if ((lockres->l_flags & OCFS2_LOCK_BUSY) || 1645 (lockres->l_level > DLM_LOCK_NL)) { 1646 mlog(ML_ERROR, 1647 "File lock \"%s\" has busy or locked state: flags: 0x%lx, " 1648 "level: %u\n", lockres->l_name, lockres->l_flags, 1649 lockres->l_level); 1650 return -EINVAL; 1651 } 1652 1653 spin_lock_irqsave(&lockres->l_lock, flags); 1654 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1655 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1656 spin_unlock_irqrestore(&lockres->l_lock, flags); 1657 1658 /* 1659 * Get the lock at NLMODE to start - that way we 1660 * can cancel the upconvert request if need be. 1661 */ 1662 ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0); 1663 if (ret < 0) { 1664 mlog_errno(ret); 1665 goto out; 1666 } 1667 1668 ret = ocfs2_wait_for_mask(&mw); 1669 if (ret) { 1670 mlog_errno(ret); 1671 goto out; 1672 } 1673 spin_lock_irqsave(&lockres->l_lock, flags); 1674 } 1675 1676 lockres->l_action = OCFS2_AST_CONVERT; 1677 lkm_flags |= DLM_LKF_CONVERT; 1678 lockres->l_requested = level; 1679 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1680 1681 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1682 spin_unlock_irqrestore(&lockres->l_lock, flags); 1683 1684 ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags, 1685 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1, 1686 lockres); 1687 if (ret) { 1688 if (!trylock || (ret != -EAGAIN)) { 1689 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1690 ret = -EINVAL; 1691 } 1692 1693 ocfs2_recover_from_dlm_error(lockres, 1); 1694 lockres_remove_mask_waiter(lockres, &mw); 1695 goto out; 1696 } 1697 1698 ret = ocfs2_wait_for_mask_interruptible(&mw, lockres); 1699 if (ret == -ERESTARTSYS) { 1700 /* 1701 * Userspace can cause deadlock itself with 1702 * flock(). Current behavior locally is to allow the 1703 * deadlock, but abort the system call if a signal is 1704 * received. We follow this example, otherwise a 1705 * poorly written program could sit in kernel until 1706 * reboot. 1707 * 1708 * Handling this is a bit more complicated for Ocfs2 1709 * though. We can't exit this function with an 1710 * outstanding lock request, so a cancel convert is 1711 * required. We intentionally overwrite 'ret' - if the 1712 * cancel fails and the lock was granted, it's easier 1713 * to just bubble sucess back up to the user. 1714 */ 1715 ret = ocfs2_flock_handle_signal(lockres, level); 1716 } else if (!ret && (level > lockres->l_level)) { 1717 /* Trylock failed asynchronously */ 1718 BUG_ON(!trylock); 1719 ret = -EAGAIN; 1720 } 1721 1722 out: 1723 1724 mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n", 1725 lockres->l_name, ex, trylock, ret); 1726 return ret; 1727 } 1728 1729 void ocfs2_file_unlock(struct file *file) 1730 { 1731 int ret; 1732 unsigned int gen; 1733 unsigned long flags; 1734 struct ocfs2_file_private *fp = file->private_data; 1735 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1736 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1737 struct ocfs2_mask_waiter mw; 1738 1739 ocfs2_init_mask_waiter(&mw); 1740 1741 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) 1742 return; 1743 1744 if (lockres->l_level == DLM_LOCK_NL) 1745 return; 1746 1747 mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n", 1748 lockres->l_name, lockres->l_flags, lockres->l_level, 1749 lockres->l_action); 1750 1751 spin_lock_irqsave(&lockres->l_lock, flags); 1752 /* 1753 * Fake a blocking ast for the downconvert code. 1754 */ 1755 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 1756 lockres->l_blocking = DLM_LOCK_EX; 1757 1758 gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL); 1759 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1760 spin_unlock_irqrestore(&lockres->l_lock, flags); 1761 1762 ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen); 1763 if (ret) { 1764 mlog_errno(ret); 1765 return; 1766 } 1767 1768 ret = ocfs2_wait_for_mask(&mw); 1769 if (ret) 1770 mlog_errno(ret); 1771 } 1772 1773 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 1774 struct ocfs2_lock_res *lockres) 1775 { 1776 int kick = 0; 1777 1778 mlog_entry_void(); 1779 1780 /* If we know that another node is waiting on our lock, kick 1781 * the downconvert thread * pre-emptively when we reach a release 1782 * condition. */ 1783 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 1784 switch(lockres->l_blocking) { 1785 case DLM_LOCK_EX: 1786 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 1787 kick = 1; 1788 break; 1789 case DLM_LOCK_PR: 1790 if (!lockres->l_ex_holders) 1791 kick = 1; 1792 break; 1793 default: 1794 BUG(); 1795 } 1796 } 1797 1798 if (kick) 1799 ocfs2_wake_downconvert_thread(osb); 1800 1801 mlog_exit_void(); 1802 } 1803 1804 #define OCFS2_SEC_BITS 34 1805 #define OCFS2_SEC_SHIFT (64 - 34) 1806 #define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1) 1807 1808 /* LVB only has room for 64 bits of time here so we pack it for 1809 * now. */ 1810 static u64 ocfs2_pack_timespec(struct timespec *spec) 1811 { 1812 u64 res; 1813 u64 sec = spec->tv_sec; 1814 u32 nsec = spec->tv_nsec; 1815 1816 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK); 1817 1818 return res; 1819 } 1820 1821 /* Call this with the lockres locked. I am reasonably sure we don't 1822 * need ip_lock in this function as anyone who would be changing those 1823 * values is supposed to be blocked in ocfs2_inode_lock right now. */ 1824 static void __ocfs2_stuff_meta_lvb(struct inode *inode) 1825 { 1826 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1827 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 1828 struct ocfs2_meta_lvb *lvb; 1829 1830 mlog_entry_void(); 1831 1832 lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb); 1833 1834 /* 1835 * Invalidate the LVB of a deleted inode - this way other 1836 * nodes are forced to go to disk and discover the new inode 1837 * status. 1838 */ 1839 if (oi->ip_flags & OCFS2_INODE_DELETED) { 1840 lvb->lvb_version = 0; 1841 goto out; 1842 } 1843 1844 lvb->lvb_version = OCFS2_LVB_VERSION; 1845 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 1846 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 1847 lvb->lvb_iuid = cpu_to_be32(inode->i_uid); 1848 lvb->lvb_igid = cpu_to_be32(inode->i_gid); 1849 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 1850 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 1851 lvb->lvb_iatime_packed = 1852 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime)); 1853 lvb->lvb_ictime_packed = 1854 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime)); 1855 lvb->lvb_imtime_packed = 1856 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime)); 1857 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 1858 lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features); 1859 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 1860 1861 out: 1862 mlog_meta_lvb(0, lockres); 1863 1864 mlog_exit_void(); 1865 } 1866 1867 static void ocfs2_unpack_timespec(struct timespec *spec, 1868 u64 packed_time) 1869 { 1870 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 1871 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 1872 } 1873 1874 static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 1875 { 1876 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1877 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 1878 struct ocfs2_meta_lvb *lvb; 1879 1880 mlog_entry_void(); 1881 1882 mlog_meta_lvb(0, lockres); 1883 1884 lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb); 1885 1886 /* We're safe here without the lockres lock... */ 1887 spin_lock(&oi->ip_lock); 1888 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 1889 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 1890 1891 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 1892 oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures); 1893 ocfs2_set_inode_flags(inode); 1894 1895 /* fast-symlinks are a special case */ 1896 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 1897 inode->i_blocks = 0; 1898 else 1899 inode->i_blocks = ocfs2_inode_sector_count(inode); 1900 1901 inode->i_uid = be32_to_cpu(lvb->lvb_iuid); 1902 inode->i_gid = be32_to_cpu(lvb->lvb_igid); 1903 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 1904 inode->i_nlink = be16_to_cpu(lvb->lvb_inlink); 1905 ocfs2_unpack_timespec(&inode->i_atime, 1906 be64_to_cpu(lvb->lvb_iatime_packed)); 1907 ocfs2_unpack_timespec(&inode->i_mtime, 1908 be64_to_cpu(lvb->lvb_imtime_packed)); 1909 ocfs2_unpack_timespec(&inode->i_ctime, 1910 be64_to_cpu(lvb->lvb_ictime_packed)); 1911 spin_unlock(&oi->ip_lock); 1912 1913 mlog_exit_void(); 1914 } 1915 1916 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 1917 struct ocfs2_lock_res *lockres) 1918 { 1919 struct ocfs2_meta_lvb *lvb = 1920 (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb); 1921 1922 if (lvb->lvb_version == OCFS2_LVB_VERSION 1923 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 1924 return 1; 1925 return 0; 1926 } 1927 1928 /* Determine whether a lock resource needs to be refreshed, and 1929 * arbitrate who gets to refresh it. 1930 * 1931 * 0 means no refresh needed. 1932 * 1933 * > 0 means you need to refresh this and you MUST call 1934 * ocfs2_complete_lock_res_refresh afterwards. */ 1935 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres) 1936 { 1937 unsigned long flags; 1938 int status = 0; 1939 1940 mlog_entry_void(); 1941 1942 refresh_check: 1943 spin_lock_irqsave(&lockres->l_lock, flags); 1944 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 1945 spin_unlock_irqrestore(&lockres->l_lock, flags); 1946 goto bail; 1947 } 1948 1949 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 1950 spin_unlock_irqrestore(&lockres->l_lock, flags); 1951 1952 ocfs2_wait_on_refreshing_lock(lockres); 1953 goto refresh_check; 1954 } 1955 1956 /* Ok, I'll be the one to refresh this lock. */ 1957 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING); 1958 spin_unlock_irqrestore(&lockres->l_lock, flags); 1959 1960 status = 1; 1961 bail: 1962 mlog_exit(status); 1963 return status; 1964 } 1965 1966 /* If status is non zero, I'll mark it as not being in refresh 1967 * anymroe, but i won't clear the needs refresh flag. */ 1968 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres, 1969 int status) 1970 { 1971 unsigned long flags; 1972 mlog_entry_void(); 1973 1974 spin_lock_irqsave(&lockres->l_lock, flags); 1975 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING); 1976 if (!status) 1977 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 1978 spin_unlock_irqrestore(&lockres->l_lock, flags); 1979 1980 wake_up(&lockres->l_event); 1981 1982 mlog_exit_void(); 1983 } 1984 1985 /* may or may not return a bh if it went to disk. */ 1986 static int ocfs2_inode_lock_update(struct inode *inode, 1987 struct buffer_head **bh) 1988 { 1989 int status = 0; 1990 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1991 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 1992 struct ocfs2_dinode *fe; 1993 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1994 1995 mlog_entry_void(); 1996 1997 if (ocfs2_mount_local(osb)) 1998 goto bail; 1999 2000 spin_lock(&oi->ip_lock); 2001 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2002 mlog(0, "Orphaned inode %llu was deleted while we " 2003 "were waiting on a lock. ip_flags = 0x%x\n", 2004 (unsigned long long)oi->ip_blkno, oi->ip_flags); 2005 spin_unlock(&oi->ip_lock); 2006 status = -ENOENT; 2007 goto bail; 2008 } 2009 spin_unlock(&oi->ip_lock); 2010 2011 if (!ocfs2_should_refresh_lock_res(lockres)) 2012 goto bail; 2013 2014 /* This will discard any caching information we might have had 2015 * for the inode metadata. */ 2016 ocfs2_metadata_cache_purge(inode); 2017 2018 ocfs2_extent_map_trunc(inode, 0); 2019 2020 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) { 2021 mlog(0, "Trusting LVB on inode %llu\n", 2022 (unsigned long long)oi->ip_blkno); 2023 ocfs2_refresh_inode_from_lvb(inode); 2024 } else { 2025 /* Boo, we have to go to disk. */ 2026 /* read bh, cast, ocfs2_refresh_inode */ 2027 status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno, 2028 bh, OCFS2_BH_CACHED, inode); 2029 if (status < 0) { 2030 mlog_errno(status); 2031 goto bail_refresh; 2032 } 2033 fe = (struct ocfs2_dinode *) (*bh)->b_data; 2034 2035 /* This is a good chance to make sure we're not 2036 * locking an invalid object. 2037 * 2038 * We bug on a stale inode here because we checked 2039 * above whether it was wiped from disk. The wiping 2040 * node provides a guarantee that we receive that 2041 * message and can mark the inode before dropping any 2042 * locks associated with it. */ 2043 if (!OCFS2_IS_VALID_DINODE(fe)) { 2044 OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe); 2045 status = -EIO; 2046 goto bail_refresh; 2047 } 2048 mlog_bug_on_msg(inode->i_generation != 2049 le32_to_cpu(fe->i_generation), 2050 "Invalid dinode %llu disk generation: %u " 2051 "inode->i_generation: %u\n", 2052 (unsigned long long)oi->ip_blkno, 2053 le32_to_cpu(fe->i_generation), 2054 inode->i_generation); 2055 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 2056 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 2057 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 2058 (unsigned long long)oi->ip_blkno, 2059 (unsigned long long)le64_to_cpu(fe->i_dtime), 2060 le32_to_cpu(fe->i_flags)); 2061 2062 ocfs2_refresh_inode(inode, fe); 2063 ocfs2_track_lock_refresh(lockres); 2064 } 2065 2066 status = 0; 2067 bail_refresh: 2068 ocfs2_complete_lock_res_refresh(lockres, status); 2069 bail: 2070 mlog_exit(status); 2071 return status; 2072 } 2073 2074 static int ocfs2_assign_bh(struct inode *inode, 2075 struct buffer_head **ret_bh, 2076 struct buffer_head *passed_bh) 2077 { 2078 int status; 2079 2080 if (passed_bh) { 2081 /* Ok, the update went to disk for us, use the 2082 * returned bh. */ 2083 *ret_bh = passed_bh; 2084 get_bh(*ret_bh); 2085 2086 return 0; 2087 } 2088 2089 status = ocfs2_read_block(OCFS2_SB(inode->i_sb), 2090 OCFS2_I(inode)->ip_blkno, 2091 ret_bh, 2092 OCFS2_BH_CACHED, 2093 inode); 2094 if (status < 0) 2095 mlog_errno(status); 2096 2097 return status; 2098 } 2099 2100 /* 2101 * returns < 0 error if the callback will never be called, otherwise 2102 * the result of the lock will be communicated via the callback. 2103 */ 2104 int ocfs2_inode_lock_full(struct inode *inode, 2105 struct buffer_head **ret_bh, 2106 int ex, 2107 int arg_flags) 2108 { 2109 int status, level, acquired; 2110 u32 dlm_flags; 2111 struct ocfs2_lock_res *lockres = NULL; 2112 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2113 struct buffer_head *local_bh = NULL; 2114 2115 BUG_ON(!inode); 2116 2117 mlog_entry_void(); 2118 2119 mlog(0, "inode %llu, take %s META lock\n", 2120 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2121 ex ? "EXMODE" : "PRMODE"); 2122 2123 status = 0; 2124 acquired = 0; 2125 /* We'll allow faking a readonly metadata lock for 2126 * rodevices. */ 2127 if (ocfs2_is_hard_readonly(osb)) { 2128 if (ex) 2129 status = -EROFS; 2130 goto bail; 2131 } 2132 2133 if (ocfs2_mount_local(osb)) 2134 goto local; 2135 2136 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2137 ocfs2_wait_for_recovery(osb); 2138 2139 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2140 level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2141 dlm_flags = 0; 2142 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 2143 dlm_flags |= DLM_LKF_NOQUEUE; 2144 2145 status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags); 2146 if (status < 0) { 2147 if (status != -EAGAIN && status != -EIOCBRETRY) 2148 mlog_errno(status); 2149 goto bail; 2150 } 2151 2152 /* Notify the error cleanup path to drop the cluster lock. */ 2153 acquired = 1; 2154 2155 /* We wait twice because a node may have died while we were in 2156 * the lower dlm layers. The second time though, we've 2157 * committed to owning this lock so we don't allow signals to 2158 * abort the operation. */ 2159 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2160 ocfs2_wait_for_recovery(osb); 2161 2162 local: 2163 /* 2164 * We only see this flag if we're being called from 2165 * ocfs2_read_locked_inode(). It means we're locking an inode 2166 * which hasn't been populated yet, so clear the refresh flag 2167 * and let the caller handle it. 2168 */ 2169 if (inode->i_state & I_NEW) { 2170 status = 0; 2171 if (lockres) 2172 ocfs2_complete_lock_res_refresh(lockres, 0); 2173 goto bail; 2174 } 2175 2176 /* This is fun. The caller may want a bh back, or it may 2177 * not. ocfs2_inode_lock_update definitely wants one in, but 2178 * may or may not read one, depending on what's in the 2179 * LVB. The result of all of this is that we've *only* gone to 2180 * disk if we have to, so the complexity is worthwhile. */ 2181 status = ocfs2_inode_lock_update(inode, &local_bh); 2182 if (status < 0) { 2183 if (status != -ENOENT) 2184 mlog_errno(status); 2185 goto bail; 2186 } 2187 2188 if (ret_bh) { 2189 status = ocfs2_assign_bh(inode, ret_bh, local_bh); 2190 if (status < 0) { 2191 mlog_errno(status); 2192 goto bail; 2193 } 2194 } 2195 2196 bail: 2197 if (status < 0) { 2198 if (ret_bh && (*ret_bh)) { 2199 brelse(*ret_bh); 2200 *ret_bh = NULL; 2201 } 2202 if (acquired) 2203 ocfs2_inode_unlock(inode, ex); 2204 } 2205 2206 if (local_bh) 2207 brelse(local_bh); 2208 2209 mlog_exit(status); 2210 return status; 2211 } 2212 2213 /* 2214 * This is working around a lock inversion between tasks acquiring DLM 2215 * locks while holding a page lock and the downconvert thread which 2216 * blocks dlm lock acquiry while acquiring page locks. 2217 * 2218 * ** These _with_page variantes are only intended to be called from aop 2219 * methods that hold page locks and return a very specific *positive* error 2220 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 2221 * 2222 * The DLM is called such that it returns -EAGAIN if it would have 2223 * blocked waiting for the downconvert thread. In that case we unlock 2224 * our page so the downconvert thread can make progress. Once we've 2225 * done this we have to return AOP_TRUNCATED_PAGE so the aop method 2226 * that called us can bubble that back up into the VFS who will then 2227 * immediately retry the aop call. 2228 * 2229 * We do a blocking lock and immediate unlock before returning, though, so that 2230 * the lock has a great chance of being cached on this node by the time the VFS 2231 * calls back to retry the aop. This has a potential to livelock as nodes 2232 * ping locks back and forth, but that's a risk we're willing to take to avoid 2233 * the lock inversion simply. 2234 */ 2235 int ocfs2_inode_lock_with_page(struct inode *inode, 2236 struct buffer_head **ret_bh, 2237 int ex, 2238 struct page *page) 2239 { 2240 int ret; 2241 2242 ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK); 2243 if (ret == -EAGAIN) { 2244 unlock_page(page); 2245 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0) 2246 ocfs2_inode_unlock(inode, ex); 2247 ret = AOP_TRUNCATED_PAGE; 2248 } 2249 2250 return ret; 2251 } 2252 2253 int ocfs2_inode_lock_atime(struct inode *inode, 2254 struct vfsmount *vfsmnt, 2255 int *level) 2256 { 2257 int ret; 2258 2259 mlog_entry_void(); 2260 ret = ocfs2_inode_lock(inode, NULL, 0); 2261 if (ret < 0) { 2262 mlog_errno(ret); 2263 return ret; 2264 } 2265 2266 /* 2267 * If we should update atime, we will get EX lock, 2268 * otherwise we just get PR lock. 2269 */ 2270 if (ocfs2_should_update_atime(inode, vfsmnt)) { 2271 struct buffer_head *bh = NULL; 2272 2273 ocfs2_inode_unlock(inode, 0); 2274 ret = ocfs2_inode_lock(inode, &bh, 1); 2275 if (ret < 0) { 2276 mlog_errno(ret); 2277 return ret; 2278 } 2279 *level = 1; 2280 if (ocfs2_should_update_atime(inode, vfsmnt)) 2281 ocfs2_update_inode_atime(inode, bh); 2282 if (bh) 2283 brelse(bh); 2284 } else 2285 *level = 0; 2286 2287 mlog_exit(ret); 2288 return ret; 2289 } 2290 2291 void ocfs2_inode_unlock(struct inode *inode, 2292 int ex) 2293 { 2294 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2295 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2296 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2297 2298 mlog_entry_void(); 2299 2300 mlog(0, "inode %llu drop %s META lock\n", 2301 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2302 ex ? "EXMODE" : "PRMODE"); 2303 2304 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) && 2305 !ocfs2_mount_local(osb)) 2306 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 2307 2308 mlog_exit_void(); 2309 } 2310 2311 int ocfs2_super_lock(struct ocfs2_super *osb, 2312 int ex) 2313 { 2314 int status = 0; 2315 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2316 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2317 2318 mlog_entry_void(); 2319 2320 if (ocfs2_is_hard_readonly(osb)) 2321 return -EROFS; 2322 2323 if (ocfs2_mount_local(osb)) 2324 goto bail; 2325 2326 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 2327 if (status < 0) { 2328 mlog_errno(status); 2329 goto bail; 2330 } 2331 2332 /* The super block lock path is really in the best position to 2333 * know when resources covered by the lock need to be 2334 * refreshed, so we do it here. Of course, making sense of 2335 * everything is up to the caller :) */ 2336 status = ocfs2_should_refresh_lock_res(lockres); 2337 if (status < 0) { 2338 mlog_errno(status); 2339 goto bail; 2340 } 2341 if (status) { 2342 status = ocfs2_refresh_slot_info(osb); 2343 2344 ocfs2_complete_lock_res_refresh(lockres, status); 2345 2346 if (status < 0) 2347 mlog_errno(status); 2348 ocfs2_track_lock_refresh(lockres); 2349 } 2350 bail: 2351 mlog_exit(status); 2352 return status; 2353 } 2354 2355 void ocfs2_super_unlock(struct ocfs2_super *osb, 2356 int ex) 2357 { 2358 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2359 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2360 2361 if (!ocfs2_mount_local(osb)) 2362 ocfs2_cluster_unlock(osb, lockres, level); 2363 } 2364 2365 int ocfs2_rename_lock(struct ocfs2_super *osb) 2366 { 2367 int status; 2368 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2369 2370 if (ocfs2_is_hard_readonly(osb)) 2371 return -EROFS; 2372 2373 if (ocfs2_mount_local(osb)) 2374 return 0; 2375 2376 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2377 if (status < 0) 2378 mlog_errno(status); 2379 2380 return status; 2381 } 2382 2383 void ocfs2_rename_unlock(struct ocfs2_super *osb) 2384 { 2385 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2386 2387 if (!ocfs2_mount_local(osb)) 2388 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2389 } 2390 2391 int ocfs2_dentry_lock(struct dentry *dentry, int ex) 2392 { 2393 int ret; 2394 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2395 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2396 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2397 2398 BUG_ON(!dl); 2399 2400 if (ocfs2_is_hard_readonly(osb)) 2401 return -EROFS; 2402 2403 if (ocfs2_mount_local(osb)) 2404 return 0; 2405 2406 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 2407 if (ret < 0) 2408 mlog_errno(ret); 2409 2410 return ret; 2411 } 2412 2413 void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 2414 { 2415 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2416 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2417 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2418 2419 if (!ocfs2_mount_local(osb)) 2420 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 2421 } 2422 2423 /* Reference counting of the dlm debug structure. We want this because 2424 * open references on the debug inodes can live on after a mount, so 2425 * we can't rely on the ocfs2_super to always exist. */ 2426 static void ocfs2_dlm_debug_free(struct kref *kref) 2427 { 2428 struct ocfs2_dlm_debug *dlm_debug; 2429 2430 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2431 2432 kfree(dlm_debug); 2433 } 2434 2435 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2436 { 2437 if (dlm_debug) 2438 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2439 } 2440 2441 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2442 { 2443 kref_get(&debug->d_refcnt); 2444 } 2445 2446 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2447 { 2448 struct ocfs2_dlm_debug *dlm_debug; 2449 2450 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 2451 if (!dlm_debug) { 2452 mlog_errno(-ENOMEM); 2453 goto out; 2454 } 2455 2456 kref_init(&dlm_debug->d_refcnt); 2457 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 2458 dlm_debug->d_locking_state = NULL; 2459 out: 2460 return dlm_debug; 2461 } 2462 2463 /* Access to this is arbitrated for us via seq_file->sem. */ 2464 struct ocfs2_dlm_seq_priv { 2465 struct ocfs2_dlm_debug *p_dlm_debug; 2466 struct ocfs2_lock_res p_iter_res; 2467 struct ocfs2_lock_res p_tmp_res; 2468 }; 2469 2470 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 2471 struct ocfs2_dlm_seq_priv *priv) 2472 { 2473 struct ocfs2_lock_res *iter, *ret = NULL; 2474 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 2475 2476 assert_spin_locked(&ocfs2_dlm_tracking_lock); 2477 2478 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 2479 /* discover the head of the list */ 2480 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 2481 mlog(0, "End of list found, %p\n", ret); 2482 break; 2483 } 2484 2485 /* We track our "dummy" iteration lockres' by a NULL 2486 * l_ops field. */ 2487 if (iter->l_ops != NULL) { 2488 ret = iter; 2489 break; 2490 } 2491 } 2492 2493 return ret; 2494 } 2495 2496 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 2497 { 2498 struct ocfs2_dlm_seq_priv *priv = m->private; 2499 struct ocfs2_lock_res *iter; 2500 2501 spin_lock(&ocfs2_dlm_tracking_lock); 2502 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 2503 if (iter) { 2504 /* Since lockres' have the lifetime of their container 2505 * (which can be inodes, ocfs2_supers, etc) we want to 2506 * copy this out to a temporary lockres while still 2507 * under the spinlock. Obviously after this we can't 2508 * trust any pointers on the copy returned, but that's 2509 * ok as the information we want isn't typically held 2510 * in them. */ 2511 priv->p_tmp_res = *iter; 2512 iter = &priv->p_tmp_res; 2513 } 2514 spin_unlock(&ocfs2_dlm_tracking_lock); 2515 2516 return iter; 2517 } 2518 2519 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 2520 { 2521 } 2522 2523 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 2524 { 2525 struct ocfs2_dlm_seq_priv *priv = m->private; 2526 struct ocfs2_lock_res *iter = v; 2527 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 2528 2529 spin_lock(&ocfs2_dlm_tracking_lock); 2530 iter = ocfs2_dlm_next_res(iter, priv); 2531 list_del_init(&dummy->l_debug_list); 2532 if (iter) { 2533 list_add(&dummy->l_debug_list, &iter->l_debug_list); 2534 priv->p_tmp_res = *iter; 2535 iter = &priv->p_tmp_res; 2536 } 2537 spin_unlock(&ocfs2_dlm_tracking_lock); 2538 2539 return iter; 2540 } 2541 2542 /* So that debugfs.ocfs2 can determine which format is being used */ 2543 #define OCFS2_DLM_DEBUG_STR_VERSION 2 2544 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 2545 { 2546 int i; 2547 char *lvb; 2548 struct ocfs2_lock_res *lockres = v; 2549 2550 if (!lockres) 2551 return -EINVAL; 2552 2553 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 2554 2555 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 2556 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 2557 lockres->l_name, 2558 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 2559 else 2560 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 2561 2562 seq_printf(m, "%d\t" 2563 "0x%lx\t" 2564 "0x%x\t" 2565 "0x%x\t" 2566 "%u\t" 2567 "%u\t" 2568 "%d\t" 2569 "%d\t", 2570 lockres->l_level, 2571 lockres->l_flags, 2572 lockres->l_action, 2573 lockres->l_unlock_action, 2574 lockres->l_ro_holders, 2575 lockres->l_ex_holders, 2576 lockres->l_requested, 2577 lockres->l_blocking); 2578 2579 /* Dump the raw LVB */ 2580 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2581 for(i = 0; i < DLM_LVB_LEN; i++) 2582 seq_printf(m, "0x%x\t", lvb[i]); 2583 2584 #ifdef CONFIG_OCFS2_FS_STATS 2585 # define lock_num_prmode(_l) (_l)->l_lock_num_prmode 2586 # define lock_num_exmode(_l) (_l)->l_lock_num_exmode 2587 # define lock_num_prmode_failed(_l) (_l)->l_lock_num_prmode_failed 2588 # define lock_num_exmode_failed(_l) (_l)->l_lock_num_exmode_failed 2589 # define lock_total_prmode(_l) (_l)->l_lock_total_prmode 2590 # define lock_total_exmode(_l) (_l)->l_lock_total_exmode 2591 # define lock_max_prmode(_l) (_l)->l_lock_max_prmode 2592 # define lock_max_exmode(_l) (_l)->l_lock_max_exmode 2593 # define lock_refresh(_l) (_l)->l_lock_refresh 2594 #else 2595 # define lock_num_prmode(_l) (0ULL) 2596 # define lock_num_exmode(_l) (0ULL) 2597 # define lock_num_prmode_failed(_l) (0) 2598 # define lock_num_exmode_failed(_l) (0) 2599 # define lock_total_prmode(_l) (0ULL) 2600 # define lock_total_exmode(_l) (0ULL) 2601 # define lock_max_prmode(_l) (0) 2602 # define lock_max_exmode(_l) (0) 2603 # define lock_refresh(_l) (0) 2604 #endif 2605 /* The following seq_print was added in version 2 of this output */ 2606 seq_printf(m, "%llu\t" 2607 "%llu\t" 2608 "%u\t" 2609 "%u\t" 2610 "%llu\t" 2611 "%llu\t" 2612 "%u\t" 2613 "%u\t" 2614 "%u\t", 2615 lock_num_prmode(lockres), 2616 lock_num_exmode(lockres), 2617 lock_num_prmode_failed(lockres), 2618 lock_num_exmode_failed(lockres), 2619 lock_total_prmode(lockres), 2620 lock_total_exmode(lockres), 2621 lock_max_prmode(lockres), 2622 lock_max_exmode(lockres), 2623 lock_refresh(lockres)); 2624 2625 /* End the line */ 2626 seq_printf(m, "\n"); 2627 return 0; 2628 } 2629 2630 static const struct seq_operations ocfs2_dlm_seq_ops = { 2631 .start = ocfs2_dlm_seq_start, 2632 .stop = ocfs2_dlm_seq_stop, 2633 .next = ocfs2_dlm_seq_next, 2634 .show = ocfs2_dlm_seq_show, 2635 }; 2636 2637 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 2638 { 2639 struct seq_file *seq = (struct seq_file *) file->private_data; 2640 struct ocfs2_dlm_seq_priv *priv = seq->private; 2641 struct ocfs2_lock_res *res = &priv->p_iter_res; 2642 2643 ocfs2_remove_lockres_tracking(res); 2644 ocfs2_put_dlm_debug(priv->p_dlm_debug); 2645 return seq_release_private(inode, file); 2646 } 2647 2648 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 2649 { 2650 int ret; 2651 struct ocfs2_dlm_seq_priv *priv; 2652 struct seq_file *seq; 2653 struct ocfs2_super *osb; 2654 2655 priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL); 2656 if (!priv) { 2657 ret = -ENOMEM; 2658 mlog_errno(ret); 2659 goto out; 2660 } 2661 osb = inode->i_private; 2662 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 2663 priv->p_dlm_debug = osb->osb_dlm_debug; 2664 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 2665 2666 ret = seq_open(file, &ocfs2_dlm_seq_ops); 2667 if (ret) { 2668 kfree(priv); 2669 mlog_errno(ret); 2670 goto out; 2671 } 2672 2673 seq = (struct seq_file *) file->private_data; 2674 seq->private = priv; 2675 2676 ocfs2_add_lockres_tracking(&priv->p_iter_res, 2677 priv->p_dlm_debug); 2678 2679 out: 2680 return ret; 2681 } 2682 2683 static const struct file_operations ocfs2_dlm_debug_fops = { 2684 .open = ocfs2_dlm_debug_open, 2685 .release = ocfs2_dlm_debug_release, 2686 .read = seq_read, 2687 .llseek = seq_lseek, 2688 }; 2689 2690 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 2691 { 2692 int ret = 0; 2693 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2694 2695 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 2696 S_IFREG|S_IRUSR, 2697 osb->osb_debug_root, 2698 osb, 2699 &ocfs2_dlm_debug_fops); 2700 if (!dlm_debug->d_locking_state) { 2701 ret = -EINVAL; 2702 mlog(ML_ERROR, 2703 "Unable to create locking state debugfs file.\n"); 2704 goto out; 2705 } 2706 2707 ocfs2_get_dlm_debug(dlm_debug); 2708 out: 2709 return ret; 2710 } 2711 2712 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 2713 { 2714 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2715 2716 if (dlm_debug) { 2717 debugfs_remove(dlm_debug->d_locking_state); 2718 ocfs2_put_dlm_debug(dlm_debug); 2719 } 2720 } 2721 2722 int ocfs2_dlm_init(struct ocfs2_super *osb) 2723 { 2724 int status = 0; 2725 struct ocfs2_cluster_connection *conn = NULL; 2726 2727 mlog_entry_void(); 2728 2729 if (ocfs2_mount_local(osb)) { 2730 osb->node_num = 0; 2731 goto local; 2732 } 2733 2734 status = ocfs2_dlm_init_debug(osb); 2735 if (status < 0) { 2736 mlog_errno(status); 2737 goto bail; 2738 } 2739 2740 /* launch downconvert thread */ 2741 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc"); 2742 if (IS_ERR(osb->dc_task)) { 2743 status = PTR_ERR(osb->dc_task); 2744 osb->dc_task = NULL; 2745 mlog_errno(status); 2746 goto bail; 2747 } 2748 2749 /* for now, uuid == domain */ 2750 status = ocfs2_cluster_connect(osb->osb_cluster_stack, 2751 osb->uuid_str, 2752 strlen(osb->uuid_str), 2753 ocfs2_do_node_down, osb, 2754 &conn); 2755 if (status) { 2756 mlog_errno(status); 2757 goto bail; 2758 } 2759 2760 status = ocfs2_cluster_this_node(&osb->node_num); 2761 if (status < 0) { 2762 mlog_errno(status); 2763 mlog(ML_ERROR, 2764 "could not find this host's node number\n"); 2765 ocfs2_cluster_disconnect(conn, 0); 2766 goto bail; 2767 } 2768 2769 local: 2770 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 2771 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 2772 2773 osb->cconn = conn; 2774 2775 status = 0; 2776 bail: 2777 if (status < 0) { 2778 ocfs2_dlm_shutdown_debug(osb); 2779 if (osb->dc_task) 2780 kthread_stop(osb->dc_task); 2781 } 2782 2783 mlog_exit(status); 2784 return status; 2785 } 2786 2787 void ocfs2_dlm_shutdown(struct ocfs2_super *osb, 2788 int hangup_pending) 2789 { 2790 mlog_entry_void(); 2791 2792 ocfs2_drop_osb_locks(osb); 2793 2794 /* 2795 * Now that we have dropped all locks and ocfs2_dismount_volume() 2796 * has disabled recovery, the DLM won't be talking to us. It's 2797 * safe to tear things down before disconnecting the cluster. 2798 */ 2799 2800 if (osb->dc_task) { 2801 kthread_stop(osb->dc_task); 2802 osb->dc_task = NULL; 2803 } 2804 2805 ocfs2_lock_res_free(&osb->osb_super_lockres); 2806 ocfs2_lock_res_free(&osb->osb_rename_lockres); 2807 2808 ocfs2_cluster_disconnect(osb->cconn, hangup_pending); 2809 osb->cconn = NULL; 2810 2811 ocfs2_dlm_shutdown_debug(osb); 2812 2813 mlog_exit_void(); 2814 } 2815 2816 static void ocfs2_unlock_ast(void *opaque, int error) 2817 { 2818 struct ocfs2_lock_res *lockres = opaque; 2819 unsigned long flags; 2820 2821 mlog_entry_void(); 2822 2823 mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name, 2824 lockres->l_unlock_action); 2825 2826 spin_lock_irqsave(&lockres->l_lock, flags); 2827 if (error) { 2828 mlog(ML_ERROR, "Dlm passes error %d for lock %s, " 2829 "unlock_action %d\n", error, lockres->l_name, 2830 lockres->l_unlock_action); 2831 spin_unlock_irqrestore(&lockres->l_lock, flags); 2832 return; 2833 } 2834 2835 switch(lockres->l_unlock_action) { 2836 case OCFS2_UNLOCK_CANCEL_CONVERT: 2837 mlog(0, "Cancel convert success for %s\n", lockres->l_name); 2838 lockres->l_action = OCFS2_AST_INVALID; 2839 break; 2840 case OCFS2_UNLOCK_DROP_LOCK: 2841 lockres->l_level = DLM_LOCK_IV; 2842 break; 2843 default: 2844 BUG(); 2845 } 2846 2847 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 2848 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 2849 spin_unlock_irqrestore(&lockres->l_lock, flags); 2850 2851 wake_up(&lockres->l_event); 2852 2853 mlog_exit_void(); 2854 } 2855 2856 static int ocfs2_drop_lock(struct ocfs2_super *osb, 2857 struct ocfs2_lock_res *lockres) 2858 { 2859 int ret; 2860 unsigned long flags; 2861 u32 lkm_flags = 0; 2862 2863 /* We didn't get anywhere near actually using this lockres. */ 2864 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 2865 goto out; 2866 2867 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 2868 lkm_flags |= DLM_LKF_VALBLK; 2869 2870 spin_lock_irqsave(&lockres->l_lock, flags); 2871 2872 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 2873 "lockres %s, flags 0x%lx\n", 2874 lockres->l_name, lockres->l_flags); 2875 2876 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 2877 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 2878 "%u, unlock_action = %u\n", 2879 lockres->l_name, lockres->l_flags, lockres->l_action, 2880 lockres->l_unlock_action); 2881 2882 spin_unlock_irqrestore(&lockres->l_lock, flags); 2883 2884 /* XXX: Today we just wait on any busy 2885 * locks... Perhaps we need to cancel converts in the 2886 * future? */ 2887 ocfs2_wait_on_busy_lock(lockres); 2888 2889 spin_lock_irqsave(&lockres->l_lock, flags); 2890 } 2891 2892 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 2893 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 2894 lockres->l_level == DLM_LOCK_EX && 2895 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 2896 lockres->l_ops->set_lvb(lockres); 2897 } 2898 2899 if (lockres->l_flags & OCFS2_LOCK_BUSY) 2900 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 2901 lockres->l_name); 2902 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 2903 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 2904 2905 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 2906 spin_unlock_irqrestore(&lockres->l_lock, flags); 2907 goto out; 2908 } 2909 2910 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 2911 2912 /* make sure we never get here while waiting for an ast to 2913 * fire. */ 2914 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 2915 2916 /* is this necessary? */ 2917 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2918 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 2919 spin_unlock_irqrestore(&lockres->l_lock, flags); 2920 2921 mlog(0, "lock %s\n", lockres->l_name); 2922 2923 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags, 2924 lockres); 2925 if (ret) { 2926 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 2927 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 2928 ocfs2_dlm_dump_lksb(&lockres->l_lksb); 2929 BUG(); 2930 } 2931 mlog(0, "lock %s, successfull return from ocfs2_dlm_unlock\n", 2932 lockres->l_name); 2933 2934 ocfs2_wait_on_busy_lock(lockres); 2935 out: 2936 mlog_exit(0); 2937 return 0; 2938 } 2939 2940 /* Mark the lockres as being dropped. It will no longer be 2941 * queued if blocking, but we still may have to wait on it 2942 * being dequeued from the downconvert thread before we can consider 2943 * it safe to drop. 2944 * 2945 * You can *not* attempt to call cluster_lock on this lockres anymore. */ 2946 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres) 2947 { 2948 int status; 2949 struct ocfs2_mask_waiter mw; 2950 unsigned long flags; 2951 2952 ocfs2_init_mask_waiter(&mw); 2953 2954 spin_lock_irqsave(&lockres->l_lock, flags); 2955 lockres->l_flags |= OCFS2_LOCK_FREEING; 2956 while (lockres->l_flags & OCFS2_LOCK_QUEUED) { 2957 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); 2958 spin_unlock_irqrestore(&lockres->l_lock, flags); 2959 2960 mlog(0, "Waiting on lockres %s\n", lockres->l_name); 2961 2962 status = ocfs2_wait_for_mask(&mw); 2963 if (status) 2964 mlog_errno(status); 2965 2966 spin_lock_irqsave(&lockres->l_lock, flags); 2967 } 2968 spin_unlock_irqrestore(&lockres->l_lock, flags); 2969 } 2970 2971 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, 2972 struct ocfs2_lock_res *lockres) 2973 { 2974 int ret; 2975 2976 ocfs2_mark_lockres_freeing(lockres); 2977 ret = ocfs2_drop_lock(osb, lockres); 2978 if (ret) 2979 mlog_errno(ret); 2980 } 2981 2982 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 2983 { 2984 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 2985 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 2986 } 2987 2988 int ocfs2_drop_inode_locks(struct inode *inode) 2989 { 2990 int status, err; 2991 2992 mlog_entry_void(); 2993 2994 /* No need to call ocfs2_mark_lockres_freeing here - 2995 * ocfs2_clear_inode has done it for us. */ 2996 2997 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 2998 &OCFS2_I(inode)->ip_open_lockres); 2999 if (err < 0) 3000 mlog_errno(err); 3001 3002 status = err; 3003 3004 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3005 &OCFS2_I(inode)->ip_inode_lockres); 3006 if (err < 0) 3007 mlog_errno(err); 3008 if (err < 0 && !status) 3009 status = err; 3010 3011 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3012 &OCFS2_I(inode)->ip_rw_lockres); 3013 if (err < 0) 3014 mlog_errno(err); 3015 if (err < 0 && !status) 3016 status = err; 3017 3018 mlog_exit(status); 3019 return status; 3020 } 3021 3022 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 3023 int new_level) 3024 { 3025 assert_spin_locked(&lockres->l_lock); 3026 3027 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 3028 3029 if (lockres->l_level <= new_level) { 3030 mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n", 3031 lockres->l_level, new_level); 3032 BUG(); 3033 } 3034 3035 mlog(0, "lock %s, new_level = %d, l_blocking = %d\n", 3036 lockres->l_name, new_level, lockres->l_blocking); 3037 3038 lockres->l_action = OCFS2_AST_DOWNCONVERT; 3039 lockres->l_requested = new_level; 3040 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3041 return lockres_set_pending(lockres); 3042 } 3043 3044 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 3045 struct ocfs2_lock_res *lockres, 3046 int new_level, 3047 int lvb, 3048 unsigned int generation) 3049 { 3050 int ret; 3051 u32 dlm_flags = DLM_LKF_CONVERT; 3052 3053 mlog_entry_void(); 3054 3055 if (lvb) 3056 dlm_flags |= DLM_LKF_VALBLK; 3057 3058 ret = ocfs2_dlm_lock(osb->cconn, 3059 new_level, 3060 &lockres->l_lksb, 3061 dlm_flags, 3062 lockres->l_name, 3063 OCFS2_LOCK_ID_MAX_LEN - 1, 3064 lockres); 3065 lockres_clear_pending(lockres, generation, osb); 3066 if (ret) { 3067 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 3068 ocfs2_recover_from_dlm_error(lockres, 1); 3069 goto bail; 3070 } 3071 3072 ret = 0; 3073 bail: 3074 mlog_exit(ret); 3075 return ret; 3076 } 3077 3078 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */ 3079 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 3080 struct ocfs2_lock_res *lockres) 3081 { 3082 assert_spin_locked(&lockres->l_lock); 3083 3084 mlog_entry_void(); 3085 mlog(0, "lock %s\n", lockres->l_name); 3086 3087 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 3088 /* If we're already trying to cancel a lock conversion 3089 * then just drop the spinlock and allow the caller to 3090 * requeue this lock. */ 3091 3092 mlog(0, "Lockres %s, skip convert\n", lockres->l_name); 3093 return 0; 3094 } 3095 3096 /* were we in a convert when we got the bast fire? */ 3097 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT && 3098 lockres->l_action != OCFS2_AST_DOWNCONVERT); 3099 /* set things up for the unlockast to know to just 3100 * clear out the ast_action and unset busy, etc. */ 3101 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT; 3102 3103 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY), 3104 "lock %s, invalid flags: 0x%lx\n", 3105 lockres->l_name, lockres->l_flags); 3106 3107 return 1; 3108 } 3109 3110 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 3111 struct ocfs2_lock_res *lockres) 3112 { 3113 int ret; 3114 3115 mlog_entry_void(); 3116 mlog(0, "lock %s\n", lockres->l_name); 3117 3118 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, 3119 DLM_LKF_CANCEL, lockres); 3120 if (ret) { 3121 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3122 ocfs2_recover_from_dlm_error(lockres, 0); 3123 } 3124 3125 mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name); 3126 3127 mlog_exit(ret); 3128 return ret; 3129 } 3130 3131 static int ocfs2_unblock_lock(struct ocfs2_super *osb, 3132 struct ocfs2_lock_res *lockres, 3133 struct ocfs2_unblock_ctl *ctl) 3134 { 3135 unsigned long flags; 3136 int blocking; 3137 int new_level; 3138 int ret = 0; 3139 int set_lvb = 0; 3140 unsigned int gen; 3141 3142 mlog_entry_void(); 3143 3144 spin_lock_irqsave(&lockres->l_lock, flags); 3145 3146 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 3147 3148 recheck: 3149 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 3150 /* XXX 3151 * This is a *big* race. The OCFS2_LOCK_PENDING flag 3152 * exists entirely for one reason - another thread has set 3153 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock(). 3154 * 3155 * If we do ocfs2_cancel_convert() before the other thread 3156 * calls dlm_lock(), our cancel will do nothing. We will 3157 * get no ast, and we will have no way of knowing the 3158 * cancel failed. Meanwhile, the other thread will call 3159 * into dlm_lock() and wait...forever. 3160 * 3161 * Why forever? Because another node has asked for the 3162 * lock first; that's why we're here in unblock_lock(). 3163 * 3164 * The solution is OCFS2_LOCK_PENDING. When PENDING is 3165 * set, we just requeue the unblock. Only when the other 3166 * thread has called dlm_lock() and cleared PENDING will 3167 * we then cancel their request. 3168 * 3169 * All callers of dlm_lock() must set OCFS2_DLM_PENDING 3170 * at the same time they set OCFS2_DLM_BUSY. They must 3171 * clear OCFS2_DLM_PENDING after dlm_lock() returns. 3172 */ 3173 if (lockres->l_flags & OCFS2_LOCK_PENDING) 3174 goto leave_requeue; 3175 3176 ctl->requeue = 1; 3177 ret = ocfs2_prepare_cancel_convert(osb, lockres); 3178 spin_unlock_irqrestore(&lockres->l_lock, flags); 3179 if (ret) { 3180 ret = ocfs2_cancel_convert(osb, lockres); 3181 if (ret < 0) 3182 mlog_errno(ret); 3183 } 3184 goto leave; 3185 } 3186 3187 /* if we're blocking an exclusive and we have *any* holders, 3188 * then requeue. */ 3189 if ((lockres->l_blocking == DLM_LOCK_EX) 3190 && (lockres->l_ex_holders || lockres->l_ro_holders)) 3191 goto leave_requeue; 3192 3193 /* If it's a PR we're blocking, then only 3194 * requeue if we've got any EX holders */ 3195 if (lockres->l_blocking == DLM_LOCK_PR && 3196 lockres->l_ex_holders) 3197 goto leave_requeue; 3198 3199 /* 3200 * Can we get a lock in this state if the holder counts are 3201 * zero? The meta data unblock code used to check this. 3202 */ 3203 if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 3204 && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) 3205 goto leave_requeue; 3206 3207 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 3208 3209 if (lockres->l_ops->check_downconvert 3210 && !lockres->l_ops->check_downconvert(lockres, new_level)) 3211 goto leave_requeue; 3212 3213 /* If we get here, then we know that there are no more 3214 * incompatible holders (and anyone asking for an incompatible 3215 * lock is blocked). We can now downconvert the lock */ 3216 if (!lockres->l_ops->downconvert_worker) 3217 goto downconvert; 3218 3219 /* Some lockres types want to do a bit of work before 3220 * downconverting a lock. Allow that here. The worker function 3221 * may sleep, so we save off a copy of what we're blocking as 3222 * it may change while we're not holding the spin lock. */ 3223 blocking = lockres->l_blocking; 3224 spin_unlock_irqrestore(&lockres->l_lock, flags); 3225 3226 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); 3227 3228 if (ctl->unblock_action == UNBLOCK_STOP_POST) 3229 goto leave; 3230 3231 spin_lock_irqsave(&lockres->l_lock, flags); 3232 if (blocking != lockres->l_blocking) { 3233 /* If this changed underneath us, then we can't drop 3234 * it just yet. */ 3235 goto recheck; 3236 } 3237 3238 downconvert: 3239 ctl->requeue = 0; 3240 3241 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3242 if (lockres->l_level == DLM_LOCK_EX) 3243 set_lvb = 1; 3244 3245 /* 3246 * We only set the lvb if the lock has been fully 3247 * refreshed - otherwise we risk setting stale 3248 * data. Otherwise, there's no need to actually clear 3249 * out the lvb here as it's value is still valid. 3250 */ 3251 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3252 lockres->l_ops->set_lvb(lockres); 3253 } 3254 3255 gen = ocfs2_prepare_downconvert(lockres, new_level); 3256 spin_unlock_irqrestore(&lockres->l_lock, flags); 3257 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb, 3258 gen); 3259 3260 leave: 3261 mlog_exit(ret); 3262 return ret; 3263 3264 leave_requeue: 3265 spin_unlock_irqrestore(&lockres->l_lock, flags); 3266 ctl->requeue = 1; 3267 3268 mlog_exit(0); 3269 return 0; 3270 } 3271 3272 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 3273 int blocking) 3274 { 3275 struct inode *inode; 3276 struct address_space *mapping; 3277 3278 inode = ocfs2_lock_res_inode(lockres); 3279 mapping = inode->i_mapping; 3280 3281 if (!S_ISREG(inode->i_mode)) 3282 goto out; 3283 3284 /* 3285 * We need this before the filemap_fdatawrite() so that it can 3286 * transfer the dirty bit from the PTE to the 3287 * page. Unfortunately this means that even for EX->PR 3288 * downconverts, we'll lose our mappings and have to build 3289 * them up again. 3290 */ 3291 unmap_mapping_range(mapping, 0, 0, 0); 3292 3293 if (filemap_fdatawrite(mapping)) { 3294 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!", 3295 (unsigned long long)OCFS2_I(inode)->ip_blkno); 3296 } 3297 sync_mapping_buffers(mapping); 3298 if (blocking == DLM_LOCK_EX) { 3299 truncate_inode_pages(mapping, 0); 3300 } else { 3301 /* We only need to wait on the I/O if we're not also 3302 * truncating pages because truncate_inode_pages waits 3303 * for us above. We don't truncate pages if we're 3304 * blocking anything < EXMODE because we want to keep 3305 * them around in that case. */ 3306 filemap_fdatawait(mapping); 3307 } 3308 3309 out: 3310 return UNBLOCK_CONTINUE; 3311 } 3312 3313 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 3314 int new_level) 3315 { 3316 struct inode *inode = ocfs2_lock_res_inode(lockres); 3317 int checkpointed = ocfs2_inode_fully_checkpointed(inode); 3318 3319 BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR); 3320 BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed); 3321 3322 if (checkpointed) 3323 return 1; 3324 3325 ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb)); 3326 return 0; 3327 } 3328 3329 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres) 3330 { 3331 struct inode *inode = ocfs2_lock_res_inode(lockres); 3332 3333 __ocfs2_stuff_meta_lvb(inode); 3334 } 3335 3336 /* 3337 * Does the final reference drop on our dentry lock. Right now this 3338 * happens in the downconvert thread, but we could choose to simplify the 3339 * dlmglue API and push these off to the ocfs2_wq in the future. 3340 */ 3341 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 3342 struct ocfs2_lock_res *lockres) 3343 { 3344 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3345 ocfs2_dentry_lock_put(osb, dl); 3346 } 3347 3348 /* 3349 * d_delete() matching dentries before the lock downconvert. 3350 * 3351 * At this point, any process waiting to destroy the 3352 * dentry_lock due to last ref count is stopped by the 3353 * OCFS2_LOCK_QUEUED flag. 3354 * 3355 * We have two potential problems 3356 * 3357 * 1) If we do the last reference drop on our dentry_lock (via dput) 3358 * we'll wind up in ocfs2_release_dentry_lock(), waiting on 3359 * the downconvert to finish. Instead we take an elevated 3360 * reference and push the drop until after we've completed our 3361 * unblock processing. 3362 * 3363 * 2) There might be another process with a final reference, 3364 * waiting on us to finish processing. If this is the case, we 3365 * detect it and exit out - there's no more dentries anyway. 3366 */ 3367 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 3368 int blocking) 3369 { 3370 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3371 struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode); 3372 struct dentry *dentry; 3373 unsigned long flags; 3374 int extra_ref = 0; 3375 3376 /* 3377 * This node is blocking another node from getting a read 3378 * lock. This happens when we've renamed within a 3379 * directory. We've forced the other nodes to d_delete(), but 3380 * we never actually dropped our lock because it's still 3381 * valid. The downconvert code will retain a PR for this node, 3382 * so there's no further work to do. 3383 */ 3384 if (blocking == DLM_LOCK_PR) 3385 return UNBLOCK_CONTINUE; 3386 3387 /* 3388 * Mark this inode as potentially orphaned. The code in 3389 * ocfs2_delete_inode() will figure out whether it actually 3390 * needs to be freed or not. 3391 */ 3392 spin_lock(&oi->ip_lock); 3393 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; 3394 spin_unlock(&oi->ip_lock); 3395 3396 /* 3397 * Yuck. We need to make sure however that the check of 3398 * OCFS2_LOCK_FREEING and the extra reference are atomic with 3399 * respect to a reference decrement or the setting of that 3400 * flag. 3401 */ 3402 spin_lock_irqsave(&lockres->l_lock, flags); 3403 spin_lock(&dentry_attach_lock); 3404 if (!(lockres->l_flags & OCFS2_LOCK_FREEING) 3405 && dl->dl_count) { 3406 dl->dl_count++; 3407 extra_ref = 1; 3408 } 3409 spin_unlock(&dentry_attach_lock); 3410 spin_unlock_irqrestore(&lockres->l_lock, flags); 3411 3412 mlog(0, "extra_ref = %d\n", extra_ref); 3413 3414 /* 3415 * We have a process waiting on us in ocfs2_dentry_iput(), 3416 * which means we can't have any more outstanding 3417 * aliases. There's no need to do any more work. 3418 */ 3419 if (!extra_ref) 3420 return UNBLOCK_CONTINUE; 3421 3422 spin_lock(&dentry_attach_lock); 3423 while (1) { 3424 dentry = ocfs2_find_local_alias(dl->dl_inode, 3425 dl->dl_parent_blkno, 1); 3426 if (!dentry) 3427 break; 3428 spin_unlock(&dentry_attach_lock); 3429 3430 mlog(0, "d_delete(%.*s);\n", dentry->d_name.len, 3431 dentry->d_name.name); 3432 3433 /* 3434 * The following dcache calls may do an 3435 * iput(). Normally we don't want that from the 3436 * downconverting thread, but in this case it's ok 3437 * because the requesting node already has an 3438 * exclusive lock on the inode, so it can't be queued 3439 * for a downconvert. 3440 */ 3441 d_delete(dentry); 3442 dput(dentry); 3443 3444 spin_lock(&dentry_attach_lock); 3445 } 3446 spin_unlock(&dentry_attach_lock); 3447 3448 /* 3449 * If we are the last holder of this dentry lock, there is no 3450 * reason to downconvert so skip straight to the unlock. 3451 */ 3452 if (dl->dl_count == 1) 3453 return UNBLOCK_STOP_POST; 3454 3455 return UNBLOCK_CONTINUE_POST; 3456 } 3457 3458 /* 3459 * This is the filesystem locking protocol. It provides the lock handling 3460 * hooks for the underlying DLM. It has a maximum version number. 3461 * The version number allows interoperability with systems running at 3462 * the same major number and an equal or smaller minor number. 3463 * 3464 * Whenever the filesystem does new things with locks (adds or removes a 3465 * lock, orders them differently, does different things underneath a lock), 3466 * the version must be changed. The protocol is negotiated when joining 3467 * the dlm domain. A node may join the domain if its major version is 3468 * identical to all other nodes and its minor version is greater than 3469 * or equal to all other nodes. When its minor version is greater than 3470 * the other nodes, it will run at the minor version specified by the 3471 * other nodes. 3472 * 3473 * If a locking change is made that will not be compatible with older 3474 * versions, the major number must be increased and the minor version set 3475 * to zero. If a change merely adds a behavior that can be disabled when 3476 * speaking to older versions, the minor version must be increased. If a 3477 * change adds a fully backwards compatible change (eg, LVB changes that 3478 * are just ignored by older versions), the version does not need to be 3479 * updated. 3480 */ 3481 static struct ocfs2_locking_protocol lproto = { 3482 .lp_max_version = { 3483 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, 3484 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, 3485 }, 3486 .lp_lock_ast = ocfs2_locking_ast, 3487 .lp_blocking_ast = ocfs2_blocking_ast, 3488 .lp_unlock_ast = ocfs2_unlock_ast, 3489 }; 3490 3491 void ocfs2_set_locking_protocol(void) 3492 { 3493 ocfs2_stack_glue_set_locking_protocol(&lproto); 3494 } 3495 3496 3497 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3498 struct ocfs2_lock_res *lockres) 3499 { 3500 int status; 3501 struct ocfs2_unblock_ctl ctl = {0, 0,}; 3502 unsigned long flags; 3503 3504 /* Our reference to the lockres in this function can be 3505 * considered valid until we remove the OCFS2_LOCK_QUEUED 3506 * flag. */ 3507 3508 mlog_entry_void(); 3509 3510 BUG_ON(!lockres); 3511 BUG_ON(!lockres->l_ops); 3512 3513 mlog(0, "lockres %s blocked.\n", lockres->l_name); 3514 3515 /* Detect whether a lock has been marked as going away while 3516 * the downconvert thread was processing other things. A lock can 3517 * still be marked with OCFS2_LOCK_FREEING after this check, 3518 * but short circuiting here will still save us some 3519 * performance. */ 3520 spin_lock_irqsave(&lockres->l_lock, flags); 3521 if (lockres->l_flags & OCFS2_LOCK_FREEING) 3522 goto unqueue; 3523 spin_unlock_irqrestore(&lockres->l_lock, flags); 3524 3525 status = ocfs2_unblock_lock(osb, lockres, &ctl); 3526 if (status < 0) 3527 mlog_errno(status); 3528 3529 spin_lock_irqsave(&lockres->l_lock, flags); 3530 unqueue: 3531 if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) { 3532 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED); 3533 } else 3534 ocfs2_schedule_blocked_lock(osb, lockres); 3535 3536 mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name, 3537 ctl.requeue ? "yes" : "no"); 3538 spin_unlock_irqrestore(&lockres->l_lock, flags); 3539 3540 if (ctl.unblock_action != UNBLOCK_CONTINUE 3541 && lockres->l_ops->post_unlock) 3542 lockres->l_ops->post_unlock(osb, lockres); 3543 3544 mlog_exit_void(); 3545 } 3546 3547 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 3548 struct ocfs2_lock_res *lockres) 3549 { 3550 mlog_entry_void(); 3551 3552 assert_spin_locked(&lockres->l_lock); 3553 3554 if (lockres->l_flags & OCFS2_LOCK_FREEING) { 3555 /* Do not schedule a lock for downconvert when it's on 3556 * the way to destruction - any nodes wanting access 3557 * to the resource will get it soon. */ 3558 mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n", 3559 lockres->l_name, lockres->l_flags); 3560 return; 3561 } 3562 3563 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); 3564 3565 spin_lock(&osb->dc_task_lock); 3566 if (list_empty(&lockres->l_blocked_list)) { 3567 list_add_tail(&lockres->l_blocked_list, 3568 &osb->blocked_lock_list); 3569 osb->blocked_lock_count++; 3570 } 3571 spin_unlock(&osb->dc_task_lock); 3572 3573 mlog_exit_void(); 3574 } 3575 3576 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb) 3577 { 3578 unsigned long processed; 3579 struct ocfs2_lock_res *lockres; 3580 3581 mlog_entry_void(); 3582 3583 spin_lock(&osb->dc_task_lock); 3584 /* grab this early so we know to try again if a state change and 3585 * wake happens part-way through our work */ 3586 osb->dc_work_sequence = osb->dc_wake_sequence; 3587 3588 processed = osb->blocked_lock_count; 3589 while (processed) { 3590 BUG_ON(list_empty(&osb->blocked_lock_list)); 3591 3592 lockres = list_entry(osb->blocked_lock_list.next, 3593 struct ocfs2_lock_res, l_blocked_list); 3594 list_del_init(&lockres->l_blocked_list); 3595 osb->blocked_lock_count--; 3596 spin_unlock(&osb->dc_task_lock); 3597 3598 BUG_ON(!processed); 3599 processed--; 3600 3601 ocfs2_process_blocked_lock(osb, lockres); 3602 3603 spin_lock(&osb->dc_task_lock); 3604 } 3605 spin_unlock(&osb->dc_task_lock); 3606 3607 mlog_exit_void(); 3608 } 3609 3610 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb) 3611 { 3612 int empty = 0; 3613 3614 spin_lock(&osb->dc_task_lock); 3615 if (list_empty(&osb->blocked_lock_list)) 3616 empty = 1; 3617 3618 spin_unlock(&osb->dc_task_lock); 3619 return empty; 3620 } 3621 3622 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb) 3623 { 3624 int should_wake = 0; 3625 3626 spin_lock(&osb->dc_task_lock); 3627 if (osb->dc_work_sequence != osb->dc_wake_sequence) 3628 should_wake = 1; 3629 spin_unlock(&osb->dc_task_lock); 3630 3631 return should_wake; 3632 } 3633 3634 static int ocfs2_downconvert_thread(void *arg) 3635 { 3636 int status = 0; 3637 struct ocfs2_super *osb = arg; 3638 3639 /* only quit once we've been asked to stop and there is no more 3640 * work available */ 3641 while (!(kthread_should_stop() && 3642 ocfs2_downconvert_thread_lists_empty(osb))) { 3643 3644 wait_event_interruptible(osb->dc_event, 3645 ocfs2_downconvert_thread_should_wake(osb) || 3646 kthread_should_stop()); 3647 3648 mlog(0, "downconvert_thread: awoken\n"); 3649 3650 ocfs2_downconvert_thread_do_work(osb); 3651 } 3652 3653 osb->dc_task = NULL; 3654 return status; 3655 } 3656 3657 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb) 3658 { 3659 spin_lock(&osb->dc_task_lock); 3660 /* make sure the voting thread gets a swipe at whatever changes 3661 * the caller may have made to the voting state */ 3662 osb->dc_wake_sequence++; 3663 spin_unlock(&osb->dc_task_lock); 3664 wake_up(&osb->dc_event); 3665 } 3666