/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmglue.c
 *
 * Code which implements an OCFS2 specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/time.h>
#include <linux/quotaops.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"
#include "ocfs2_lockingver.h"

#include "alloc.h"
#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "stackglue.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"
#include "quota.h"

#include "buffer_head_io.h"

/*
 * A waiter for a particular state of a lockres' l_flags.
 *
 * lockres_set_flags() walks lockres->l_mask_waiters every time the flags
 * change and completes each waiter for which
 * (l_flags & mw_mask) == mw_goal.
 */
struct ocfs2_mask_waiter {
	/* Entry on lockres->l_mask_waiters. */
	struct list_head	mw_item;
	/* Set to 0 by lockres_set_flags() when the goal is reached. */
	int			mw_status;
	/* Completed by lockres_set_flags() when the goal is reached. */
	struct completion	mw_complete;
	/* Which l_flags bits this waiter cares about. */
	unsigned long		mw_mask;
	/* The value the masked bits must have for the wait to end. */
	unsigned long		mw_goal;
#ifdef CONFIG_OCFS2_FS_STATS
	/* Nanosecond timestamp recorded by ocfs2_init_start_time(). */
	unsigned long long 	mw_lock_start;
#endif
};

/* Per lock type ->get_osb callbacks, defined further down in this file. */
static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);

/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock()
 *
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};

/*
 * Pairs a requeue flag with an unblock action.
 * NOTE(review): no user of this struct is visible in this part of the
 * file; presumably it is filled in by the unblock path - confirm.
 */
struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};

/* Callbacks referenced by the ocfs2_lock_res_ops tables below. */
static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking);

static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);

static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);

/*
 * Convenience wrapper: dump the meta LVB of __lockres at mlog level
 * __level, tagging the output with the calling function and line.
 */
#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)

/* This aids in debugging situations where a bad LVB might be involved.
*/ 113 static void ocfs2_dump_meta_lvb_info(u64 level, 114 const char *function, 115 unsigned int line, 116 struct ocfs2_lock_res *lockres) 117 { 118 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 119 120 mlog(level, "LVB information for %s (called from %s:%u):\n", 121 lockres->l_name, function, line); 122 mlog(level, "version: %u, clusters: %u, generation: 0x%x\n", 123 lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters), 124 be32_to_cpu(lvb->lvb_igeneration)); 125 mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n", 126 (unsigned long long)be64_to_cpu(lvb->lvb_isize), 127 be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid), 128 be16_to_cpu(lvb->lvb_imode)); 129 mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, " 130 "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink), 131 (long long)be64_to_cpu(lvb->lvb_iatime_packed), 132 (long long)be64_to_cpu(lvb->lvb_ictime_packed), 133 (long long)be64_to_cpu(lvb->lvb_imtime_packed), 134 be32_to_cpu(lvb->lvb_iattr)); 135 } 136 137 138 /* 139 * OCFS2 Lock Resource Operations 140 * 141 * These fine tune the behavior of the generic dlmglue locking infrastructure. 142 * 143 * The most basic of lock types can point ->l_priv to their respective 144 * struct ocfs2_super and allow the default actions to manage things. 145 * 146 * Right now, each lock type also needs to implement an init function, 147 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres() 148 * should be called when the lock is no longer needed (i.e., object 149 * destruction time). 150 */ 151 struct ocfs2_lock_res_ops { 152 /* 153 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define 154 * this callback if ->l_priv is not an ocfs2_super pointer 155 */ 156 struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *); 157 158 /* 159 * Optionally called in the downconvert thread after a 160 * successful downconvert. 
The lockres will not be referenced 161 * after this callback is called, so it is safe to free 162 * memory, etc. 163 * 164 * The exact semantics of when this is called are controlled 165 * by ->downconvert_worker() 166 */ 167 void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *); 168 169 /* 170 * Allow a lock type to add checks to determine whether it is 171 * safe to downconvert a lock. Return 0 to re-queue the 172 * downconvert at a later time, nonzero to continue. 173 * 174 * For most locks, the default checks that there are no 175 * incompatible holders are sufficient. 176 * 177 * Called with the lockres spinlock held. 178 */ 179 int (*check_downconvert)(struct ocfs2_lock_res *, int); 180 181 /* 182 * Allows a lock type to populate the lock value block. This 183 * is called on downconvert, and when we drop a lock. 184 * 185 * Locks that want to use this should set LOCK_TYPE_USES_LVB 186 * in the flags field. 187 * 188 * Called with the lockres spinlock held. 189 */ 190 void (*set_lvb)(struct ocfs2_lock_res *); 191 192 /* 193 * Called from the downconvert thread when it is determined 194 * that a lock will be downconverted. This is called without 195 * any locks held so the function can do work that might 196 * schedule (syncing out data, etc). 197 * 198 * This should return any one of the ocfs2_unblock_action 199 * values, depending on what it wants the thread to do. 200 */ 201 int (*downconvert_worker)(struct ocfs2_lock_res *, int); 202 203 /* 204 * LOCK_TYPE_* flags which describe the specific requirements 205 * of a lock type. Descriptions of each individual flag follow. 206 */ 207 int flags; 208 }; 209 210 /* 211 * Some locks want to "refresh" potentially stale data when a 212 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this 213 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the 214 * individual lockres l_flags member from the ast function. 
It is 215 * expected that the locking wrapper will clear the 216 * OCFS2_LOCK_NEEDS_REFRESH flag when done. 217 */ 218 #define LOCK_TYPE_REQUIRES_REFRESH 0x1 219 220 /* 221 * Indicate that a lock type makes use of the lock value block. The 222 * ->set_lvb lock type callback must be defined. 223 */ 224 #define LOCK_TYPE_USES_LVB 0x2 225 226 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { 227 .get_osb = ocfs2_get_inode_osb, 228 .flags = 0, 229 }; 230 231 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = { 232 .get_osb = ocfs2_get_inode_osb, 233 .check_downconvert = ocfs2_check_meta_downconvert, 234 .set_lvb = ocfs2_set_meta_lvb, 235 .downconvert_worker = ocfs2_data_convert_worker, 236 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 237 }; 238 239 static struct ocfs2_lock_res_ops ocfs2_super_lops = { 240 .flags = LOCK_TYPE_REQUIRES_REFRESH, 241 }; 242 243 static struct ocfs2_lock_res_ops ocfs2_rename_lops = { 244 .flags = 0, 245 }; 246 247 static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = { 248 .flags = 0, 249 }; 250 251 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { 252 .get_osb = ocfs2_get_dentry_osb, 253 .post_unlock = ocfs2_dentry_post_unlock, 254 .downconvert_worker = ocfs2_dentry_convert_worker, 255 .flags = 0, 256 }; 257 258 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = { 259 .get_osb = ocfs2_get_inode_osb, 260 .flags = 0, 261 }; 262 263 static struct ocfs2_lock_res_ops ocfs2_flock_lops = { 264 .get_osb = ocfs2_get_file_osb, 265 .flags = 0, 266 }; 267 268 static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = { 269 .set_lvb = ocfs2_set_qinfo_lvb, 270 .get_osb = ocfs2_get_qinfo_osb, 271 .flags = LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB, 272 }; 273 274 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 275 { 276 return lockres->l_type == OCFS2_LOCK_TYPE_META || 277 lockres->l_type == OCFS2_LOCK_TYPE_RW || 278 lockres->l_type == OCFS2_LOCK_TYPE_OPEN; 279 } 280 281 static inline struct inode 
*ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) 282 { 283 BUG_ON(!ocfs2_is_inode_lock(lockres)); 284 285 return (struct inode *) lockres->l_priv; 286 } 287 288 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres) 289 { 290 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY); 291 292 return (struct ocfs2_dentry_lock *)lockres->l_priv; 293 } 294 295 static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres) 296 { 297 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO); 298 299 return (struct ocfs2_mem_dqinfo *)lockres->l_priv; 300 } 301 302 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres) 303 { 304 if (lockres->l_ops->get_osb) 305 return lockres->l_ops->get_osb(lockres); 306 307 return (struct ocfs2_super *)lockres->l_priv; 308 } 309 310 static int ocfs2_lock_create(struct ocfs2_super *osb, 311 struct ocfs2_lock_res *lockres, 312 int level, 313 u32 dlm_flags); 314 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 315 int wanted); 316 static void ocfs2_cluster_unlock(struct ocfs2_super *osb, 317 struct ocfs2_lock_res *lockres, 318 int level); 319 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres); 320 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres); 321 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres); 322 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level); 323 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 324 struct ocfs2_lock_res *lockres); 325 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 326 int convert); 327 #define ocfs2_log_dlm_error(_func, _err, _lockres) do { \ 328 if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY) \ 329 mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \ 330 _err, _func, _lockres->l_name); \ 
331 else \ 332 mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n", \ 333 _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name, \ 334 (unsigned int)ocfs2_get_dentry_lock_ino(_lockres)); \ 335 } while (0) 336 static int ocfs2_downconvert_thread(void *arg); 337 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 338 struct ocfs2_lock_res *lockres); 339 static int ocfs2_inode_lock_update(struct inode *inode, 340 struct buffer_head **bh); 341 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 342 static inline int ocfs2_highest_compat_lock_level(int level); 343 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 344 int new_level); 345 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 346 struct ocfs2_lock_res *lockres, 347 int new_level, 348 int lvb, 349 unsigned int generation); 350 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 351 struct ocfs2_lock_res *lockres); 352 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 353 struct ocfs2_lock_res *lockres); 354 355 356 static void ocfs2_build_lock_name(enum ocfs2_lock_type type, 357 u64 blkno, 358 u32 generation, 359 char *name) 360 { 361 int len; 362 363 mlog_entry_void(); 364 365 BUG_ON(type >= OCFS2_NUM_LOCK_TYPES); 366 367 len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", 368 ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD, 369 (long long)blkno, generation); 370 371 BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1)); 372 373 mlog(0, "built lock resource with name: %s\n", name); 374 375 mlog_exit_void(); 376 } 377 378 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock); 379 380 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res, 381 struct ocfs2_dlm_debug *dlm_debug) 382 { 383 mlog(0, "Add tracking for lockres %s\n", res->l_name); 384 385 spin_lock(&ocfs2_dlm_tracking_lock); 386 list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking); 387 spin_unlock(&ocfs2_dlm_tracking_lock); 388 } 389 
390 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) 391 { 392 spin_lock(&ocfs2_dlm_tracking_lock); 393 if (!list_empty(&res->l_debug_list)) 394 list_del_init(&res->l_debug_list); 395 spin_unlock(&ocfs2_dlm_tracking_lock); 396 } 397 398 #ifdef CONFIG_OCFS2_FS_STATS 399 static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 400 { 401 res->l_lock_num_prmode = 0; 402 res->l_lock_num_prmode_failed = 0; 403 res->l_lock_total_prmode = 0; 404 res->l_lock_max_prmode = 0; 405 res->l_lock_num_exmode = 0; 406 res->l_lock_num_exmode_failed = 0; 407 res->l_lock_total_exmode = 0; 408 res->l_lock_max_exmode = 0; 409 res->l_lock_refresh = 0; 410 } 411 412 static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level, 413 struct ocfs2_mask_waiter *mw, int ret) 414 { 415 unsigned long long *num, *sum; 416 unsigned int *max, *failed; 417 struct timespec ts = current_kernel_time(); 418 unsigned long long time = timespec_to_ns(&ts) - mw->mw_lock_start; 419 420 if (level == LKM_PRMODE) { 421 num = &res->l_lock_num_prmode; 422 sum = &res->l_lock_total_prmode; 423 max = &res->l_lock_max_prmode; 424 failed = &res->l_lock_num_prmode_failed; 425 } else if (level == LKM_EXMODE) { 426 num = &res->l_lock_num_exmode; 427 sum = &res->l_lock_total_exmode; 428 max = &res->l_lock_max_exmode; 429 failed = &res->l_lock_num_exmode_failed; 430 } else 431 return; 432 433 (*num)++; 434 (*sum) += time; 435 if (time > *max) 436 *max = time; 437 if (ret) 438 (*failed)++; 439 } 440 441 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 442 { 443 lockres->l_lock_refresh++; 444 } 445 446 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 447 { 448 struct timespec ts = current_kernel_time(); 449 mw->mw_lock_start = timespec_to_ns(&ts); 450 } 451 #else 452 static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 453 { 454 } 455 static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, 456 int level, 
struct ocfs2_mask_waiter *mw, int ret) 457 { 458 } 459 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 460 { 461 } 462 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 463 { 464 } 465 #endif 466 467 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, 468 struct ocfs2_lock_res *res, 469 enum ocfs2_lock_type type, 470 struct ocfs2_lock_res_ops *ops, 471 void *priv) 472 { 473 res->l_type = type; 474 res->l_ops = ops; 475 res->l_priv = priv; 476 477 res->l_level = DLM_LOCK_IV; 478 res->l_requested = DLM_LOCK_IV; 479 res->l_blocking = DLM_LOCK_IV; 480 res->l_action = OCFS2_AST_INVALID; 481 res->l_unlock_action = OCFS2_UNLOCK_INVALID; 482 483 res->l_flags = OCFS2_LOCK_INITIALIZED; 484 485 ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug); 486 487 ocfs2_init_lock_stats(res); 488 } 489 490 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res) 491 { 492 /* This also clears out the lock status block */ 493 memset(res, 0, sizeof(struct ocfs2_lock_res)); 494 spin_lock_init(&res->l_lock); 495 init_waitqueue_head(&res->l_event); 496 INIT_LIST_HEAD(&res->l_blocked_list); 497 INIT_LIST_HEAD(&res->l_mask_waiters); 498 } 499 500 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, 501 enum ocfs2_lock_type type, 502 unsigned int generation, 503 struct inode *inode) 504 { 505 struct ocfs2_lock_res_ops *ops; 506 507 switch(type) { 508 case OCFS2_LOCK_TYPE_RW: 509 ops = &ocfs2_inode_rw_lops; 510 break; 511 case OCFS2_LOCK_TYPE_META: 512 ops = &ocfs2_inode_inode_lops; 513 break; 514 case OCFS2_LOCK_TYPE_OPEN: 515 ops = &ocfs2_inode_open_lops; 516 break; 517 default: 518 mlog_bug_on_msg(1, "type: %d\n", type); 519 ops = NULL; /* thanks, gcc */ 520 break; 521 }; 522 523 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, 524 generation, res->l_name); 525 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); 526 } 527 528 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res 
*lockres) 529 { 530 struct inode *inode = ocfs2_lock_res_inode(lockres); 531 532 return OCFS2_SB(inode->i_sb); 533 } 534 535 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres) 536 { 537 struct ocfs2_mem_dqinfo *info = lockres->l_priv; 538 539 return OCFS2_SB(info->dqi_gi.dqi_sb); 540 } 541 542 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres) 543 { 544 struct ocfs2_file_private *fp = lockres->l_priv; 545 546 return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb); 547 } 548 549 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) 550 { 551 __be64 inode_blkno_be; 552 553 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], 554 sizeof(__be64)); 555 556 return be64_to_cpu(inode_blkno_be); 557 } 558 559 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres) 560 { 561 struct ocfs2_dentry_lock *dl = lockres->l_priv; 562 563 return OCFS2_SB(dl->dl_inode->i_sb); 564 } 565 566 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, 567 u64 parent, struct inode *inode) 568 { 569 int len; 570 u64 inode_blkno = OCFS2_I(inode)->ip_blkno; 571 __be64 inode_blkno_be = cpu_to_be64(inode_blkno); 572 struct ocfs2_lock_res *lockres = &dl->dl_lockres; 573 574 ocfs2_lock_res_init_once(lockres); 575 576 /* 577 * Unfortunately, the standard lock naming scheme won't work 578 * here because we have two 16 byte values to use. Instead, 579 * we'll stuff the inode number as a binary value. We still 580 * want error prints to show something without garbling the 581 * display, so drop a null byte in there before the inode 582 * number. A future version of OCFS2 will likely use all 583 * binary lock names. The stringified names have been a 584 * tremendous aid in debugging, but now that the debugfs 585 * interface exists, we can mangle things there if need be. 
586 * 587 * NOTE: We also drop the standard "pad" value (the total lock 588 * name size stays the same though - the last part is all 589 * zeros due to the memset in ocfs2_lock_res_init_once() 590 */ 591 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, 592 "%c%016llx", 593 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), 594 (long long)parent); 595 596 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); 597 598 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, 599 sizeof(__be64)); 600 601 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 602 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, 603 dl); 604 } 605 606 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 607 struct ocfs2_super *osb) 608 { 609 /* Superblock lockres doesn't come from a slab so we call init 610 * once on it manually. */ 611 ocfs2_lock_res_init_once(res); 612 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, 613 0, res->l_name); 614 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 615 &ocfs2_super_lops, osb); 616 } 617 618 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, 619 struct ocfs2_super *osb) 620 { 621 /* Rename lockres doesn't come from a slab so we call init 622 * once on it manually. */ 623 ocfs2_lock_res_init_once(res); 624 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); 625 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 626 &ocfs2_rename_lops, osb); 627 } 628 629 static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res, 630 struct ocfs2_super *osb) 631 { 632 /* nfs_sync lockres doesn't come from a slab so we call init 633 * once on it manually. 
*/ 634 ocfs2_lock_res_init_once(res); 635 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name); 636 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC, 637 &ocfs2_nfs_sync_lops, osb); 638 } 639 640 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, 641 struct ocfs2_file_private *fp) 642 { 643 struct inode *inode = fp->fp_file->f_mapping->host; 644 struct ocfs2_inode_info *oi = OCFS2_I(inode); 645 646 ocfs2_lock_res_init_once(lockres); 647 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno, 648 inode->i_generation, lockres->l_name); 649 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 650 OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops, 651 fp); 652 lockres->l_flags |= OCFS2_LOCK_NOCACHE; 653 } 654 655 void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres, 656 struct ocfs2_mem_dqinfo *info) 657 { 658 ocfs2_lock_res_init_once(lockres); 659 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type, 660 0, lockres->l_name); 661 ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres, 662 OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops, 663 info); 664 } 665 666 void ocfs2_lock_res_free(struct ocfs2_lock_res *res) 667 { 668 mlog_entry_void(); 669 670 if (!(res->l_flags & OCFS2_LOCK_INITIALIZED)) 671 return; 672 673 ocfs2_remove_lockres_tracking(res); 674 675 mlog_bug_on_msg(!list_empty(&res->l_blocked_list), 676 "Lockres %s is on the blocked list\n", 677 res->l_name); 678 mlog_bug_on_msg(!list_empty(&res->l_mask_waiters), 679 "Lockres %s has mask waiters pending\n", 680 res->l_name); 681 mlog_bug_on_msg(spin_is_locked(&res->l_lock), 682 "Lockres %s is locked\n", 683 res->l_name); 684 mlog_bug_on_msg(res->l_ro_holders, 685 "Lockres %s has %u ro holders\n", 686 res->l_name, res->l_ro_holders); 687 mlog_bug_on_msg(res->l_ex_holders, 688 "Lockres %s has %u ex holders\n", 689 res->l_name, res->l_ex_holders); 690 691 /* Need to clear out the lock status block for the dlm */ 692 memset(&res->l_lksb, 0, 
sizeof(res->l_lksb)); 693 694 res->l_flags = 0UL; 695 mlog_exit_void(); 696 } 697 698 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, 699 int level) 700 { 701 mlog_entry_void(); 702 703 BUG_ON(!lockres); 704 705 switch(level) { 706 case DLM_LOCK_EX: 707 lockres->l_ex_holders++; 708 break; 709 case DLM_LOCK_PR: 710 lockres->l_ro_holders++; 711 break; 712 default: 713 BUG(); 714 } 715 716 mlog_exit_void(); 717 } 718 719 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, 720 int level) 721 { 722 mlog_entry_void(); 723 724 BUG_ON(!lockres); 725 726 switch(level) { 727 case DLM_LOCK_EX: 728 BUG_ON(!lockres->l_ex_holders); 729 lockres->l_ex_holders--; 730 break; 731 case DLM_LOCK_PR: 732 BUG_ON(!lockres->l_ro_holders); 733 lockres->l_ro_holders--; 734 break; 735 default: 736 BUG(); 737 } 738 mlog_exit_void(); 739 } 740 741 /* WARNING: This function lives in a world where the only three lock 742 * levels are EX, PR, and NL. It *will* have to be adjusted when more 743 * lock types are added. 
*/ 744 static inline int ocfs2_highest_compat_lock_level(int level) 745 { 746 int new_level = DLM_LOCK_EX; 747 748 if (level == DLM_LOCK_EX) 749 new_level = DLM_LOCK_NL; 750 else if (level == DLM_LOCK_PR) 751 new_level = DLM_LOCK_PR; 752 return new_level; 753 } 754 755 static void lockres_set_flags(struct ocfs2_lock_res *lockres, 756 unsigned long newflags) 757 { 758 struct ocfs2_mask_waiter *mw, *tmp; 759 760 assert_spin_locked(&lockres->l_lock); 761 762 lockres->l_flags = newflags; 763 764 list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) { 765 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 766 continue; 767 768 list_del_init(&mw->mw_item); 769 mw->mw_status = 0; 770 complete(&mw->mw_complete); 771 } 772 } 773 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or) 774 { 775 lockres_set_flags(lockres, lockres->l_flags | or); 776 } 777 static void lockres_clear_flags(struct ocfs2_lock_res *lockres, 778 unsigned long clear) 779 { 780 lockres_set_flags(lockres, lockres->l_flags & ~clear); 781 } 782 783 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres) 784 { 785 mlog_entry_void(); 786 787 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 788 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 789 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 790 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 791 792 lockres->l_level = lockres->l_requested; 793 if (lockres->l_level <= 794 ocfs2_highest_compat_lock_level(lockres->l_blocking)) { 795 lockres->l_blocking = DLM_LOCK_NL; 796 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 797 } 798 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 799 800 mlog_exit_void(); 801 } 802 803 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres) 804 { 805 mlog_entry_void(); 806 807 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 808 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 809 810 /* Convert from RO to EX doesn't really 
need anything as our 811 * information is already up to data. Convert from NL to 812 * *anything* however should mark ourselves as needing an 813 * update */ 814 if (lockres->l_level == DLM_LOCK_NL && 815 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 816 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 817 818 lockres->l_level = lockres->l_requested; 819 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 820 821 mlog_exit_void(); 822 } 823 824 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres) 825 { 826 mlog_entry_void(); 827 828 BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY))); 829 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 830 831 if (lockres->l_requested > DLM_LOCK_NL && 832 !(lockres->l_flags & OCFS2_LOCK_LOCAL) && 833 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 834 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 835 836 lockres->l_level = lockres->l_requested; 837 lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED); 838 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 839 840 mlog_exit_void(); 841 } 842 843 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, 844 int level) 845 { 846 int needs_downconvert = 0; 847 mlog_entry_void(); 848 849 assert_spin_locked(&lockres->l_lock); 850 851 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 852 853 if (level > lockres->l_blocking) { 854 /* only schedule a downconvert if we haven't already scheduled 855 * one that goes low enough to satisfy the level we're 856 * blocking. this also catches the case where we get 857 * duplicate BASTs */ 858 if (ocfs2_highest_compat_lock_level(level) < 859 ocfs2_highest_compat_lock_level(lockres->l_blocking)) 860 needs_downconvert = 1; 861 862 lockres->l_blocking = level; 863 } 864 865 mlog_exit(needs_downconvert); 866 return needs_downconvert; 867 } 868 869 /* 870 * OCFS2_LOCK_PENDING and l_pending_gen. 871 * 872 * Why does OCFS2_LOCK_PENDING exist? 
* To close a race between setting
 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock().  See ocfs2_unblock_lock()
 * for more details on the race.
 *
 * OCFS2_LOCK_PENDING closes the race quite nicely.  However, it introduces
 * a race on itself.  In o2dlm, we can get the ast before ocfs2_dlm_lock()
 * returns.  The ast clears OCFS2_LOCK_BUSY, and must therefore clear
 * OCFS2_LOCK_PENDING at the same time.  When ocfs2_dlm_lock() returns,
 * the caller is going to try to clear PENDING again.  If nothing else is
 * happening, __lockres_clear_pending() sees PENDING is unset and does
 * nothing.
 *
 * But what if another path (eg downconvert thread) has just started a
 * new locking action?  The other path has re-set PENDING.  Our path
 * cannot clear PENDING, because that will re-open the original race
 * window.
 *
 * [Example]
 *
 * ocfs2_meta_lock()
 *  ocfs2_cluster_lock()
 *   set BUSY
 *   set PENDING
 *   drop l_lock
 *   ocfs2_dlm_lock()
 *    ocfs2_locking_ast()		ocfs2_downconvert_thread()
 *     clear PENDING			 ocfs2_unblock_lock()
 *					  take_l_lock
 *					  !BUSY
 *					  ocfs2_prepare_downconvert()
 *					   set BUSY
 *					   set PENDING
 *					  drop l_lock
 *   take l_lock
 *   clear PENDING
 *   drop l_lock
 *			<window>
 *					  ocfs2_dlm_lock()
 *
 * So as you can see, we now have a window where l_lock is not held,
 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
 *
 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
 * set by ocfs2_prepare_downconvert().  That wasn't nice.
 *
 * To solve this we introduce l_pending_gen.  A call to
 * lockres_clear_pending() will only do so when it is passed a generation
 * number that matches the lockres.  lockres_set_pending() will return the
 * current generation number.  When ocfs2_cluster_lock() goes to clear
 * PENDING, it passes the generation it got from set_pending().  In our
 * example above, the generation numbers will *not* match.  Thus,
 * ocfs2_cluster_lock() will not clear the PENDING set by
 * ocfs2_prepare_downconvert().
 */

/*
 * Unlocked version for ocfs2_locking_ast().
 *
 * "Unlocked" means this function does not take l_lock itself: the
 * caller must already hold it, which is asserted below.
 *
 * Clears OCFS2_LOCK_PENDING only when it is set and @generation equals
 * the lockres' current l_pending_gen, then advances l_pending_gen.
 */
static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
				    unsigned int generation,
				    struct ocfs2_super *osb)
{
	assert_spin_locked(&lockres->l_lock);

	/*
	 * The ast and locking functions can race us here.  The winner
	 * will clear pending, the loser will not.
	 */
	if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
	    (lockres->l_pending_gen != generation))
		return;

	lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
	/* Any generation handed out before this point no longer matches. */
	lockres->l_pending_gen++;

	/*
	 * The downconvert thread may have skipped us because we
	 * were PENDING.  Wake it up.
	 */
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
		ocfs2_wake_downconvert_thread(osb);
}

/*
 * Locked version for callers of ocfs2_dlm_lock().
 *
 * Takes l_lock (with interrupts saved and disabled) around
 * __lockres_clear_pending().  @generation is the value returned by the
 * matching lockres_set_pending() call.
 */
static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
				  unsigned int generation,
				  struct ocfs2_super *osb)
{
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);
	__lockres_clear_pending(lockres, generation, osb);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/*
 * Mark @lockres OCFS2_LOCK_PENDING and return the current pending
 * generation, to be passed to lockres_clear_pending() later.
 *
 * The caller must hold l_lock and must already have set
 * OCFS2_LOCK_BUSY.
 */
static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));

	lockres_or_flags(lockres, OCFS2_LOCK_PENDING);

	return lockres->l_pending_gen;
}


/*
 * Blocking AST callback.
 *
 * @opaque: the ocfs2_lock_res the notification is for
 * @level:  the blocked level being reported; must be above DLM_LOCK_NL
 *
 * Records the blocked level on the lockres, queues it for the
 * downconvert thread when a downconvert is needed, then wakes l_event
 * waiters and the downconvert thread.
 */
static void ocfs2_blocking_ast(void *opaque, int level)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	int needs_downconvert;
	unsigned long flags;

	BUG_ON(level <= DLM_LOCK_NL);

	mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
	     lockres->l_name, level, lockres->l_level,
	     ocfs2_lock_type_string(lockres->l_type));

	/*
	 * We can skip the bast for locks which don't enable caching -
	 * they'll be dropped at the earliest possible time anyway.
	 */
	/* NOTE(review): l_flags is read here without l_lock held. */
	if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
		return;

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_wake_downconvert_thread(osb);
}

/*
 * Locking AST callback.
 *
 * @opaque: the ocfs2_lock_res the request was issued for
 *
 * Reads the lksb status and, on success, applies the state change for
 * the action recorded in l_action (attach, convert or downconvert).
 * Runs with l_lock held for its whole body.
 */
static void ocfs2_locking_ast(void *opaque)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	unsigned long flags;
	int status;

	spin_lock_irqsave(&lockres->l_lock, flags);

	status = ocfs2_dlm_lock_status(&lockres->l_lksb);

	/* -EAGAIN: only drop BUSY, then do the common cleanup at "out". */
	if (status == -EAGAIN) {
		lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
		goto out;
	}

	/*
	 * Any other error is logged and we return right away: BUSY and
	 * PENDING are left as they are and l_event is not woken.
	 * NOTE(review): confirm that leaving BUSY set here is intended.
	 */
	if (status) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
		     lockres->l_name, status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
		     "lockres flags = 0x%lx, unlock action: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}
out:
	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	/* Did we try to cancel this lock?  Clear that state */
	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;

	/*
	 * We may have beaten the locking functions here.  We certainly
	 * know that dlm_lock() has been called :-)
	 * Because we can't have two lock calls in flight at once, we
	 * can use lockres->l_pending_gen.
	 */
	__lockres_clear_pending(lockres, lockres->l_pending_gen, osb);

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/*
 * Undo the bookkeeping for a DLM call that failed.
 *
 * Clears OCFS2_LOCK_BUSY and resets the pending action: l_action when
 * @convert is nonzero, l_unlock_action otherwise.  Waiters on l_event
 * are woken once l_lock has been dropped.
 */
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	if (convert)
		lockres->l_action = OCFS2_AST_INVALID;
	else
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
	mlog_exit_void();
}

/* Note: If we detect another process working on the lock (i.e.,
 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 * to do the right thing in that case.
1090 */ 1091 static int ocfs2_lock_create(struct ocfs2_super *osb, 1092 struct ocfs2_lock_res *lockres, 1093 int level, 1094 u32 dlm_flags) 1095 { 1096 int ret = 0; 1097 unsigned long flags; 1098 unsigned int gen; 1099 1100 mlog_entry_void(); 1101 1102 mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level, 1103 dlm_flags); 1104 1105 spin_lock_irqsave(&lockres->l_lock, flags); 1106 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 1107 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 1108 spin_unlock_irqrestore(&lockres->l_lock, flags); 1109 goto bail; 1110 } 1111 1112 lockres->l_action = OCFS2_AST_ATTACH; 1113 lockres->l_requested = level; 1114 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1115 gen = lockres_set_pending(lockres); 1116 spin_unlock_irqrestore(&lockres->l_lock, flags); 1117 1118 ret = ocfs2_dlm_lock(osb->cconn, 1119 level, 1120 &lockres->l_lksb, 1121 dlm_flags, 1122 lockres->l_name, 1123 OCFS2_LOCK_ID_MAX_LEN - 1, 1124 lockres); 1125 lockres_clear_pending(lockres, gen, osb); 1126 if (ret) { 1127 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1128 ocfs2_recover_from_dlm_error(lockres, 1); 1129 } 1130 1131 mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name); 1132 1133 bail: 1134 mlog_exit(ret); 1135 return ret; 1136 } 1137 1138 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, 1139 int flag) 1140 { 1141 unsigned long flags; 1142 int ret; 1143 1144 spin_lock_irqsave(&lockres->l_lock, flags); 1145 ret = lockres->l_flags & flag; 1146 spin_unlock_irqrestore(&lockres->l_lock, flags); 1147 1148 return ret; 1149 } 1150 1151 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 1152 1153 { 1154 wait_event(lockres->l_event, 1155 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 1156 } 1157 1158 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 1159 1160 { 1161 wait_event(lockres->l_event, 1162 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 1163 } 1164 1165 
/* predict what lock level we'll be dropping down to on behalf 1166 * of another node, and return true if the currently wanted 1167 * level will be compatible with it. */ 1168 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 1169 int wanted) 1170 { 1171 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 1172 1173 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking); 1174 } 1175 1176 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw) 1177 { 1178 INIT_LIST_HEAD(&mw->mw_item); 1179 init_completion(&mw->mw_complete); 1180 ocfs2_init_start_time(mw); 1181 } 1182 1183 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) 1184 { 1185 wait_for_completion(&mw->mw_complete); 1186 /* Re-arm the completion in case we want to wait on it again */ 1187 INIT_COMPLETION(mw->mw_complete); 1188 return mw->mw_status; 1189 } 1190 1191 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres, 1192 struct ocfs2_mask_waiter *mw, 1193 unsigned long mask, 1194 unsigned long goal) 1195 { 1196 BUG_ON(!list_empty(&mw->mw_item)); 1197 1198 assert_spin_locked(&lockres->l_lock); 1199 1200 list_add_tail(&mw->mw_item, &lockres->l_mask_waiters); 1201 mw->mw_mask = mask; 1202 mw->mw_goal = goal; 1203 } 1204 1205 /* returns 0 if the mw that was removed was already satisfied, -EBUSY 1206 * if the mask still hadn't reached its goal */ 1207 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1208 struct ocfs2_mask_waiter *mw) 1209 { 1210 unsigned long flags; 1211 int ret = 0; 1212 1213 spin_lock_irqsave(&lockres->l_lock, flags); 1214 if (!list_empty(&mw->mw_item)) { 1215 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 1216 ret = -EBUSY; 1217 1218 list_del_init(&mw->mw_item); 1219 init_completion(&mw->mw_complete); 1220 } 1221 spin_unlock_irqrestore(&lockres->l_lock, flags); 1222 1223 return ret; 1224 1225 } 1226 1227 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw, 1228 
struct ocfs2_lock_res *lockres) 1229 { 1230 int ret; 1231 1232 ret = wait_for_completion_interruptible(&mw->mw_complete); 1233 if (ret) 1234 lockres_remove_mask_waiter(lockres, mw); 1235 else 1236 ret = mw->mw_status; 1237 /* Re-arm the completion in case we want to wait on it again */ 1238 INIT_COMPLETION(mw->mw_complete); 1239 return ret; 1240 } 1241 1242 static int ocfs2_cluster_lock(struct ocfs2_super *osb, 1243 struct ocfs2_lock_res *lockres, 1244 int level, 1245 u32 lkm_flags, 1246 int arg_flags) 1247 { 1248 struct ocfs2_mask_waiter mw; 1249 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); 1250 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 1251 unsigned long flags; 1252 unsigned int gen; 1253 int noqueue_attempted = 0; 1254 1255 mlog_entry_void(); 1256 1257 ocfs2_init_mask_waiter(&mw); 1258 1259 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 1260 lkm_flags |= DLM_LKF_VALBLK; 1261 1262 again: 1263 wait = 0; 1264 1265 if (catch_signals && signal_pending(current)) { 1266 ret = -ERESTARTSYS; 1267 goto out; 1268 } 1269 1270 spin_lock_irqsave(&lockres->l_lock, flags); 1271 1272 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, 1273 "Cluster lock called on freeing lockres %s! flags " 1274 "0x%lx\n", lockres->l_name, lockres->l_flags); 1275 1276 /* We only compare against the currently granted level 1277 * here. If the lock is blocked waiting on a downconvert, 1278 * we'll get caught below. */ 1279 if (lockres->l_flags & OCFS2_LOCK_BUSY && 1280 level > lockres->l_level) { 1281 /* is someone sitting in dlm_lock? If so, wait on 1282 * them. 
*/ 1283 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1284 wait = 1; 1285 goto unlock; 1286 } 1287 1288 if (lockres->l_flags & OCFS2_LOCK_BLOCKED && 1289 !ocfs2_may_continue_on_blocked_lock(lockres, level)) { 1290 /* is the lock is currently blocked on behalf of 1291 * another node */ 1292 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0); 1293 wait = 1; 1294 goto unlock; 1295 } 1296 1297 if (level > lockres->l_level) { 1298 if (noqueue_attempted > 0) { 1299 ret = -EAGAIN; 1300 goto unlock; 1301 } 1302 if (lkm_flags & DLM_LKF_NOQUEUE) 1303 noqueue_attempted = 1; 1304 1305 if (lockres->l_action != OCFS2_AST_INVALID) 1306 mlog(ML_ERROR, "lockres %s has action %u pending\n", 1307 lockres->l_name, lockres->l_action); 1308 1309 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1310 lockres->l_action = OCFS2_AST_ATTACH; 1311 lkm_flags &= ~DLM_LKF_CONVERT; 1312 } else { 1313 lockres->l_action = OCFS2_AST_CONVERT; 1314 lkm_flags |= DLM_LKF_CONVERT; 1315 } 1316 1317 lockres->l_requested = level; 1318 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1319 gen = lockres_set_pending(lockres); 1320 spin_unlock_irqrestore(&lockres->l_lock, flags); 1321 1322 BUG_ON(level == DLM_LOCK_IV); 1323 BUG_ON(level == DLM_LOCK_NL); 1324 1325 mlog(0, "lock %s, convert from %d to level = %d\n", 1326 lockres->l_name, lockres->l_level, level); 1327 1328 /* call dlm_lock to upgrade lock now */ 1329 ret = ocfs2_dlm_lock(osb->cconn, 1330 level, 1331 &lockres->l_lksb, 1332 lkm_flags, 1333 lockres->l_name, 1334 OCFS2_LOCK_ID_MAX_LEN - 1, 1335 lockres); 1336 lockres_clear_pending(lockres, gen, osb); 1337 if (ret) { 1338 if (!(lkm_flags & DLM_LKF_NOQUEUE) || 1339 (ret != -EAGAIN)) { 1340 ocfs2_log_dlm_error("ocfs2_dlm_lock", 1341 ret, lockres); 1342 } 1343 ocfs2_recover_from_dlm_error(lockres, 1); 1344 goto out; 1345 } 1346 1347 mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n", 1348 lockres->l_name); 1349 1350 /* At this point we've gone inside the dlm and need to 1351 * 
complete our work regardless. */ 1352 catch_signals = 0; 1353 1354 /* wait for busy to clear and carry on */ 1355 goto again; 1356 } 1357 1358 /* Ok, if we get here then we're good to go. */ 1359 ocfs2_inc_holders(lockres, level); 1360 1361 ret = 0; 1362 unlock: 1363 spin_unlock_irqrestore(&lockres->l_lock, flags); 1364 out: 1365 /* 1366 * This is helping work around a lock inversion between the page lock 1367 * and dlm locks. One path holds the page lock while calling aops 1368 * which block acquiring dlm locks. The voting thread holds dlm 1369 * locks while acquiring page locks while down converting data locks. 1370 * This block is helping an aop path notice the inversion and back 1371 * off to unlock its page lock before trying the dlm lock again. 1372 */ 1373 if (wait && arg_flags & OCFS2_LOCK_NONBLOCK && 1374 mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) { 1375 wait = 0; 1376 if (lockres_remove_mask_waiter(lockres, &mw)) 1377 ret = -EAGAIN; 1378 else 1379 goto again; 1380 } 1381 if (wait) { 1382 ret = ocfs2_wait_for_mask(&mw); 1383 if (ret == 0) 1384 goto again; 1385 mlog_errno(ret); 1386 } 1387 ocfs2_update_lock_stats(lockres, level, &mw, ret); 1388 1389 mlog_exit(ret); 1390 return ret; 1391 } 1392 1393 static void ocfs2_cluster_unlock(struct ocfs2_super *osb, 1394 struct ocfs2_lock_res *lockres, 1395 int level) 1396 { 1397 unsigned long flags; 1398 1399 mlog_entry_void(); 1400 spin_lock_irqsave(&lockres->l_lock, flags); 1401 ocfs2_dec_holders(lockres, level); 1402 ocfs2_downconvert_on_unlock(osb, lockres); 1403 spin_unlock_irqrestore(&lockres->l_lock, flags); 1404 mlog_exit_void(); 1405 } 1406 1407 static int ocfs2_create_new_lock(struct ocfs2_super *osb, 1408 struct ocfs2_lock_res *lockres, 1409 int ex, 1410 int local) 1411 { 1412 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1413 unsigned long flags; 1414 u32 lkm_flags = local ? 
DLM_LKF_LOCAL : 0; 1415 1416 spin_lock_irqsave(&lockres->l_lock, flags); 1417 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 1418 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL); 1419 spin_unlock_irqrestore(&lockres->l_lock, flags); 1420 1421 return ocfs2_lock_create(osb, lockres, level, lkm_flags); 1422 } 1423 1424 /* Grants us an EX lock on the data and metadata resources, skipping 1425 * the normal cluster directory lookup. Use this ONLY on newly created 1426 * inodes which other nodes can't possibly see, and which haven't been 1427 * hashed in the inode hash yet. This can give us a good performance 1428 * increase as it'll skip the network broadcast normally associated 1429 * with creating a new lock resource. */ 1430 int ocfs2_create_new_inode_locks(struct inode *inode) 1431 { 1432 int ret; 1433 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1434 1435 BUG_ON(!inode); 1436 BUG_ON(!ocfs2_inode_is_new(inode)); 1437 1438 mlog_entry_void(); 1439 1440 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); 1441 1442 /* NOTE: That we don't increment any of the holder counts, nor 1443 * do we add anything to a journal handle. Since this is 1444 * supposed to be a new inode which the cluster doesn't know 1445 * about yet, there is no need to. As far as the LVB handling 1446 * is concerned, this is basically like acquiring an EX lock 1447 * on a resource which has an invalid one -- we'll set it 1448 * valid when we release the EX. */ 1449 1450 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1); 1451 if (ret) { 1452 mlog_errno(ret); 1453 goto bail; 1454 } 1455 1456 /* 1457 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they 1458 * don't use a generation in their lock names. 
1459 */ 1460 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); 1461 if (ret) { 1462 mlog_errno(ret); 1463 goto bail; 1464 } 1465 1466 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0); 1467 if (ret) { 1468 mlog_errno(ret); 1469 goto bail; 1470 } 1471 1472 bail: 1473 mlog_exit(ret); 1474 return ret; 1475 } 1476 1477 int ocfs2_rw_lock(struct inode *inode, int write) 1478 { 1479 int status, level; 1480 struct ocfs2_lock_res *lockres; 1481 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1482 1483 BUG_ON(!inode); 1484 1485 mlog_entry_void(); 1486 1487 mlog(0, "inode %llu take %s RW lock\n", 1488 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1489 write ? "EXMODE" : "PRMODE"); 1490 1491 if (ocfs2_mount_local(osb)) 1492 return 0; 1493 1494 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1495 1496 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1497 1498 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0, 1499 0); 1500 if (status < 0) 1501 mlog_errno(status); 1502 1503 mlog_exit(status); 1504 return status; 1505 } 1506 1507 void ocfs2_rw_unlock(struct inode *inode, int write) 1508 { 1509 int level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1510 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1511 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1512 1513 mlog_entry_void(); 1514 1515 mlog(0, "inode %llu drop %s RW lock\n", 1516 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1517 write ? "EXMODE" : "PRMODE"); 1518 1519 if (!ocfs2_mount_local(osb)) 1520 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 1521 1522 mlog_exit_void(); 1523 } 1524 1525 /* 1526 * ocfs2_open_lock always get PR mode lock. 
1527 */ 1528 int ocfs2_open_lock(struct inode *inode) 1529 { 1530 int status = 0; 1531 struct ocfs2_lock_res *lockres; 1532 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1533 1534 BUG_ON(!inode); 1535 1536 mlog_entry_void(); 1537 1538 mlog(0, "inode %llu take PRMODE open lock\n", 1539 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1540 1541 if (ocfs2_mount_local(osb)) 1542 goto out; 1543 1544 lockres = &OCFS2_I(inode)->ip_open_lockres; 1545 1546 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1547 DLM_LOCK_PR, 0, 0); 1548 if (status < 0) 1549 mlog_errno(status); 1550 1551 out: 1552 mlog_exit(status); 1553 return status; 1554 } 1555 1556 int ocfs2_try_open_lock(struct inode *inode, int write) 1557 { 1558 int status = 0, level; 1559 struct ocfs2_lock_res *lockres; 1560 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1561 1562 BUG_ON(!inode); 1563 1564 mlog_entry_void(); 1565 1566 mlog(0, "inode %llu try to take %s open lock\n", 1567 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1568 write ? "EXMODE" : "PRMODE"); 1569 1570 if (ocfs2_mount_local(osb)) 1571 goto out; 1572 1573 lockres = &OCFS2_I(inode)->ip_open_lockres; 1574 1575 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1576 1577 /* 1578 * The file system may already holding a PRMODE/EXMODE open lock. 1579 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on 1580 * other nodes and the -EAGAIN will indicate to the caller that 1581 * this inode is still in use. 1582 */ 1583 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1584 level, DLM_LKF_NOQUEUE, 0); 1585 1586 out: 1587 mlog_exit(status); 1588 return status; 1589 } 1590 1591 /* 1592 * ocfs2_open_unlock unlock PR and EX mode open locks. 
1593 */ 1594 void ocfs2_open_unlock(struct inode *inode) 1595 { 1596 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres; 1597 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1598 1599 mlog_entry_void(); 1600 1601 mlog(0, "inode %llu drop open lock\n", 1602 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1603 1604 if (ocfs2_mount_local(osb)) 1605 goto out; 1606 1607 if(lockres->l_ro_holders) 1608 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1609 DLM_LOCK_PR); 1610 if(lockres->l_ex_holders) 1611 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1612 DLM_LOCK_EX); 1613 1614 out: 1615 mlog_exit_void(); 1616 } 1617 1618 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres, 1619 int level) 1620 { 1621 int ret; 1622 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1623 unsigned long flags; 1624 struct ocfs2_mask_waiter mw; 1625 1626 ocfs2_init_mask_waiter(&mw); 1627 1628 retry_cancel: 1629 spin_lock_irqsave(&lockres->l_lock, flags); 1630 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 1631 ret = ocfs2_prepare_cancel_convert(osb, lockres); 1632 if (ret) { 1633 spin_unlock_irqrestore(&lockres->l_lock, flags); 1634 ret = ocfs2_cancel_convert(osb, lockres); 1635 if (ret < 0) { 1636 mlog_errno(ret); 1637 goto out; 1638 } 1639 goto retry_cancel; 1640 } 1641 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1642 spin_unlock_irqrestore(&lockres->l_lock, flags); 1643 1644 ocfs2_wait_for_mask(&mw); 1645 goto retry_cancel; 1646 } 1647 1648 ret = -ERESTARTSYS; 1649 /* 1650 * We may still have gotten the lock, in which case there's no 1651 * point to restarting the syscall. 1652 */ 1653 if (lockres->l_level == level) 1654 ret = 0; 1655 1656 mlog(0, "Cancel returning %d. 
flags: 0x%lx, level: %d, act: %d\n", ret, 1657 lockres->l_flags, lockres->l_level, lockres->l_action); 1658 1659 spin_unlock_irqrestore(&lockres->l_lock, flags); 1660 1661 out: 1662 return ret; 1663 } 1664 1665 /* 1666 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of 1667 * flock() calls. The locking approach this requires is sufficiently 1668 * different from all other cluster lock types that we implement a 1669 * seperate path to the "low-level" dlm calls. In particular: 1670 * 1671 * - No optimization of lock levels is done - we take at exactly 1672 * what's been requested. 1673 * 1674 * - No lock caching is employed. We immediately downconvert to 1675 * no-lock at unlock time. This also means flock locks never go on 1676 * the blocking list). 1677 * 1678 * - Since userspace can trivially deadlock itself with flock, we make 1679 * sure to allow cancellation of a misbehaving applications flock() 1680 * request. 1681 * 1682 * - Access to any flock lockres doesn't require concurrency, so we 1683 * can simplify the code by requiring the caller to guarantee 1684 * serialization of dlmglue flock calls. 1685 */ 1686 int ocfs2_file_lock(struct file *file, int ex, int trylock) 1687 { 1688 int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1689 unsigned int lkm_flags = trylock ? 
DLM_LKF_NOQUEUE : 0; 1690 unsigned long flags; 1691 struct ocfs2_file_private *fp = file->private_data; 1692 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1693 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1694 struct ocfs2_mask_waiter mw; 1695 1696 ocfs2_init_mask_waiter(&mw); 1697 1698 if ((lockres->l_flags & OCFS2_LOCK_BUSY) || 1699 (lockres->l_level > DLM_LOCK_NL)) { 1700 mlog(ML_ERROR, 1701 "File lock \"%s\" has busy or locked state: flags: 0x%lx, " 1702 "level: %u\n", lockres->l_name, lockres->l_flags, 1703 lockres->l_level); 1704 return -EINVAL; 1705 } 1706 1707 spin_lock_irqsave(&lockres->l_lock, flags); 1708 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1709 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1710 spin_unlock_irqrestore(&lockres->l_lock, flags); 1711 1712 /* 1713 * Get the lock at NLMODE to start - that way we 1714 * can cancel the upconvert request if need be. 1715 */ 1716 ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0); 1717 if (ret < 0) { 1718 mlog_errno(ret); 1719 goto out; 1720 } 1721 1722 ret = ocfs2_wait_for_mask(&mw); 1723 if (ret) { 1724 mlog_errno(ret); 1725 goto out; 1726 } 1727 spin_lock_irqsave(&lockres->l_lock, flags); 1728 } 1729 1730 lockres->l_action = OCFS2_AST_CONVERT; 1731 lkm_flags |= DLM_LKF_CONVERT; 1732 lockres->l_requested = level; 1733 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1734 1735 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1736 spin_unlock_irqrestore(&lockres->l_lock, flags); 1737 1738 ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags, 1739 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1, 1740 lockres); 1741 if (ret) { 1742 if (!trylock || (ret != -EAGAIN)) { 1743 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1744 ret = -EINVAL; 1745 } 1746 1747 ocfs2_recover_from_dlm_error(lockres, 1); 1748 lockres_remove_mask_waiter(lockres, &mw); 1749 goto out; 1750 } 1751 1752 ret = ocfs2_wait_for_mask_interruptible(&mw, lockres); 1753 if (ret == 
-ERESTARTSYS) { 1754 /* 1755 * Userspace can cause deadlock itself with 1756 * flock(). Current behavior locally is to allow the 1757 * deadlock, but abort the system call if a signal is 1758 * received. We follow this example, otherwise a 1759 * poorly written program could sit in kernel until 1760 * reboot. 1761 * 1762 * Handling this is a bit more complicated for Ocfs2 1763 * though. We can't exit this function with an 1764 * outstanding lock request, so a cancel convert is 1765 * required. We intentionally overwrite 'ret' - if the 1766 * cancel fails and the lock was granted, it's easier 1767 * to just bubble sucess back up to the user. 1768 */ 1769 ret = ocfs2_flock_handle_signal(lockres, level); 1770 } else if (!ret && (level > lockres->l_level)) { 1771 /* Trylock failed asynchronously */ 1772 BUG_ON(!trylock); 1773 ret = -EAGAIN; 1774 } 1775 1776 out: 1777 1778 mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n", 1779 lockres->l_name, ex, trylock, ret); 1780 return ret; 1781 } 1782 1783 void ocfs2_file_unlock(struct file *file) 1784 { 1785 int ret; 1786 unsigned int gen; 1787 unsigned long flags; 1788 struct ocfs2_file_private *fp = file->private_data; 1789 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1790 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1791 struct ocfs2_mask_waiter mw; 1792 1793 ocfs2_init_mask_waiter(&mw); 1794 1795 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) 1796 return; 1797 1798 if (lockres->l_level == DLM_LOCK_NL) 1799 return; 1800 1801 mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n", 1802 lockres->l_name, lockres->l_flags, lockres->l_level, 1803 lockres->l_action); 1804 1805 spin_lock_irqsave(&lockres->l_lock, flags); 1806 /* 1807 * Fake a blocking ast for the downconvert code. 
1808 */ 1809 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 1810 lockres->l_blocking = DLM_LOCK_EX; 1811 1812 gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL); 1813 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1814 spin_unlock_irqrestore(&lockres->l_lock, flags); 1815 1816 ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen); 1817 if (ret) { 1818 mlog_errno(ret); 1819 return; 1820 } 1821 1822 ret = ocfs2_wait_for_mask(&mw); 1823 if (ret) 1824 mlog_errno(ret); 1825 } 1826 1827 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 1828 struct ocfs2_lock_res *lockres) 1829 { 1830 int kick = 0; 1831 1832 mlog_entry_void(); 1833 1834 /* If we know that another node is waiting on our lock, kick 1835 * the downconvert thread * pre-emptively when we reach a release 1836 * condition. */ 1837 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 1838 switch(lockres->l_blocking) { 1839 case DLM_LOCK_EX: 1840 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 1841 kick = 1; 1842 break; 1843 case DLM_LOCK_PR: 1844 if (!lockres->l_ex_holders) 1845 kick = 1; 1846 break; 1847 default: 1848 BUG(); 1849 } 1850 } 1851 1852 if (kick) 1853 ocfs2_wake_downconvert_thread(osb); 1854 1855 mlog_exit_void(); 1856 } 1857 1858 #define OCFS2_SEC_BITS 34 1859 #define OCFS2_SEC_SHIFT (64 - 34) 1860 #define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1) 1861 1862 /* LVB only has room for 64 bits of time here so we pack it for 1863 * now. */ 1864 static u64 ocfs2_pack_timespec(struct timespec *spec) 1865 { 1866 u64 res; 1867 u64 sec = spec->tv_sec; 1868 u32 nsec = spec->tv_nsec; 1869 1870 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK); 1871 1872 return res; 1873 } 1874 1875 /* Call this with the lockres locked. I am reasonably sure we don't 1876 * need ip_lock in this function as anyone who would be changing those 1877 * values is supposed to be blocked in ocfs2_inode_lock right now. 
*/ 1878 static void __ocfs2_stuff_meta_lvb(struct inode *inode) 1879 { 1880 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1881 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 1882 struct ocfs2_meta_lvb *lvb; 1883 1884 mlog_entry_void(); 1885 1886 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 1887 1888 /* 1889 * Invalidate the LVB of a deleted inode - this way other 1890 * nodes are forced to go to disk and discover the new inode 1891 * status. 1892 */ 1893 if (oi->ip_flags & OCFS2_INODE_DELETED) { 1894 lvb->lvb_version = 0; 1895 goto out; 1896 } 1897 1898 lvb->lvb_version = OCFS2_LVB_VERSION; 1899 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 1900 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 1901 lvb->lvb_iuid = cpu_to_be32(inode->i_uid); 1902 lvb->lvb_igid = cpu_to_be32(inode->i_gid); 1903 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 1904 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 1905 lvb->lvb_iatime_packed = 1906 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime)); 1907 lvb->lvb_ictime_packed = 1908 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime)); 1909 lvb->lvb_imtime_packed = 1910 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime)); 1911 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 1912 lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features); 1913 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 1914 1915 out: 1916 mlog_meta_lvb(0, lockres); 1917 1918 mlog_exit_void(); 1919 } 1920 1921 static void ocfs2_unpack_timespec(struct timespec *spec, 1922 u64 packed_time) 1923 { 1924 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 1925 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 1926 } 1927 1928 static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 1929 { 1930 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1931 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 1932 struct ocfs2_meta_lvb *lvb; 1933 1934 mlog_entry_void(); 1935 1936 mlog_meta_lvb(0, lockres); 1937 1938 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 1939 1940 /* We're safe here 
without the lockres lock... */ 1941 spin_lock(&oi->ip_lock); 1942 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 1943 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 1944 1945 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 1946 oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures); 1947 ocfs2_set_inode_flags(inode); 1948 1949 /* fast-symlinks are a special case */ 1950 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 1951 inode->i_blocks = 0; 1952 else 1953 inode->i_blocks = ocfs2_inode_sector_count(inode); 1954 1955 inode->i_uid = be32_to_cpu(lvb->lvb_iuid); 1956 inode->i_gid = be32_to_cpu(lvb->lvb_igid); 1957 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 1958 inode->i_nlink = be16_to_cpu(lvb->lvb_inlink); 1959 ocfs2_unpack_timespec(&inode->i_atime, 1960 be64_to_cpu(lvb->lvb_iatime_packed)); 1961 ocfs2_unpack_timespec(&inode->i_mtime, 1962 be64_to_cpu(lvb->lvb_imtime_packed)); 1963 ocfs2_unpack_timespec(&inode->i_ctime, 1964 be64_to_cpu(lvb->lvb_ictime_packed)); 1965 spin_unlock(&oi->ip_lock); 1966 1967 mlog_exit_void(); 1968 } 1969 1970 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 1971 struct ocfs2_lock_res *lockres) 1972 { 1973 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 1974 1975 if (lvb->lvb_version == OCFS2_LVB_VERSION 1976 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 1977 return 1; 1978 return 0; 1979 } 1980 1981 /* Determine whether a lock resource needs to be refreshed, and 1982 * arbitrate who gets to refresh it. 1983 * 1984 * 0 means no refresh needed. 1985 * 1986 * > 0 means you need to refresh this and you MUST call 1987 * ocfs2_complete_lock_res_refresh afterwards. 
*/ 1988 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres) 1989 { 1990 unsigned long flags; 1991 int status = 0; 1992 1993 mlog_entry_void(); 1994 1995 refresh_check: 1996 spin_lock_irqsave(&lockres->l_lock, flags); 1997 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 1998 spin_unlock_irqrestore(&lockres->l_lock, flags); 1999 goto bail; 2000 } 2001 2002 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 2003 spin_unlock_irqrestore(&lockres->l_lock, flags); 2004 2005 ocfs2_wait_on_refreshing_lock(lockres); 2006 goto refresh_check; 2007 } 2008 2009 /* Ok, I'll be the one to refresh this lock. */ 2010 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING); 2011 spin_unlock_irqrestore(&lockres->l_lock, flags); 2012 2013 status = 1; 2014 bail: 2015 mlog_exit(status); 2016 return status; 2017 } 2018 2019 /* If status is non zero, I'll mark it as not being in refresh 2020 * anymroe, but i won't clear the needs refresh flag. */ 2021 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres, 2022 int status) 2023 { 2024 unsigned long flags; 2025 mlog_entry_void(); 2026 2027 spin_lock_irqsave(&lockres->l_lock, flags); 2028 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING); 2029 if (!status) 2030 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 2031 spin_unlock_irqrestore(&lockres->l_lock, flags); 2032 2033 wake_up(&lockres->l_event); 2034 2035 mlog_exit_void(); 2036 } 2037 2038 /* may or may not return a bh if it went to disk. 
*/ 2039 static int ocfs2_inode_lock_update(struct inode *inode, 2040 struct buffer_head **bh) 2041 { 2042 int status = 0; 2043 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2044 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2045 struct ocfs2_dinode *fe; 2046 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2047 2048 mlog_entry_void(); 2049 2050 if (ocfs2_mount_local(osb)) 2051 goto bail; 2052 2053 spin_lock(&oi->ip_lock); 2054 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2055 mlog(0, "Orphaned inode %llu was deleted while we " 2056 "were waiting on a lock. ip_flags = 0x%x\n", 2057 (unsigned long long)oi->ip_blkno, oi->ip_flags); 2058 spin_unlock(&oi->ip_lock); 2059 status = -ENOENT; 2060 goto bail; 2061 } 2062 spin_unlock(&oi->ip_lock); 2063 2064 if (!ocfs2_should_refresh_lock_res(lockres)) 2065 goto bail; 2066 2067 /* This will discard any caching information we might have had 2068 * for the inode metadata. */ 2069 ocfs2_metadata_cache_purge(inode); 2070 2071 ocfs2_extent_map_trunc(inode, 0); 2072 2073 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) { 2074 mlog(0, "Trusting LVB on inode %llu\n", 2075 (unsigned long long)oi->ip_blkno); 2076 ocfs2_refresh_inode_from_lvb(inode); 2077 } else { 2078 /* Boo, we have to go to disk. */ 2079 /* read bh, cast, ocfs2_refresh_inode */ 2080 status = ocfs2_read_inode_block(inode, bh); 2081 if (status < 0) { 2082 mlog_errno(status); 2083 goto bail_refresh; 2084 } 2085 fe = (struct ocfs2_dinode *) (*bh)->b_data; 2086 2087 /* This is a good chance to make sure we're not 2088 * locking an invalid object. ocfs2_read_inode_block() 2089 * already checked that the inode block is sane. 2090 * 2091 * We bug on a stale inode here because we checked 2092 * above whether it was wiped from disk. The wiping 2093 * node provides a guarantee that we receive that 2094 * message and can mark the inode before dropping any 2095 * locks associated with it. 
*/ 2096 mlog_bug_on_msg(inode->i_generation != 2097 le32_to_cpu(fe->i_generation), 2098 "Invalid dinode %llu disk generation: %u " 2099 "inode->i_generation: %u\n", 2100 (unsigned long long)oi->ip_blkno, 2101 le32_to_cpu(fe->i_generation), 2102 inode->i_generation); 2103 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 2104 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 2105 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 2106 (unsigned long long)oi->ip_blkno, 2107 (unsigned long long)le64_to_cpu(fe->i_dtime), 2108 le32_to_cpu(fe->i_flags)); 2109 2110 ocfs2_refresh_inode(inode, fe); 2111 ocfs2_track_lock_refresh(lockres); 2112 } 2113 2114 status = 0; 2115 bail_refresh: 2116 ocfs2_complete_lock_res_refresh(lockres, status); 2117 bail: 2118 mlog_exit(status); 2119 return status; 2120 } 2121 2122 static int ocfs2_assign_bh(struct inode *inode, 2123 struct buffer_head **ret_bh, 2124 struct buffer_head *passed_bh) 2125 { 2126 int status; 2127 2128 if (passed_bh) { 2129 /* Ok, the update went to disk for us, use the 2130 * returned bh. */ 2131 *ret_bh = passed_bh; 2132 get_bh(*ret_bh); 2133 2134 return 0; 2135 } 2136 2137 status = ocfs2_read_inode_block(inode, ret_bh); 2138 if (status < 0) 2139 mlog_errno(status); 2140 2141 return status; 2142 } 2143 2144 /* 2145 * returns < 0 error if the callback will never be called, otherwise 2146 * the result of the lock will be communicated via the callback. 2147 */ 2148 int ocfs2_inode_lock_full(struct inode *inode, 2149 struct buffer_head **ret_bh, 2150 int ex, 2151 int arg_flags) 2152 { 2153 int status, level, acquired; 2154 u32 dlm_flags; 2155 struct ocfs2_lock_res *lockres = NULL; 2156 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2157 struct buffer_head *local_bh = NULL; 2158 2159 BUG_ON(!inode); 2160 2161 mlog_entry_void(); 2162 2163 mlog(0, "inode %llu, take %s META lock\n", 2164 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2165 ex ? 
"EXMODE" : "PRMODE"); 2166 2167 status = 0; 2168 acquired = 0; 2169 /* We'll allow faking a readonly metadata lock for 2170 * rodevices. */ 2171 if (ocfs2_is_hard_readonly(osb)) { 2172 if (ex) 2173 status = -EROFS; 2174 goto bail; 2175 } 2176 2177 if (ocfs2_mount_local(osb)) 2178 goto local; 2179 2180 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2181 ocfs2_wait_for_recovery(osb); 2182 2183 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2184 level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2185 dlm_flags = 0; 2186 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 2187 dlm_flags |= DLM_LKF_NOQUEUE; 2188 2189 status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags); 2190 if (status < 0) { 2191 if (status != -EAGAIN && status != -EIOCBRETRY) 2192 mlog_errno(status); 2193 goto bail; 2194 } 2195 2196 /* Notify the error cleanup path to drop the cluster lock. */ 2197 acquired = 1; 2198 2199 /* We wait twice because a node may have died while we were in 2200 * the lower dlm layers. The second time though, we've 2201 * committed to owning this lock so we don't allow signals to 2202 * abort the operation. */ 2203 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2204 ocfs2_wait_for_recovery(osb); 2205 2206 local: 2207 /* 2208 * We only see this flag if we're being called from 2209 * ocfs2_read_locked_inode(). It means we're locking an inode 2210 * which hasn't been populated yet, so clear the refresh flag 2211 * and let the caller handle it. 2212 */ 2213 if (inode->i_state & I_NEW) { 2214 status = 0; 2215 if (lockres) 2216 ocfs2_complete_lock_res_refresh(lockres, 0); 2217 goto bail; 2218 } 2219 2220 /* This is fun. The caller may want a bh back, or it may 2221 * not. ocfs2_inode_lock_update definitely wants one in, but 2222 * may or may not read one, depending on what's in the 2223 * LVB. The result of all of this is that we've *only* gone to 2224 * disk if we have to, so the complexity is worthwhile. 
*/ 2225 status = ocfs2_inode_lock_update(inode, &local_bh); 2226 if (status < 0) { 2227 if (status != -ENOENT) 2228 mlog_errno(status); 2229 goto bail; 2230 } 2231 2232 if (ret_bh) { 2233 status = ocfs2_assign_bh(inode, ret_bh, local_bh); 2234 if (status < 0) { 2235 mlog_errno(status); 2236 goto bail; 2237 } 2238 } 2239 2240 bail: 2241 if (status < 0) { 2242 if (ret_bh && (*ret_bh)) { 2243 brelse(*ret_bh); 2244 *ret_bh = NULL; 2245 } 2246 if (acquired) 2247 ocfs2_inode_unlock(inode, ex); 2248 } 2249 2250 if (local_bh) 2251 brelse(local_bh); 2252 2253 mlog_exit(status); 2254 return status; 2255 } 2256 2257 /* 2258 * This is working around a lock inversion between tasks acquiring DLM 2259 * locks while holding a page lock and the downconvert thread which 2260 * blocks dlm lock acquiry while acquiring page locks. 2261 * 2262 * ** These _with_page variantes are only intended to be called from aop 2263 * methods that hold page locks and return a very specific *positive* error 2264 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 2265 * 2266 * The DLM is called such that it returns -EAGAIN if it would have 2267 * blocked waiting for the downconvert thread. In that case we unlock 2268 * our page so the downconvert thread can make progress. Once we've 2269 * done this we have to return AOP_TRUNCATED_PAGE so the aop method 2270 * that called us can bubble that back up into the VFS who will then 2271 * immediately retry the aop call. 2272 * 2273 * We do a blocking lock and immediate unlock before returning, though, so that 2274 * the lock has a great chance of being cached on this node by the time the VFS 2275 * calls back to retry the aop. This has a potential to livelock as nodes 2276 * ping locks back and forth, but that's a risk we're willing to take to avoid 2277 * the lock inversion simply. 
2278 */ 2279 int ocfs2_inode_lock_with_page(struct inode *inode, 2280 struct buffer_head **ret_bh, 2281 int ex, 2282 struct page *page) 2283 { 2284 int ret; 2285 2286 ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK); 2287 if (ret == -EAGAIN) { 2288 unlock_page(page); 2289 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0) 2290 ocfs2_inode_unlock(inode, ex); 2291 ret = AOP_TRUNCATED_PAGE; 2292 } 2293 2294 return ret; 2295 } 2296 2297 int ocfs2_inode_lock_atime(struct inode *inode, 2298 struct vfsmount *vfsmnt, 2299 int *level) 2300 { 2301 int ret; 2302 2303 mlog_entry_void(); 2304 ret = ocfs2_inode_lock(inode, NULL, 0); 2305 if (ret < 0) { 2306 mlog_errno(ret); 2307 return ret; 2308 } 2309 2310 /* 2311 * If we should update atime, we will get EX lock, 2312 * otherwise we just get PR lock. 2313 */ 2314 if (ocfs2_should_update_atime(inode, vfsmnt)) { 2315 struct buffer_head *bh = NULL; 2316 2317 ocfs2_inode_unlock(inode, 0); 2318 ret = ocfs2_inode_lock(inode, &bh, 1); 2319 if (ret < 0) { 2320 mlog_errno(ret); 2321 return ret; 2322 } 2323 *level = 1; 2324 if (ocfs2_should_update_atime(inode, vfsmnt)) 2325 ocfs2_update_inode_atime(inode, bh); 2326 if (bh) 2327 brelse(bh); 2328 } else 2329 *level = 0; 2330 2331 mlog_exit(ret); 2332 return ret; 2333 } 2334 2335 void ocfs2_inode_unlock(struct inode *inode, 2336 int ex) 2337 { 2338 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2339 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2340 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2341 2342 mlog_entry_void(); 2343 2344 mlog(0, "inode %llu drop %s META lock\n", 2345 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2346 ex ? 
"EXMODE" : "PRMODE"); 2347 2348 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) && 2349 !ocfs2_mount_local(osb)) 2350 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 2351 2352 mlog_exit_void(); 2353 } 2354 2355 int ocfs2_super_lock(struct ocfs2_super *osb, 2356 int ex) 2357 { 2358 int status = 0; 2359 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2360 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2361 2362 mlog_entry_void(); 2363 2364 if (ocfs2_is_hard_readonly(osb)) 2365 return -EROFS; 2366 2367 if (ocfs2_mount_local(osb)) 2368 goto bail; 2369 2370 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 2371 if (status < 0) { 2372 mlog_errno(status); 2373 goto bail; 2374 } 2375 2376 /* The super block lock path is really in the best position to 2377 * know when resources covered by the lock need to be 2378 * refreshed, so we do it here. Of course, making sense of 2379 * everything is up to the caller :) */ 2380 status = ocfs2_should_refresh_lock_res(lockres); 2381 if (status < 0) { 2382 mlog_errno(status); 2383 goto bail; 2384 } 2385 if (status) { 2386 status = ocfs2_refresh_slot_info(osb); 2387 2388 ocfs2_complete_lock_res_refresh(lockres, status); 2389 2390 if (status < 0) 2391 mlog_errno(status); 2392 ocfs2_track_lock_refresh(lockres); 2393 } 2394 bail: 2395 mlog_exit(status); 2396 return status; 2397 } 2398 2399 void ocfs2_super_unlock(struct ocfs2_super *osb, 2400 int ex) 2401 { 2402 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2403 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2404 2405 if (!ocfs2_mount_local(osb)) 2406 ocfs2_cluster_unlock(osb, lockres, level); 2407 } 2408 2409 int ocfs2_rename_lock(struct ocfs2_super *osb) 2410 { 2411 int status; 2412 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2413 2414 if (ocfs2_is_hard_readonly(osb)) 2415 return -EROFS; 2416 2417 if (ocfs2_mount_local(osb)) 2418 return 0; 2419 2420 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2421 if (status < 0) 2422 mlog_errno(status); 2423 2424 return status; 2425 } 2426 2427 void ocfs2_rename_unlock(struct ocfs2_super *osb) 2428 { 2429 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2430 2431 if (!ocfs2_mount_local(osb)) 2432 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2433 } 2434 2435 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex) 2436 { 2437 int status; 2438 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2439 2440 if (ocfs2_is_hard_readonly(osb)) 2441 return -EROFS; 2442 2443 if (ocfs2_mount_local(osb)) 2444 return 0; 2445 2446 status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE, 2447 0, 0); 2448 if (status < 0) 2449 mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status); 2450 2451 return status; 2452 } 2453 2454 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex) 2455 { 2456 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2457 2458 if (!ocfs2_mount_local(osb)) 2459 ocfs2_cluster_unlock(osb, lockres, 2460 ex ? LKM_EXMODE : LKM_PRMODE); 2461 } 2462 2463 int ocfs2_dentry_lock(struct dentry *dentry, int ex) 2464 { 2465 int ret; 2466 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2467 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2468 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2469 2470 BUG_ON(!dl); 2471 2472 if (ocfs2_is_hard_readonly(osb)) 2473 return -EROFS; 2474 2475 if (ocfs2_mount_local(osb)) 2476 return 0; 2477 2478 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 2479 if (ret < 0) 2480 mlog_errno(ret); 2481 2482 return ret; 2483 } 2484 2485 void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 2486 { 2487 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2488 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2489 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2490 2491 if (!ocfs2_mount_local(osb)) 2492 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 2493 } 2494 2495 /* Reference counting of the dlm debug structure. We want this because 2496 * open references on the debug inodes can live on after a mount, so 2497 * we can't rely on the ocfs2_super to always exist. */ 2498 static void ocfs2_dlm_debug_free(struct kref *kref) 2499 { 2500 struct ocfs2_dlm_debug *dlm_debug; 2501 2502 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2503 2504 kfree(dlm_debug); 2505 } 2506 2507 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2508 { 2509 if (dlm_debug) 2510 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2511 } 2512 2513 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2514 { 2515 kref_get(&debug->d_refcnt); 2516 } 2517 2518 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2519 { 2520 struct ocfs2_dlm_debug *dlm_debug; 2521 2522 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 2523 if (!dlm_debug) { 2524 mlog_errno(-ENOMEM); 2525 goto out; 2526 } 2527 2528 kref_init(&dlm_debug->d_refcnt); 2529 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 2530 dlm_debug->d_locking_state = NULL; 2531 out: 2532 return dlm_debug; 2533 } 2534 2535 /* Access to this is arbitrated for us via seq_file->sem. 
*/ 2536 struct ocfs2_dlm_seq_priv { 2537 struct ocfs2_dlm_debug *p_dlm_debug; 2538 struct ocfs2_lock_res p_iter_res; 2539 struct ocfs2_lock_res p_tmp_res; 2540 }; 2541 2542 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 2543 struct ocfs2_dlm_seq_priv *priv) 2544 { 2545 struct ocfs2_lock_res *iter, *ret = NULL; 2546 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 2547 2548 assert_spin_locked(&ocfs2_dlm_tracking_lock); 2549 2550 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 2551 /* discover the head of the list */ 2552 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 2553 mlog(0, "End of list found, %p\n", ret); 2554 break; 2555 } 2556 2557 /* We track our "dummy" iteration lockres' by a NULL 2558 * l_ops field. */ 2559 if (iter->l_ops != NULL) { 2560 ret = iter; 2561 break; 2562 } 2563 } 2564 2565 return ret; 2566 } 2567 2568 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 2569 { 2570 struct ocfs2_dlm_seq_priv *priv = m->private; 2571 struct ocfs2_lock_res *iter; 2572 2573 spin_lock(&ocfs2_dlm_tracking_lock); 2574 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 2575 if (iter) { 2576 /* Since lockres' have the lifetime of their container 2577 * (which can be inodes, ocfs2_supers, etc) we want to 2578 * copy this out to a temporary lockres while still 2579 * under the spinlock. Obviously after this we can't 2580 * trust any pointers on the copy returned, but that's 2581 * ok as the information we want isn't typically held 2582 * in them. 
*/ 2583 priv->p_tmp_res = *iter; 2584 iter = &priv->p_tmp_res; 2585 } 2586 spin_unlock(&ocfs2_dlm_tracking_lock); 2587 2588 return iter; 2589 } 2590 2591 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 2592 { 2593 } 2594 2595 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 2596 { 2597 struct ocfs2_dlm_seq_priv *priv = m->private; 2598 struct ocfs2_lock_res *iter = v; 2599 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 2600 2601 spin_lock(&ocfs2_dlm_tracking_lock); 2602 iter = ocfs2_dlm_next_res(iter, priv); 2603 list_del_init(&dummy->l_debug_list); 2604 if (iter) { 2605 list_add(&dummy->l_debug_list, &iter->l_debug_list); 2606 priv->p_tmp_res = *iter; 2607 iter = &priv->p_tmp_res; 2608 } 2609 spin_unlock(&ocfs2_dlm_tracking_lock); 2610 2611 return iter; 2612 } 2613 2614 /* So that debugfs.ocfs2 can determine which format is being used */ 2615 #define OCFS2_DLM_DEBUG_STR_VERSION 2 2616 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 2617 { 2618 int i; 2619 char *lvb; 2620 struct ocfs2_lock_res *lockres = v; 2621 2622 if (!lockres) 2623 return -EINVAL; 2624 2625 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 2626 2627 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 2628 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 2629 lockres->l_name, 2630 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 2631 else 2632 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 2633 2634 seq_printf(m, "%d\t" 2635 "0x%lx\t" 2636 "0x%x\t" 2637 "0x%x\t" 2638 "%u\t" 2639 "%u\t" 2640 "%d\t" 2641 "%d\t", 2642 lockres->l_level, 2643 lockres->l_flags, 2644 lockres->l_action, 2645 lockres->l_unlock_action, 2646 lockres->l_ro_holders, 2647 lockres->l_ex_holders, 2648 lockres->l_requested, 2649 lockres->l_blocking); 2650 2651 /* Dump the raw LVB */ 2652 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2653 for(i = 0; i < DLM_LVB_LEN; i++) 2654 seq_printf(m, "0x%x\t", lvb[i]); 2655 2656 #ifdef CONFIG_OCFS2_FS_STATS 2657 # 
define lock_num_prmode(_l) (_l)->l_lock_num_prmode 2658 # define lock_num_exmode(_l) (_l)->l_lock_num_exmode 2659 # define lock_num_prmode_failed(_l) (_l)->l_lock_num_prmode_failed 2660 # define lock_num_exmode_failed(_l) (_l)->l_lock_num_exmode_failed 2661 # define lock_total_prmode(_l) (_l)->l_lock_total_prmode 2662 # define lock_total_exmode(_l) (_l)->l_lock_total_exmode 2663 # define lock_max_prmode(_l) (_l)->l_lock_max_prmode 2664 # define lock_max_exmode(_l) (_l)->l_lock_max_exmode 2665 # define lock_refresh(_l) (_l)->l_lock_refresh 2666 #else 2667 # define lock_num_prmode(_l) (0ULL) 2668 # define lock_num_exmode(_l) (0ULL) 2669 # define lock_num_prmode_failed(_l) (0) 2670 # define lock_num_exmode_failed(_l) (0) 2671 # define lock_total_prmode(_l) (0ULL) 2672 # define lock_total_exmode(_l) (0ULL) 2673 # define lock_max_prmode(_l) (0) 2674 # define lock_max_exmode(_l) (0) 2675 # define lock_refresh(_l) (0) 2676 #endif 2677 /* The following seq_print was added in version 2 of this output */ 2678 seq_printf(m, "%llu\t" 2679 "%llu\t" 2680 "%u\t" 2681 "%u\t" 2682 "%llu\t" 2683 "%llu\t" 2684 "%u\t" 2685 "%u\t" 2686 "%u\t", 2687 lock_num_prmode(lockres), 2688 lock_num_exmode(lockres), 2689 lock_num_prmode_failed(lockres), 2690 lock_num_exmode_failed(lockres), 2691 lock_total_prmode(lockres), 2692 lock_total_exmode(lockres), 2693 lock_max_prmode(lockres), 2694 lock_max_exmode(lockres), 2695 lock_refresh(lockres)); 2696 2697 /* End the line */ 2698 seq_printf(m, "\n"); 2699 return 0; 2700 } 2701 2702 static const struct seq_operations ocfs2_dlm_seq_ops = { 2703 .start = ocfs2_dlm_seq_start, 2704 .stop = ocfs2_dlm_seq_stop, 2705 .next = ocfs2_dlm_seq_next, 2706 .show = ocfs2_dlm_seq_show, 2707 }; 2708 2709 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 2710 { 2711 struct seq_file *seq = (struct seq_file *) file->private_data; 2712 struct ocfs2_dlm_seq_priv *priv = seq->private; 2713 struct ocfs2_lock_res *res = &priv->p_iter_res; 2714 2715 
ocfs2_remove_lockres_tracking(res); 2716 ocfs2_put_dlm_debug(priv->p_dlm_debug); 2717 return seq_release_private(inode, file); 2718 } 2719 2720 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 2721 { 2722 int ret; 2723 struct ocfs2_dlm_seq_priv *priv; 2724 struct seq_file *seq; 2725 struct ocfs2_super *osb; 2726 2727 priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL); 2728 if (!priv) { 2729 ret = -ENOMEM; 2730 mlog_errno(ret); 2731 goto out; 2732 } 2733 osb = inode->i_private; 2734 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 2735 priv->p_dlm_debug = osb->osb_dlm_debug; 2736 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 2737 2738 ret = seq_open(file, &ocfs2_dlm_seq_ops); 2739 if (ret) { 2740 kfree(priv); 2741 mlog_errno(ret); 2742 goto out; 2743 } 2744 2745 seq = (struct seq_file *) file->private_data; 2746 seq->private = priv; 2747 2748 ocfs2_add_lockres_tracking(&priv->p_iter_res, 2749 priv->p_dlm_debug); 2750 2751 out: 2752 return ret; 2753 } 2754 2755 static const struct file_operations ocfs2_dlm_debug_fops = { 2756 .open = ocfs2_dlm_debug_open, 2757 .release = ocfs2_dlm_debug_release, 2758 .read = seq_read, 2759 .llseek = seq_lseek, 2760 }; 2761 2762 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 2763 { 2764 int ret = 0; 2765 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2766 2767 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 2768 S_IFREG|S_IRUSR, 2769 osb->osb_debug_root, 2770 osb, 2771 &ocfs2_dlm_debug_fops); 2772 if (!dlm_debug->d_locking_state) { 2773 ret = -EINVAL; 2774 mlog(ML_ERROR, 2775 "Unable to create locking state debugfs file.\n"); 2776 goto out; 2777 } 2778 2779 ocfs2_get_dlm_debug(dlm_debug); 2780 out: 2781 return ret; 2782 } 2783 2784 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 2785 { 2786 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2787 2788 if (dlm_debug) { 2789 debugfs_remove(dlm_debug->d_locking_state); 2790 ocfs2_put_dlm_debug(dlm_debug); 
2791 } 2792 } 2793 2794 int ocfs2_dlm_init(struct ocfs2_super *osb) 2795 { 2796 int status = 0; 2797 struct ocfs2_cluster_connection *conn = NULL; 2798 2799 mlog_entry_void(); 2800 2801 if (ocfs2_mount_local(osb)) { 2802 osb->node_num = 0; 2803 goto local; 2804 } 2805 2806 status = ocfs2_dlm_init_debug(osb); 2807 if (status < 0) { 2808 mlog_errno(status); 2809 goto bail; 2810 } 2811 2812 /* launch downconvert thread */ 2813 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc"); 2814 if (IS_ERR(osb->dc_task)) { 2815 status = PTR_ERR(osb->dc_task); 2816 osb->dc_task = NULL; 2817 mlog_errno(status); 2818 goto bail; 2819 } 2820 2821 /* for now, uuid == domain */ 2822 status = ocfs2_cluster_connect(osb->osb_cluster_stack, 2823 osb->uuid_str, 2824 strlen(osb->uuid_str), 2825 ocfs2_do_node_down, osb, 2826 &conn); 2827 if (status) { 2828 mlog_errno(status); 2829 goto bail; 2830 } 2831 2832 status = ocfs2_cluster_this_node(&osb->node_num); 2833 if (status < 0) { 2834 mlog_errno(status); 2835 mlog(ML_ERROR, 2836 "could not find this host's node number\n"); 2837 ocfs2_cluster_disconnect(conn, 0); 2838 goto bail; 2839 } 2840 2841 local: 2842 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 2843 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 2844 ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb); 2845 2846 osb->cconn = conn; 2847 2848 status = 0; 2849 bail: 2850 if (status < 0) { 2851 ocfs2_dlm_shutdown_debug(osb); 2852 if (osb->dc_task) 2853 kthread_stop(osb->dc_task); 2854 } 2855 2856 mlog_exit(status); 2857 return status; 2858 } 2859 2860 void ocfs2_dlm_shutdown(struct ocfs2_super *osb, 2861 int hangup_pending) 2862 { 2863 mlog_entry_void(); 2864 2865 ocfs2_drop_osb_locks(osb); 2866 2867 /* 2868 * Now that we have dropped all locks and ocfs2_dismount_volume() 2869 * has disabled recovery, the DLM won't be talking to us. It's 2870 * safe to tear things down before disconnecting the cluster. 
2871 */ 2872 2873 if (osb->dc_task) { 2874 kthread_stop(osb->dc_task); 2875 osb->dc_task = NULL; 2876 } 2877 2878 ocfs2_lock_res_free(&osb->osb_super_lockres); 2879 ocfs2_lock_res_free(&osb->osb_rename_lockres); 2880 ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres); 2881 2882 ocfs2_cluster_disconnect(osb->cconn, hangup_pending); 2883 osb->cconn = NULL; 2884 2885 ocfs2_dlm_shutdown_debug(osb); 2886 2887 mlog_exit_void(); 2888 } 2889 2890 static void ocfs2_unlock_ast(void *opaque, int error) 2891 { 2892 struct ocfs2_lock_res *lockres = opaque; 2893 unsigned long flags; 2894 2895 mlog_entry_void(); 2896 2897 mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name, 2898 lockres->l_unlock_action); 2899 2900 spin_lock_irqsave(&lockres->l_lock, flags); 2901 if (error) { 2902 mlog(ML_ERROR, "Dlm passes error %d for lock %s, " 2903 "unlock_action %d\n", error, lockres->l_name, 2904 lockres->l_unlock_action); 2905 spin_unlock_irqrestore(&lockres->l_lock, flags); 2906 return; 2907 } 2908 2909 switch(lockres->l_unlock_action) { 2910 case OCFS2_UNLOCK_CANCEL_CONVERT: 2911 mlog(0, "Cancel convert success for %s\n", lockres->l_name); 2912 lockres->l_action = OCFS2_AST_INVALID; 2913 /* Downconvert thread may have requeued this lock, we 2914 * need to wake it. */ 2915 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 2916 ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres)); 2917 break; 2918 case OCFS2_UNLOCK_DROP_LOCK: 2919 lockres->l_level = DLM_LOCK_IV; 2920 break; 2921 default: 2922 BUG(); 2923 } 2924 2925 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 2926 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 2927 wake_up(&lockres->l_event); 2928 spin_unlock_irqrestore(&lockres->l_lock, flags); 2929 2930 mlog_exit_void(); 2931 } 2932 2933 static int ocfs2_drop_lock(struct ocfs2_super *osb, 2934 struct ocfs2_lock_res *lockres) 2935 { 2936 int ret; 2937 unsigned long flags; 2938 u32 lkm_flags = 0; 2939 2940 /* We didn't get anywhere near actually using this lockres. 
*/ 2941 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 2942 goto out; 2943 2944 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 2945 lkm_flags |= DLM_LKF_VALBLK; 2946 2947 spin_lock_irqsave(&lockres->l_lock, flags); 2948 2949 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 2950 "lockres %s, flags 0x%lx\n", 2951 lockres->l_name, lockres->l_flags); 2952 2953 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 2954 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 2955 "%u, unlock_action = %u\n", 2956 lockres->l_name, lockres->l_flags, lockres->l_action, 2957 lockres->l_unlock_action); 2958 2959 spin_unlock_irqrestore(&lockres->l_lock, flags); 2960 2961 /* XXX: Today we just wait on any busy 2962 * locks... Perhaps we need to cancel converts in the 2963 * future? */ 2964 ocfs2_wait_on_busy_lock(lockres); 2965 2966 spin_lock_irqsave(&lockres->l_lock, flags); 2967 } 2968 2969 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 2970 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 2971 lockres->l_level == DLM_LOCK_EX && 2972 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 2973 lockres->l_ops->set_lvb(lockres); 2974 } 2975 2976 if (lockres->l_flags & OCFS2_LOCK_BUSY) 2977 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 2978 lockres->l_name); 2979 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 2980 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 2981 2982 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 2983 spin_unlock_irqrestore(&lockres->l_lock, flags); 2984 goto out; 2985 } 2986 2987 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 2988 2989 /* make sure we never get here while waiting for an ast to 2990 * fire. */ 2991 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 2992 2993 /* is this necessary? 
*/ 2994 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2995 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 2996 spin_unlock_irqrestore(&lockres->l_lock, flags); 2997 2998 mlog(0, "lock %s\n", lockres->l_name); 2999 3000 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags, 3001 lockres); 3002 if (ret) { 3003 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3004 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 3005 ocfs2_dlm_dump_lksb(&lockres->l_lksb); 3006 BUG(); 3007 } 3008 mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n", 3009 lockres->l_name); 3010 3011 ocfs2_wait_on_busy_lock(lockres); 3012 out: 3013 mlog_exit(0); 3014 return 0; 3015 } 3016 3017 /* Mark the lockres as being dropped. It will no longer be 3018 * queued if blocking, but we still may have to wait on it 3019 * being dequeued from the downconvert thread before we can consider 3020 * it safe to drop. 3021 * 3022 * You can *not* attempt to call cluster_lock on this lockres anymore. */ 3023 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres) 3024 { 3025 int status; 3026 struct ocfs2_mask_waiter mw; 3027 unsigned long flags; 3028 3029 ocfs2_init_mask_waiter(&mw); 3030 3031 spin_lock_irqsave(&lockres->l_lock, flags); 3032 lockres->l_flags |= OCFS2_LOCK_FREEING; 3033 while (lockres->l_flags & OCFS2_LOCK_QUEUED) { 3034 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); 3035 spin_unlock_irqrestore(&lockres->l_lock, flags); 3036 3037 mlog(0, "Waiting on lockres %s\n", lockres->l_name); 3038 3039 status = ocfs2_wait_for_mask(&mw); 3040 if (status) 3041 mlog_errno(status); 3042 3043 spin_lock_irqsave(&lockres->l_lock, flags); 3044 } 3045 spin_unlock_irqrestore(&lockres->l_lock, flags); 3046 } 3047 3048 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, 3049 struct ocfs2_lock_res *lockres) 3050 { 3051 int ret; 3052 3053 ocfs2_mark_lockres_freeing(lockres); 3054 ret = ocfs2_drop_lock(osb, lockres); 3055 if (ret) 3056 mlog_errno(ret); 3057 } 3058 
3059 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 3060 { 3061 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 3062 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 3063 ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres); 3064 } 3065 3066 int ocfs2_drop_inode_locks(struct inode *inode) 3067 { 3068 int status, err; 3069 3070 mlog_entry_void(); 3071 3072 /* No need to call ocfs2_mark_lockres_freeing here - 3073 * ocfs2_clear_inode has done it for us. */ 3074 3075 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3076 &OCFS2_I(inode)->ip_open_lockres); 3077 if (err < 0) 3078 mlog_errno(err); 3079 3080 status = err; 3081 3082 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3083 &OCFS2_I(inode)->ip_inode_lockres); 3084 if (err < 0) 3085 mlog_errno(err); 3086 if (err < 0 && !status) 3087 status = err; 3088 3089 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3090 &OCFS2_I(inode)->ip_rw_lockres); 3091 if (err < 0) 3092 mlog_errno(err); 3093 if (err < 0 && !status) 3094 status = err; 3095 3096 mlog_exit(status); 3097 return status; 3098 } 3099 3100 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 3101 int new_level) 3102 { 3103 assert_spin_locked(&lockres->l_lock); 3104 3105 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 3106 3107 if (lockres->l_level <= new_level) { 3108 mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n", 3109 lockres->l_level, new_level); 3110 BUG(); 3111 } 3112 3113 mlog(0, "lock %s, new_level = %d, l_blocking = %d\n", 3114 lockres->l_name, new_level, lockres->l_blocking); 3115 3116 lockres->l_action = OCFS2_AST_DOWNCONVERT; 3117 lockres->l_requested = new_level; 3118 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3119 return lockres_set_pending(lockres); 3120 } 3121 3122 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 3123 struct ocfs2_lock_res *lockres, 3124 int new_level, 3125 int lvb, 3126 unsigned int generation) 3127 { 3128 int ret; 3129 u32 dlm_flags = DLM_LKF_CONVERT; 3130 3131 
mlog_entry_void(); 3132 3133 if (lvb) 3134 dlm_flags |= DLM_LKF_VALBLK; 3135 3136 ret = ocfs2_dlm_lock(osb->cconn, 3137 new_level, 3138 &lockres->l_lksb, 3139 dlm_flags, 3140 lockres->l_name, 3141 OCFS2_LOCK_ID_MAX_LEN - 1, 3142 lockres); 3143 lockres_clear_pending(lockres, generation, osb); 3144 if (ret) { 3145 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 3146 ocfs2_recover_from_dlm_error(lockres, 1); 3147 goto bail; 3148 } 3149 3150 ret = 0; 3151 bail: 3152 mlog_exit(ret); 3153 return ret; 3154 } 3155 3156 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */ 3157 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 3158 struct ocfs2_lock_res *lockres) 3159 { 3160 assert_spin_locked(&lockres->l_lock); 3161 3162 mlog_entry_void(); 3163 mlog(0, "lock %s\n", lockres->l_name); 3164 3165 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 3166 /* If we're already trying to cancel a lock conversion 3167 * then just drop the spinlock and allow the caller to 3168 * requeue this lock. */ 3169 3170 mlog(0, "Lockres %s, skip convert\n", lockres->l_name); 3171 return 0; 3172 } 3173 3174 /* were we in a convert when we got the bast fire? */ 3175 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT && 3176 lockres->l_action != OCFS2_AST_DOWNCONVERT); 3177 /* set things up for the unlockast to know to just 3178 * clear out the ast_action and unset busy, etc. 
*/ 3179 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT; 3180 3181 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY), 3182 "lock %s, invalid flags: 0x%lx\n", 3183 lockres->l_name, lockres->l_flags); 3184 3185 return 1; 3186 } 3187 3188 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 3189 struct ocfs2_lock_res *lockres) 3190 { 3191 int ret; 3192 3193 mlog_entry_void(); 3194 mlog(0, "lock %s\n", lockres->l_name); 3195 3196 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, 3197 DLM_LKF_CANCEL, lockres); 3198 if (ret) { 3199 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3200 ocfs2_recover_from_dlm_error(lockres, 0); 3201 } 3202 3203 mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name); 3204 3205 mlog_exit(ret); 3206 return ret; 3207 } 3208 3209 static int ocfs2_unblock_lock(struct ocfs2_super *osb, 3210 struct ocfs2_lock_res *lockres, 3211 struct ocfs2_unblock_ctl *ctl) 3212 { 3213 unsigned long flags; 3214 int blocking; 3215 int new_level; 3216 int ret = 0; 3217 int set_lvb = 0; 3218 unsigned int gen; 3219 3220 mlog_entry_void(); 3221 3222 spin_lock_irqsave(&lockres->l_lock, flags); 3223 3224 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 3225 3226 recheck: 3227 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 3228 /* XXX 3229 * This is a *big* race. The OCFS2_LOCK_PENDING flag 3230 * exists entirely for one reason - another thread has set 3231 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock(). 3232 * 3233 * If we do ocfs2_cancel_convert() before the other thread 3234 * calls dlm_lock(), our cancel will do nothing. We will 3235 * get no ast, and we will have no way of knowing the 3236 * cancel failed. Meanwhile, the other thread will call 3237 * into dlm_lock() and wait...forever. 3238 * 3239 * Why forever? Because another node has asked for the 3240 * lock first; that's why we're here in unblock_lock(). 3241 * 3242 * The solution is OCFS2_LOCK_PENDING. When PENDING is 3243 * set, we just requeue the unblock. 
Only when the other 3244 * thread has called dlm_lock() and cleared PENDING will 3245 * we then cancel their request. 3246 * 3247 * All callers of dlm_lock() must set OCFS2_DLM_PENDING 3248 * at the same time they set OCFS2_DLM_BUSY. They must 3249 * clear OCFS2_DLM_PENDING after dlm_lock() returns. 3250 */ 3251 if (lockres->l_flags & OCFS2_LOCK_PENDING) 3252 goto leave_requeue; 3253 3254 ctl->requeue = 1; 3255 ret = ocfs2_prepare_cancel_convert(osb, lockres); 3256 spin_unlock_irqrestore(&lockres->l_lock, flags); 3257 if (ret) { 3258 ret = ocfs2_cancel_convert(osb, lockres); 3259 if (ret < 0) 3260 mlog_errno(ret); 3261 } 3262 goto leave; 3263 } 3264 3265 /* if we're blocking an exclusive and we have *any* holders, 3266 * then requeue. */ 3267 if ((lockres->l_blocking == DLM_LOCK_EX) 3268 && (lockres->l_ex_holders || lockres->l_ro_holders)) 3269 goto leave_requeue; 3270 3271 /* If it's a PR we're blocking, then only 3272 * requeue if we've got any EX holders */ 3273 if (lockres->l_blocking == DLM_LOCK_PR && 3274 lockres->l_ex_holders) 3275 goto leave_requeue; 3276 3277 /* 3278 * Can we get a lock in this state if the holder counts are 3279 * zero? The meta data unblock code used to check this. 3280 */ 3281 if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 3282 && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) 3283 goto leave_requeue; 3284 3285 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 3286 3287 if (lockres->l_ops->check_downconvert 3288 && !lockres->l_ops->check_downconvert(lockres, new_level)) 3289 goto leave_requeue; 3290 3291 /* If we get here, then we know that there are no more 3292 * incompatible holders (and anyone asking for an incompatible 3293 * lock is blocked). We can now downconvert the lock */ 3294 if (!lockres->l_ops->downconvert_worker) 3295 goto downconvert; 3296 3297 /* Some lockres types want to do a bit of work before 3298 * downconverting a lock. Allow that here. 
The worker function 3299 * may sleep, so we save off a copy of what we're blocking as 3300 * it may change while we're not holding the spin lock. */ 3301 blocking = lockres->l_blocking; 3302 spin_unlock_irqrestore(&lockres->l_lock, flags); 3303 3304 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); 3305 3306 if (ctl->unblock_action == UNBLOCK_STOP_POST) 3307 goto leave; 3308 3309 spin_lock_irqsave(&lockres->l_lock, flags); 3310 if (blocking != lockres->l_blocking) { 3311 /* If this changed underneath us, then we can't drop 3312 * it just yet. */ 3313 goto recheck; 3314 } 3315 3316 downconvert: 3317 ctl->requeue = 0; 3318 3319 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3320 if (lockres->l_level == DLM_LOCK_EX) 3321 set_lvb = 1; 3322 3323 /* 3324 * We only set the lvb if the lock has been fully 3325 * refreshed - otherwise we risk setting stale 3326 * data. Otherwise, there's no need to actually clear 3327 * out the lvb here as it's value is still valid. 3328 */ 3329 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3330 lockres->l_ops->set_lvb(lockres); 3331 } 3332 3333 gen = ocfs2_prepare_downconvert(lockres, new_level); 3334 spin_unlock_irqrestore(&lockres->l_lock, flags); 3335 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb, 3336 gen); 3337 3338 leave: 3339 mlog_exit(ret); 3340 return ret; 3341 3342 leave_requeue: 3343 spin_unlock_irqrestore(&lockres->l_lock, flags); 3344 ctl->requeue = 1; 3345 3346 mlog_exit(0); 3347 return 0; 3348 } 3349 3350 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 3351 int blocking) 3352 { 3353 struct inode *inode; 3354 struct address_space *mapping; 3355 3356 inode = ocfs2_lock_res_inode(lockres); 3357 mapping = inode->i_mapping; 3358 3359 if (!S_ISREG(inode->i_mode)) 3360 goto out; 3361 3362 /* 3363 * We need this before the filemap_fdatawrite() so that it can 3364 * transfer the dirty bit from the PTE to the 3365 * page. 
Unfortunately this means that even for EX->PR 3366 * downconverts, we'll lose our mappings and have to build 3367 * them up again. 3368 */ 3369 unmap_mapping_range(mapping, 0, 0, 0); 3370 3371 if (filemap_fdatawrite(mapping)) { 3372 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!", 3373 (unsigned long long)OCFS2_I(inode)->ip_blkno); 3374 } 3375 sync_mapping_buffers(mapping); 3376 if (blocking == DLM_LOCK_EX) { 3377 truncate_inode_pages(mapping, 0); 3378 } else { 3379 /* We only need to wait on the I/O if we're not also 3380 * truncating pages because truncate_inode_pages waits 3381 * for us above. We don't truncate pages if we're 3382 * blocking anything < EXMODE because we want to keep 3383 * them around in that case. */ 3384 filemap_fdatawait(mapping); 3385 } 3386 3387 out: 3388 return UNBLOCK_CONTINUE; 3389 } 3390 3391 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 3392 int new_level) 3393 { 3394 struct inode *inode = ocfs2_lock_res_inode(lockres); 3395 int checkpointed = ocfs2_inode_fully_checkpointed(inode); 3396 3397 BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR); 3398 BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed); 3399 3400 if (checkpointed) 3401 return 1; 3402 3403 ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb)); 3404 return 0; 3405 } 3406 3407 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres) 3408 { 3409 struct inode *inode = ocfs2_lock_res_inode(lockres); 3410 3411 __ocfs2_stuff_meta_lvb(inode); 3412 } 3413 3414 /* 3415 * Does the final reference drop on our dentry lock. Right now this 3416 * happens in the downconvert thread, but we could choose to simplify the 3417 * dlmglue API and push these off to the ocfs2_wq in the future. 
3418 */ 3419 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 3420 struct ocfs2_lock_res *lockres) 3421 { 3422 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3423 ocfs2_dentry_lock_put(osb, dl); 3424 } 3425 3426 /* 3427 * d_delete() matching dentries before the lock downconvert. 3428 * 3429 * At this point, any process waiting to destroy the 3430 * dentry_lock due to last ref count is stopped by the 3431 * OCFS2_LOCK_QUEUED flag. 3432 * 3433 * We have two potential problems 3434 * 3435 * 1) If we do the last reference drop on our dentry_lock (via dput) 3436 * we'll wind up in ocfs2_release_dentry_lock(), waiting on 3437 * the downconvert to finish. Instead we take an elevated 3438 * reference and push the drop until after we've completed our 3439 * unblock processing. 3440 * 3441 * 2) There might be another process with a final reference, 3442 * waiting on us to finish processing. If this is the case, we 3443 * detect it and exit out - there's no more dentries anyway. 3444 */ 3445 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 3446 int blocking) 3447 { 3448 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3449 struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode); 3450 struct dentry *dentry; 3451 unsigned long flags; 3452 int extra_ref = 0; 3453 3454 /* 3455 * This node is blocking another node from getting a read 3456 * lock. This happens when we've renamed within a 3457 * directory. We've forced the other nodes to d_delete(), but 3458 * we never actually dropped our lock because it's still 3459 * valid. The downconvert code will retain a PR for this node, 3460 * so there's no further work to do. 3461 */ 3462 if (blocking == DLM_LOCK_PR) 3463 return UNBLOCK_CONTINUE; 3464 3465 /* 3466 * Mark this inode as potentially orphaned. The code in 3467 * ocfs2_delete_inode() will figure out whether it actually 3468 * needs to be freed or not. 
3469 */ 3470 spin_lock(&oi->ip_lock); 3471 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; 3472 spin_unlock(&oi->ip_lock); 3473 3474 /* 3475 * Yuck. We need to make sure however that the check of 3476 * OCFS2_LOCK_FREEING and the extra reference are atomic with 3477 * respect to a reference decrement or the setting of that 3478 * flag. 3479 */ 3480 spin_lock_irqsave(&lockres->l_lock, flags); 3481 spin_lock(&dentry_attach_lock); 3482 if (!(lockres->l_flags & OCFS2_LOCK_FREEING) 3483 && dl->dl_count) { 3484 dl->dl_count++; 3485 extra_ref = 1; 3486 } 3487 spin_unlock(&dentry_attach_lock); 3488 spin_unlock_irqrestore(&lockres->l_lock, flags); 3489 3490 mlog(0, "extra_ref = %d\n", extra_ref); 3491 3492 /* 3493 * We have a process waiting on us in ocfs2_dentry_iput(), 3494 * which means we can't have any more outstanding 3495 * aliases. There's no need to do any more work. 3496 */ 3497 if (!extra_ref) 3498 return UNBLOCK_CONTINUE; 3499 3500 spin_lock(&dentry_attach_lock); 3501 while (1) { 3502 dentry = ocfs2_find_local_alias(dl->dl_inode, 3503 dl->dl_parent_blkno, 1); 3504 if (!dentry) 3505 break; 3506 spin_unlock(&dentry_attach_lock); 3507 3508 mlog(0, "d_delete(%.*s);\n", dentry->d_name.len, 3509 dentry->d_name.name); 3510 3511 /* 3512 * The following dcache calls may do an 3513 * iput(). Normally we don't want that from the 3514 * downconverting thread, but in this case it's ok 3515 * because the requesting node already has an 3516 * exclusive lock on the inode, so it can't be queued 3517 * for a downconvert. 3518 */ 3519 d_delete(dentry); 3520 dput(dentry); 3521 3522 spin_lock(&dentry_attach_lock); 3523 } 3524 spin_unlock(&dentry_attach_lock); 3525 3526 /* 3527 * If we are the last holder of this dentry lock, there is no 3528 * reason to downconvert so skip straight to the unlock. 
3529 */ 3530 if (dl->dl_count == 1) 3531 return UNBLOCK_STOP_POST; 3532 3533 return UNBLOCK_CONTINUE_POST; 3534 } 3535 3536 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres) 3537 { 3538 struct ocfs2_qinfo_lvb *lvb; 3539 struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres); 3540 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 3541 oinfo->dqi_gi.dqi_type); 3542 3543 mlog_entry_void(); 3544 3545 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 3546 lvb->lvb_version = OCFS2_QINFO_LVB_VERSION; 3547 lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace); 3548 lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace); 3549 lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms); 3550 lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks); 3551 lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk); 3552 lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry); 3553 3554 mlog_exit_void(); 3555 } 3556 3557 void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex) 3558 { 3559 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 3560 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 3561 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 3562 3563 mlog_entry_void(); 3564 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 3565 ocfs2_cluster_unlock(osb, lockres, level); 3566 mlog_exit_void(); 3567 } 3568 3569 static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo) 3570 { 3571 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 3572 oinfo->dqi_gi.dqi_type); 3573 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 3574 struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 3575 struct buffer_head *bh = NULL; 3576 struct ocfs2_global_disk_dqinfo *gdinfo; 3577 int status = 0; 3578 3579 if (lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) { 3580 info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace); 3581 info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace); 3582 oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms); 3583 oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks); 3584 oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk); 3585 oinfo->dqi_gi.dqi_free_entry = 3586 be32_to_cpu(lvb->lvb_free_entry); 3587 } else { 3588 status = ocfs2_read_quota_block(oinfo->dqi_gqinode, 0, &bh); 3589 if (status) { 3590 mlog_errno(status); 3591 goto bail; 3592 } 3593 gdinfo = (struct ocfs2_global_disk_dqinfo *) 3594 (bh->b_data + OCFS2_GLOBAL_INFO_OFF); 3595 info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace); 3596 info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace); 3597 oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms); 3598 oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks); 3599 oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk); 3600 oinfo->dqi_gi.dqi_free_entry = 3601 le32_to_cpu(gdinfo->dqi_free_entry); 3602 brelse(bh); 3603 ocfs2_track_lock_refresh(lockres); 3604 } 3605 3606 bail: 3607 return status; 3608 } 3609 3610 /* Lock quota info, this function expects at least shared lock on the quota file 3611 * so that we can safely refresh quota info from disk. 
*/ 3612 int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex) 3613 { 3614 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 3615 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 3616 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 3617 int status = 0; 3618 3619 mlog_entry_void(); 3620 3621 /* On RO devices, locking really isn't needed... */ 3622 if (ocfs2_is_hard_readonly(osb)) { 3623 if (ex) 3624 status = -EROFS; 3625 goto bail; 3626 } 3627 if (ocfs2_mount_local(osb)) 3628 goto bail; 3629 3630 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 3631 if (status < 0) { 3632 mlog_errno(status); 3633 goto bail; 3634 } 3635 if (!ocfs2_should_refresh_lock_res(lockres)) 3636 goto bail; 3637 /* OK, we have the lock but we need to refresh the quota info */ 3638 status = ocfs2_refresh_qinfo(oinfo); 3639 if (status) 3640 ocfs2_qinfo_unlock(oinfo, ex); 3641 ocfs2_complete_lock_res_refresh(lockres, status); 3642 bail: 3643 mlog_exit(status); 3644 return status; 3645 } 3646 3647 /* 3648 * This is the filesystem locking protocol. It provides the lock handling 3649 * hooks for the underlying DLM. It has a maximum version number. 3650 * The version number allows interoperability with systems running at 3651 * the same major number and an equal or smaller minor number. 3652 * 3653 * Whenever the filesystem does new things with locks (adds or removes a 3654 * lock, orders them differently, does different things underneath a lock), 3655 * the version must be changed. The protocol is negotiated when joining 3656 * the dlm domain. A node may join the domain if its major version is 3657 * identical to all other nodes and its minor version is greater than 3658 * or equal to all other nodes. When its minor version is greater than 3659 * the other nodes, it will run at the minor version specified by the 3660 * other nodes. 
3661 * 3662 * If a locking change is made that will not be compatible with older 3663 * versions, the major number must be increased and the minor version set 3664 * to zero. If a change merely adds a behavior that can be disabled when 3665 * speaking to older versions, the minor version must be increased. If a 3666 * change adds a fully backwards compatible change (eg, LVB changes that 3667 * are just ignored by older versions), the version does not need to be 3668 * updated. 3669 */ 3670 static struct ocfs2_locking_protocol lproto = { 3671 .lp_max_version = { 3672 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, 3673 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, 3674 }, 3675 .lp_lock_ast = ocfs2_locking_ast, 3676 .lp_blocking_ast = ocfs2_blocking_ast, 3677 .lp_unlock_ast = ocfs2_unlock_ast, 3678 }; 3679 3680 void ocfs2_set_locking_protocol(void) 3681 { 3682 ocfs2_stack_glue_set_locking_protocol(&lproto); 3683 } 3684 3685 3686 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3687 struct ocfs2_lock_res *lockres) 3688 { 3689 int status; 3690 struct ocfs2_unblock_ctl ctl = {0, 0,}; 3691 unsigned long flags; 3692 3693 /* Our reference to the lockres in this function can be 3694 * considered valid until we remove the OCFS2_LOCK_QUEUED 3695 * flag. */ 3696 3697 mlog_entry_void(); 3698 3699 BUG_ON(!lockres); 3700 BUG_ON(!lockres->l_ops); 3701 3702 mlog(0, "lockres %s blocked.\n", lockres->l_name); 3703 3704 /* Detect whether a lock has been marked as going away while 3705 * the downconvert thread was processing other things. A lock can 3706 * still be marked with OCFS2_LOCK_FREEING after this check, 3707 * but short circuiting here will still save us some 3708 * performance. 
*/ 3709 spin_lock_irqsave(&lockres->l_lock, flags); 3710 if (lockres->l_flags & OCFS2_LOCK_FREEING) 3711 goto unqueue; 3712 spin_unlock_irqrestore(&lockres->l_lock, flags); 3713 3714 status = ocfs2_unblock_lock(osb, lockres, &ctl); 3715 if (status < 0) 3716 mlog_errno(status); 3717 3718 spin_lock_irqsave(&lockres->l_lock, flags); 3719 unqueue: 3720 if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) { 3721 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED); 3722 } else 3723 ocfs2_schedule_blocked_lock(osb, lockres); 3724 3725 mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name, 3726 ctl.requeue ? "yes" : "no"); 3727 spin_unlock_irqrestore(&lockres->l_lock, flags); 3728 3729 if (ctl.unblock_action != UNBLOCK_CONTINUE 3730 && lockres->l_ops->post_unlock) 3731 lockres->l_ops->post_unlock(osb, lockres); 3732 3733 mlog_exit_void(); 3734 } 3735 3736 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 3737 struct ocfs2_lock_res *lockres) 3738 { 3739 mlog_entry_void(); 3740 3741 assert_spin_locked(&lockres->l_lock); 3742 3743 if (lockres->l_flags & OCFS2_LOCK_FREEING) { 3744 /* Do not schedule a lock for downconvert when it's on 3745 * the way to destruction - any nodes wanting access 3746 * to the resource will get it soon. 
*/ 3747 mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n", 3748 lockres->l_name, lockres->l_flags); 3749 return; 3750 } 3751 3752 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); 3753 3754 spin_lock(&osb->dc_task_lock); 3755 if (list_empty(&lockres->l_blocked_list)) { 3756 list_add_tail(&lockres->l_blocked_list, 3757 &osb->blocked_lock_list); 3758 osb->blocked_lock_count++; 3759 } 3760 spin_unlock(&osb->dc_task_lock); 3761 3762 mlog_exit_void(); 3763 } 3764 3765 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb) 3766 { 3767 unsigned long processed; 3768 struct ocfs2_lock_res *lockres; 3769 3770 mlog_entry_void(); 3771 3772 spin_lock(&osb->dc_task_lock); 3773 /* grab this early so we know to try again if a state change and 3774 * wake happens part-way through our work */ 3775 osb->dc_work_sequence = osb->dc_wake_sequence; 3776 3777 processed = osb->blocked_lock_count; 3778 while (processed) { 3779 BUG_ON(list_empty(&osb->blocked_lock_list)); 3780 3781 lockres = list_entry(osb->blocked_lock_list.next, 3782 struct ocfs2_lock_res, l_blocked_list); 3783 list_del_init(&lockres->l_blocked_list); 3784 osb->blocked_lock_count--; 3785 spin_unlock(&osb->dc_task_lock); 3786 3787 BUG_ON(!processed); 3788 processed--; 3789 3790 ocfs2_process_blocked_lock(osb, lockres); 3791 3792 spin_lock(&osb->dc_task_lock); 3793 } 3794 spin_unlock(&osb->dc_task_lock); 3795 3796 mlog_exit_void(); 3797 } 3798 3799 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb) 3800 { 3801 int empty = 0; 3802 3803 spin_lock(&osb->dc_task_lock); 3804 if (list_empty(&osb->blocked_lock_list)) 3805 empty = 1; 3806 3807 spin_unlock(&osb->dc_task_lock); 3808 return empty; 3809 } 3810 3811 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb) 3812 { 3813 int should_wake = 0; 3814 3815 spin_lock(&osb->dc_task_lock); 3816 if (osb->dc_work_sequence != osb->dc_wake_sequence) 3817 should_wake = 1; 3818 spin_unlock(&osb->dc_task_lock); 3819 3820 return 
should_wake; 3821 } 3822 3823 static int ocfs2_downconvert_thread(void *arg) 3824 { 3825 int status = 0; 3826 struct ocfs2_super *osb = arg; 3827 3828 /* only quit once we've been asked to stop and there is no more 3829 * work available */ 3830 while (!(kthread_should_stop() && 3831 ocfs2_downconvert_thread_lists_empty(osb))) { 3832 3833 wait_event_interruptible(osb->dc_event, 3834 ocfs2_downconvert_thread_should_wake(osb) || 3835 kthread_should_stop()); 3836 3837 mlog(0, "downconvert_thread: awoken\n"); 3838 3839 ocfs2_downconvert_thread_do_work(osb); 3840 } 3841 3842 osb->dc_task = NULL; 3843 return status; 3844 } 3845 3846 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb) 3847 { 3848 spin_lock(&osb->dc_task_lock); 3849 /* make sure the voting thread gets a swipe at whatever changes 3850 * the caller may have made to the voting state */ 3851 osb->dc_wake_sequence++; 3852 spin_unlock(&osb->dc_task_lock); 3853 wake_up(&osb->dc_event); 3854 } 3855