/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmglue.c
 *
 * Code which implements an OCFS2 specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/time.h>
#include <linux/quotaops.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"
#include "ocfs2_lockingver.h"

#include "alloc.h"
#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "stackglue.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"
#include "quota.h"
#include "refcounttree.h"

#include "buffer_head_io.h"

struct ocfs2_mask_waiter {
	struct list_head	mw_item;
	int			mw_status;
	struct completion	mw_complete;
	unsigned long		mw_mask;
	unsigned long		mw_goal;
#ifdef CONFIG_OCFS2_FS_STATS
	unsigned long long	mw_lock_start;
#endif
};

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);

/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock()
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};
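
/*
 * Result of a downconvert attempt, filled in by ocfs2_unblock_lock()
 * for ocfs2_process_blocked_lock(): a nonzero ->requeue asks the
 * downconvert thread to revisit this lockres later instead of
 * considering it finished.
 */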
struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};

/* Lockdep class keys */
struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES];

static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking);

static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);

static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
					    int new_level);
static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
					 int blocking);

#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)

/* This aids in debugging situations where a bad LVB might be involved. */
static void ocfs2_dump_meta_lvb_info(u64 level,
				     const char *function,
				     unsigned int line,
				     struct ocfs2_lock_res *lockres)
{
	struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);

	mlog(level, "LVB information for %s (called from %s:%u):\n",
	     lockres->l_name, function, line);
	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
	     be32_to_cpu(lvb->lvb_igeneration));
	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
	     be16_to_cpu(lvb->lvb_imode));
	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
	     "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
	     be32_to_cpu(lvb->lvb_iattr));
}
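
/*
 * Illustrative (hypothetical) call site for the macro above - dump an
 * inode's metadata LVB while debugging a suspect refresh:
 *
 *	mlog_meta_lvb(0, &OCFS2_I(inode)->ip_inode_lockres);
 */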

/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking
 * infrastructure.
 *
 * The most basic of lock types can point ->l_priv to their respective
 * struct ocfs2_super and allow the default actions to manage things.
 *
 * Right now, each lock type also needs to implement an init function,
 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
 * should be called when the lock is no longer needed (i.e., object
 * destruction time).
 */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
	 * this callback if ->l_priv is not an ocfs2_super pointer
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

	/*
	 * Optionally called in the downconvert thread after a
	 * successful downconvert. The lockres will not be referenced
	 * after this callback is called, so it is safe to free
	 * memory, etc.
	 *
	 * The exact semantics of when this is called are controlled
	 * by ->downconvert_worker()
	 */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Allow a lock type to add checks to determine whether it is
	 * safe to downconvert a lock. Return 0 to re-queue the
	 * downconvert at a later time, nonzero to continue.
	 *
	 * For most locks, the default checks that there are no
	 * incompatible holders are sufficient.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * Allows a lock type to populate the lock value block. This
	 * is called on downconvert, and when we drop a lock.
	 *
	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
	 * in the flags field.
	 *
	 * Called with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *);

	/*
	 * Called from the downconvert thread when it is determined
	 * that a lock will be downconverted. This is called without
	 * any locks held so the function can do work that might
	 * schedule (syncing out data, etc).
	 *
	 * This should return any one of the ocfs2_unblock_action
	 * values, depending on what it wants the thread to do.
	 */
	int (*downconvert_worker)(struct ocfs2_lock_res *, int);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow.
	 */
	int flags;
};

/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB 0x2

static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.check_downconvert = ocfs2_check_meta_downconvert,
	.set_lvb	= ocfs2_set_meta_lvb,
	.downconvert_worker = ocfs2_data_convert_worker,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.get_osb	= ocfs2_get_dentry_osb,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.downconvert_worker = ocfs2_dentry_convert_worker,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
	.get_osb	= ocfs2_get_file_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = {
	.set_lvb	= ocfs2_set_qinfo_lvb,
	.get_osb	= ocfs2_get_qinfo_osb,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = {
	.check_downconvert = ocfs2_check_refcount_downconvert,
	.downconvert_worker = ocfs2_refcount_convert_worker,
	.flags		= 0,
};
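
/*
 * Helpers for mapping a lockres back to the object that owns it: most
 * lock types keep a pointer to their owner in ->l_priv (checked by the
 * BUG_ON()s below); the refcount tree is recovered via container_of()
 * instead.
 */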
static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{
	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
		lockres->l_type == OCFS2_LOCK_TYPE_RW ||
		lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
}

static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
	return container_of(lksb, struct ocfs2_lock_res, l_lksb);
}

static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!ocfs2_is_inode_lock(lockres));

	return (struct inode *) lockres->l_priv;
}

static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);

	return (struct ocfs2_dentry_lock *)lockres->l_priv;
}

static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO);

	return (struct ocfs2_mem_dqinfo *)lockres->l_priv;
}

static inline struct ocfs2_refcount_tree *
ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res)
{
	return container_of(res, struct ocfs2_refcount_tree, rf_lockres);
}

static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
{
	if (lockres->l_ops->get_osb)
		return lockres->l_ops->get_osb(lockres);

	return (struct ocfs2_super *)lockres->l_priv;
}

static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     u32 dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted);
static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
				   struct ocfs2_lock_res *lockres,
				   int level, unsigned long caller_ip);
static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres,
					int level)
{
	__ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert);
#define ocfs2_log_dlm_error(_func, _err, _lockres) do {				\
	if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY)			\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \
		     _err, _func, _lockres->l_name);				\
	else									\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n", \
		     _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name, \
		     (unsigned int)ocfs2_get_dentry_lock_ino(_lockres));	\
} while (0)
static int ocfs2_downconvert_thread(void *arg);
static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static int ocfs2_inode_lock_update(struct inode *inode,
				   struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
					      int new_level);
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level,
				  int lvb,
				  unsigned int generation);
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres);


static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
				  u64 blkno,
				  u32 generation,
				  char *name)
{
	int len;

	mlog_entry_void();

	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);

	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
		       (long long)blkno, generation);

	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));

	mlog(0, "built lock resource with name: %s\n", name);

	mlog_exit_void();
}

static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);

static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
				       struct ocfs2_dlm_debug *dlm_debug)
{
	mlog(0, "Add tracking for lockres %s\n", res->l_name);

	spin_lock(&ocfs2_dlm_tracking_lock);
	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
	spin_lock(&ocfs2_dlm_tracking_lock);
	if (!list_empty(&res->l_debug_list))
		list_del_init(&res->l_debug_list);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

#ifdef CONFIG_OCFS2_FS_STATS
static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
	res->l_lock_num_prmode = 0;
	res->l_lock_num_prmode_failed = 0;
	res->l_lock_total_prmode = 0;
	res->l_lock_max_prmode = 0;
	res->l_lock_num_exmode = 0;
	res->l_lock_num_exmode_failed = 0;
	res->l_lock_total_exmode = 0;
	res->l_lock_max_exmode = 0;
	res->l_lock_refresh = 0;
}

static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
				    struct ocfs2_mask_waiter *mw, int ret)
{
	unsigned long long *num, *sum;
	unsigned int *max, *failed;
	struct timespec ts = current_kernel_time();
	unsigned long long time = timespec_to_ns(&ts) - mw->mw_lock_start;

	if (level == LKM_PRMODE) {
		num = &res->l_lock_num_prmode;
		sum = &res->l_lock_total_prmode;
		max = &res->l_lock_max_prmode;
		failed = &res->l_lock_num_prmode_failed;
	} else if (level == LKM_EXMODE) {
		num = &res->l_lock_num_exmode;
		sum = &res->l_lock_total_exmode;
		max = &res->l_lock_max_exmode;
		failed = &res->l_lock_num_exmode_failed;
	} else
		return;

	(*num)++;
	(*sum) += time;
	if (time > *max)
		*max = time;
	if (ret)
		(*failed)++;
}

static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
	lockres->l_lock_refresh++;
}

static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
	struct timespec ts = current_kernel_time();
	mw->mw_lock_start = timespec_to_ns(&ts);
}
#else
static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
}
static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
					   int level, struct ocfs2_mask_waiter *mw,
					   int ret)
{
}
static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
}
static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
}
#endif
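
/*
 * Every lockres starts life here: no DLM lock exists yet (all levels
 * begin at DLM_LOCK_IV) and the resource is added to the global
 * debugfs tracking list.
 */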
static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *res,
				       enum ocfs2_lock_type type,
				       struct ocfs2_lock_res_ops *ops,
				       void *priv)
{
	res->l_type          = type;
	res->l_ops           = ops;
	res->l_priv          = priv;

	res->l_level         = DLM_LOCK_IV;
	res->l_requested     = DLM_LOCK_IV;
	res->l_blocking      = DLM_LOCK_IV;
	res->l_action        = OCFS2_AST_INVALID;
	res->l_unlock_action = OCFS2_UNLOCK_INVALID;

	res->l_flags         = OCFS2_LOCK_INITIALIZED;

	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);

	ocfs2_init_lock_stats(res);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (type != OCFS2_LOCK_TYPE_OPEN)
		lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type],
				 &lockdep_keys[type], 0);
	else
		res->l_lockdep_map.key = NULL;
#endif
}

void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
	/* This also clears out the lock status block */
	memset(res, 0, sizeof(struct ocfs2_lock_res));
	spin_lock_init(&res->l_lock);
	init_waitqueue_head(&res->l_event);
	INIT_LIST_HEAD(&res->l_blocked_list);
	INIT_LIST_HEAD(&res->l_mask_waiters);
}

void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
			       enum ocfs2_lock_type type,
			       unsigned int generation,
			       struct inode *inode)
{
	struct ocfs2_lock_res_ops *ops;

	switch(type) {
		case OCFS2_LOCK_TYPE_RW:
			ops = &ocfs2_inode_rw_lops;
			break;
		case OCFS2_LOCK_TYPE_META:
			ops = &ocfs2_inode_inode_lops;
			break;
		case OCFS2_LOCK_TYPE_OPEN:
			ops = &ocfs2_inode_open_lops;
			break;
		default:
			mlog_bug_on_msg(1, "type: %d\n", type);
			ops = NULL; /* thanks, gcc */
			break;
	};

	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
			      generation, res->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
}

static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
{
	struct inode *inode = ocfs2_lock_res_inode(lockres);

	return OCFS2_SB(inode->i_sb);
}

static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_mem_dqinfo *info = lockres->l_priv;

	return OCFS2_SB(info->dqi_gi.dqi_sb);
}

static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_file_private *fp = lockres->l_priv;

	return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
}

static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
{
	__be64 inode_blkno_be;

	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
	       sizeof(__be64));

	return be64_to_cpu(inode_blkno_be);
}

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_dentry_lock *dl = lockres->l_priv;

	return OCFS2_SB(dl->dl_inode->i_sb);
}
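
/*
 * Dentry locks are the one exception to ocfs2_build_lock_name(): the
 * name is the type character plus the parent's block number in hex,
 * a terminating NUL, and then the inode's block number as raw
 * big-endian bytes (read back by ocfs2_get_dentry_lock_ino() above).
 */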
void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
				u64 parent, struct inode *inode)
{
	int len;
	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
	struct ocfs2_lock_res *lockres = &dl->dl_lockres;

	ocfs2_lock_res_init_once(lockres);

	/*
	 * Unfortunately, the standard lock naming scheme won't work
	 * here because we have two 16 byte values to use. Instead,
	 * we'll stuff the inode number as a binary value. We still
	 * want error prints to show something without garbling the
	 * display, so drop a null byte in there before the inode
	 * number. A future version of OCFS2 will likely use all
	 * binary lock names. The stringified names have been a
	 * tremendous aid in debugging, but now that the debugfs
	 * interface exists, we can mangle things there if need be.
	 *
	 * NOTE: We also drop the standard "pad" value (the total lock
	 * name size stays the same though - the last part is all
	 * zeros due to the memset in ocfs2_lock_res_init_once()
	 */
	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
		       "%c%016llx",
		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
		       (long long)parent);

	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));

	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
	       sizeof(__be64));

	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
				   dl);
}

static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
				      struct ocfs2_super *osb)
{
	/* Superblock lockres doesn't come from a slab so we call init
	 * once on it manually. */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
			      0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
				   &ocfs2_super_lops, osb);
}

static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
				       struct ocfs2_super *osb)
{
	/* Rename lockres doesn't come from a slab so we call init
	 * once on it manually. */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
				   &ocfs2_rename_lops, osb);
}

static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
					 struct ocfs2_super *osb)
{
	/* nfs_sync lockres doesn't come from a slab so we call init
	 * once on it manually. */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC,
				   &ocfs2_nfs_sync_lops, osb);
}

static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
					    struct ocfs2_super *osb)
{
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
				   &ocfs2_orphan_scan_lops, osb);
}

void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
			      struct ocfs2_file_private *fp)
{
	struct inode *inode = fp->fp_file->f_mapping->host;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);

	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
			      inode->i_generation, lockres->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
				   fp);
	lockres->l_flags |= OCFS2_LOCK_NOCACHE;
}

void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres,
			       struct ocfs2_mem_dqinfo *info)
{
	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type,
			      0, lockres->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres,
				   OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops,
				   info);
}

void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres,
				  struct ocfs2_super *osb, u64 ref_blkno,
				  unsigned int generation)
{
	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno,
			      generation, lockres->l_name);
	ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT,
				   &ocfs2_refcount_block_lops, osb);
}

void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
	mlog_entry_void();

	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
		return;

	ocfs2_remove_lockres_tracking(res);

	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
			"Lockres %s is on the blocked list\n",
			res->l_name);
	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
			"Lockres %s has mask waiters pending\n",
			res->l_name);
	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
			"Lockres %s is locked\n",
			res->l_name);
	mlog_bug_on_msg(res->l_ro_holders,
			"Lockres %s has %u ro holders\n",
			res->l_name, res->l_ro_holders);
	mlog_bug_on_msg(res->l_ex_holders,
			"Lockres %s has %u ex holders\n",
			res->l_name, res->l_ex_holders);

	/* Need to clear out the lock status block for the dlm */
	memset(&res->l_lksb, 0, sizeof(res->l_lksb));

	res->l_flags = 0UL;
	mlog_exit_void();
}
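
/*
 * Holder counts track how many local tasks are using the lock at each
 * level; the downconvert logic refuses to drop a lock while it still
 * has incompatible holders.
 */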
static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	mlog_entry_void();

	BUG_ON(!lockres);

	switch(level) {
	case DLM_LOCK_EX:
		lockres->l_ex_holders++;
		break;
	case DLM_LOCK_PR:
		lockres->l_ro_holders++;
		break;
	default:
		BUG();
	}

	mlog_exit_void();
}

static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	mlog_entry_void();

	BUG_ON(!lockres);

	switch(level) {
	case DLM_LOCK_EX:
		BUG_ON(!lockres->l_ex_holders);
		lockres->l_ex_holders--;
		break;
	case DLM_LOCK_PR:
		BUG_ON(!lockres->l_ro_holders);
		lockres->l_ro_holders--;
		break;
	default:
		BUG();
	}
	mlog_exit_void();
}

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
static inline int ocfs2_highest_compat_lock_level(int level)
{
	int new_level = DLM_LOCK_EX;

	if (level == DLM_LOCK_EX)
		new_level = DLM_LOCK_NL;
	else if (level == DLM_LOCK_PR)
		new_level = DLM_LOCK_PR;
	return new_level;
}

static void lockres_set_flags(struct ocfs2_lock_res *lockres,
			      unsigned long newflags)
{
	struct ocfs2_mask_waiter *mw, *tmp;

	assert_spin_locked(&lockres->l_lock);

	lockres->l_flags = newflags;

	list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			continue;

		list_del_init(&mw->mw_item);
		mw->mw_status = 0;
		complete(&mw->mw_complete);
	}
}
static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}
static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}
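
/*
 * The ocfs2_generic_handle_*_action() helpers below run from the ast
 * under the lockres spinlock and move l_level and the BUSY/BLOCKED
 * flags to match what the DLM has just granted.
 */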
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);

	lockres->l_level = lockres->l_requested;
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = DLM_LOCK_NL;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* Convert from RO to EX doesn't really need anything as our
	 * information is already up to date. Convert from NL to
	 * *anything* however should mark ourselves as needing an
	 * update */
	if (lockres->l_level == DLM_LOCK_NL &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;

	/*
	 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
	 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from
	 * downconverting the lock before the upconvert has fully completed.
	 */
	lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

	if (lockres->l_requested > DLM_LOCK_NL &&
	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking. this also catches the case where we get
		 * duplicate BASTs */
		if (ocfs2_highest_compat_lock_level(level) <
		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
			needs_downconvert = 1;

		lockres->l_blocking = level;
	}

	mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n",
	     lockres->l_name, level, lockres->l_level, lockres->l_blocking,
	     needs_downconvert);

	if (needs_downconvert)
		lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);

	mlog_exit(needs_downconvert);
	return needs_downconvert;
}

/*
 * OCFS2_LOCK_PENDING and l_pending_gen.
 *
 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting
 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock()
 * for more details on the race.
 *
 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces
 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock()
 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear
 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns,
 * the caller is going to try to clear PENDING again. If nothing else is
 * happening, __lockres_clear_pending() sees PENDING is unset and does
 * nothing.
 *
 * But what if another path (eg downconvert thread) has just started a
 * new locking action? The other path has re-set PENDING. Our path
 * cannot clear PENDING, because that will re-open the original race
 * window.
 *
 * [Example]
 *
 * ocfs2_meta_lock()
 *  ocfs2_cluster_lock()
 *   set BUSY
 *   set PENDING
 *   drop l_lock
 *   ocfs2_dlm_lock()
 *    ocfs2_locking_ast()		ocfs2_downconvert_thread()
 *     clear PENDING			 ocfs2_unblock_lock()
 *					  take_l_lock
 *					  !BUSY
 *					  ocfs2_prepare_downconvert()
 *					   set BUSY
 *					   set PENDING
 *					  drop l_lock
 *   take l_lock
 *   clear PENDING
 *   drop l_lock
 *			<window>
 *			ocfs2_dlm_lock()
 *
 * So as you can see, we now have a window where l_lock is not held,
 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
 *
 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
 * set by ocfs2_prepare_downconvert(). That wasn't nice.
 *
 * To solve this we introduce l_pending_gen. A call to
 * lockres_clear_pending() will only do so when it is passed a generation
 * number that matches the lockres. lockres_set_pending() will return the
 * current generation number. When ocfs2_cluster_lock() goes to clear
 * PENDING, it passes the generation it got from set_pending(). In our
 * example above, the generation numbers will *not* match. Thus,
 * ocfs2_cluster_lock() will not clear the PENDING set by
 * ocfs2_prepare_downconvert().
 */
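
/*
 * The resulting usage pattern, as seen in ocfs2_lock_create() and
 * __ocfs2_cluster_lock():
 *
 *	spin_lock_irqsave(&lockres->l_lock, flags);
 *	...set l_action, l_requested, OCFS2_LOCK_BUSY...
 *	gen = lockres_set_pending(lockres);
 *	spin_unlock_irqrestore(&lockres->l_lock, flags);
 *
 *	ret = ocfs2_dlm_lock(...);
 *	lockres_clear_pending(lockres, gen, osb);
 *
 * If the ast already cleared PENDING (bumping l_pending_gen), the now
 * stale generation makes the second clear a harmless no-op.
 */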

/* Unlocked version for ocfs2_locking_ast() */
static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
				    unsigned int generation,
				    struct ocfs2_super *osb)
{
	assert_spin_locked(&lockres->l_lock);

	/*
	 * The ast and locking functions can race us here. The winner
	 * will clear pending, the loser will not.
	 */
	if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
	    (lockres->l_pending_gen != generation))
		return;

	lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
	lockres->l_pending_gen++;

	/*
	 * The downconvert thread may have skipped us because we
	 * were PENDING. Wake it up.
	 */
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
		ocfs2_wake_downconvert_thread(osb);
}

/* Locked version for callers of ocfs2_dlm_lock() */
static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
				  unsigned int generation,
				  struct ocfs2_super *osb)
{
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);
	__lockres_clear_pending(lockres, generation, osb);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));

	lockres_or_flags(lockres, OCFS2_LOCK_PENDING);

	return lockres->l_pending_gen;
}
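
/*
 * The three callbacks below are the entry points handed to the cluster
 * stack via struct ocfs2_locking_protocol (lproto, further down):
 * ocfs2_blocking_ast() runs when another node wants a level that
 * conflicts with ours, ocfs2_locking_ast() when one of our own
 * lock/convert requests completes, and ocfs2_unlock_ast() when an
 * unlock or cancel finishes.
 */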
static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	int needs_downconvert;
	unsigned long flags;

	BUG_ON(level <= DLM_LOCK_NL);

	mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, "
	     "type %s\n", lockres->l_name, level, lockres->l_level,
	     ocfs2_lock_type_string(lockres->l_type));

	/*
	 * We can skip the bast for locks which don't enable caching -
	 * they'll be dropped at the earliest possible time anyway.
	 */
	if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
		return;

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_wake_downconvert_thread(osb);
}

static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	unsigned long flags;
	int status;

	spin_lock_irqsave(&lockres->l_lock, flags);

	status = ocfs2_dlm_lock_status(&lockres->l_lksb);

	if (status == -EAGAIN) {
		lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
		goto out;
	}

	if (status) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
		     lockres->l_name, status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, "
	     "level %d => %d\n", lockres->l_name, lockres->l_action,
	     lockres->l_unlock_action, lockres->l_level, lockres->l_requested);

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, "
		     "flags 0x%lx, unlock: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}
out:
	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	/* Did we try to cancel this lock? Clear that state */
	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;

	/*
	 * We may have beaten the locking functions here. We certainly
	 * know that dlm_lock() has been called :-)
	 * Because we can't have two lock calls in flight at once, we
	 * can use lockres->l_pending_gen.
	 */
	__lockres_clear_pending(lockres, lockres->l_pending_gen, osb);

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	unsigned long flags;

	mlog_entry_void();

	mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n",
	     lockres->l_name, lockres->l_unlock_action);

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (error) {
		mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
		     "unlock_action %d\n", error, lockres->l_name,
		     lockres->l_unlock_action);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		mlog_exit_void();
		return;
	}

	switch(lockres->l_unlock_action) {
	case OCFS2_UNLOCK_CANCEL_CONVERT:
		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
		lockres->l_action = OCFS2_AST_INVALID;
		/* Downconvert thread may have requeued this lock, we
		 * need to wake it. */
		if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
			ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
		break;
	case OCFS2_UNLOCK_DROP_LOCK:
		lockres->l_level = DLM_LOCK_IV;
		break;
	default:
		BUG();
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog_exit_void();
}

/*
 * This is the filesystem locking protocol. It provides the lock handling
 * hooks for the underlying DLM. It has a maximum version number.
 * The version number allows interoperability with systems running at
 * the same major number and an equal or smaller minor number.
 *
 * Whenever the filesystem does new things with locks (adds or removes a
 * lock, orders them differently, does different things underneath a lock),
 * the version must be changed. The protocol is negotiated when joining
 * the dlm domain. A node may join the domain if its major version is
 * identical to all other nodes and its minor version is greater than
 * or equal to all other nodes. When its minor version is greater than
 * the other nodes, it will run at the minor version specified by the
 * other nodes.
 *
 * If a locking change is made that will not be compatible with older
 * versions, the major number must be increased and the minor version set
 * to zero. If a change merely adds a behavior that can be disabled when
 * speaking to older versions, the minor version must be increased. If a
 * change adds a fully backwards compatible change (eg, LVB changes that
 * are just ignored by older versions), the version does not need to be
 * updated.
 */
static struct ocfs2_locking_protocol lproto = {
	.lp_max_version = {
		.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
		.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
	},
	.lp_lock_ast = ocfs2_locking_ast,
	.lp_blocking_ast = ocfs2_blocking_ast,
	.lp_unlock_ast = ocfs2_unlock_ast,
};

void ocfs2_set_locking_protocol(void)
{
	ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
}

static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
	if (convert)
		lockres->l_action = OCFS2_AST_INVALID;
	else
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
	mlog_exit_void();
}

/* Note: If we detect another process working on the lock (i.e.,
 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 * to do the right thing in that case.
 */
static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     u32 dlm_flags)
{
	int ret = 0;
	unsigned long flags;
	unsigned int gen;

	mlog_entry_void();

	mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
	     dlm_flags);

	spin_lock_irqsave(&lockres->l_lock, flags);
	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto bail;
	}

	lockres->l_action = OCFS2_AST_ATTACH;
	lockres->l_requested = level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
	gen = lockres_set_pending(lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ret = ocfs2_dlm_lock(osb->cconn,
			     level,
			     &lockres->l_lksb,
			     dlm_flags,
			     lockres->l_name,
			     OCFS2_LOCK_ID_MAX_LEN - 1);
	lockres_clear_pending(lockres, gen, osb);
	if (ret) {
		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
		ocfs2_recover_from_dlm_error(lockres, 1);
	}

	mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);

bail:
	mlog_exit(ret);
	return ret;
}

static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
					int flag)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&lockres->l_lock, flags);
	ret = lockres->l_flags & flag;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;
}

static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}

static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}
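
/*
 * Mask waiters let a task sleep until l_flags reaches a particular
 * state: the waiter is queued with a mask and a goal, and
 * lockres_set_flags() completes it as soon as
 * (l_flags & mw_mask) == mw_goal.
 */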
static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
{
	INIT_LIST_HEAD(&mw->mw_item);
	init_completion(&mw->mw_complete);
	ocfs2_init_start_time(mw);
}

static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
	wait_for_completion(&mw->mw_complete);
	/* Re-arm the completion in case we want to wait on it again */
	INIT_COMPLETION(mw->mw_complete);
	return mw->mw_status;
}

static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
				    struct ocfs2_mask_waiter *mw,
				    unsigned long mask,
				    unsigned long goal)
{
	BUG_ON(!list_empty(&mw->mw_item));

	assert_spin_locked(&lockres->l_lock);

	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
	mw->mw_mask = mask;
	mw->mw_goal = goal;
}

/* returns 0 if the mw that was removed was already satisfied, -EBUSY
 * if the mask still hadn't reached its goal */
static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
				      struct ocfs2_mask_waiter *mw)
{
	unsigned long flags;
	int ret = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (!list_empty(&mw->mw_item)) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			ret = -EBUSY;

		list_del_init(&mw->mw_item);
		init_completion(&mw->mw_complete);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;
}

static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
					     struct ocfs2_lock_res *lockres)
{
	int ret;

	ret = wait_for_completion_interruptible(&mw->mw_complete);
	if (ret)
		lockres_remove_mask_waiter(lockres, mw);
	else
		ret = mw->mw_status;
	/* Re-arm the completion in case we want to wait on it again */
	INIT_COMPLETION(mw->mw_complete);
	return ret;
}
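
/*
 * Heart of the lock acquisition path. The fast path just bumps the
 * holder count when the lock is already held at a compatible level;
 * otherwise we set up an attach or convert request, call into the DLM
 * and loop ("goto again") until the request completes, we are asked to
 * back off, or a signal arrives.
 */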
static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres,
				int level,
				u32 lkm_flags,
				int arg_flags,
				int l_subclass,
				unsigned long caller_ip)
{
	struct ocfs2_mask_waiter mw;
	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
	unsigned long flags;
	unsigned int gen;
	int noqueue_attempted = 0;

	mlog_entry_void();

	ocfs2_init_mask_waiter(&mw);

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= DLM_LKF_VALBLK;

again:
	wait = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);

	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto unlock;
	}

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags "
			"0x%lx\n", lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		wait = 1;
		goto unlock;
	}

	if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) {
		/*
		 * We've upconverted. If the lock now has a level we can
		 * work with, we take it. If, however, the lock is not at the
		 * required level, we go thru the full cycle. One way this could
		 * happen is if a process requesting an upconvert to PR is
		 * closely followed by another requesting upconvert to an EX.
		 * If the process requesting EX lands here, we want it to
		 * continue attempting to upconvert and let the process
		 * requesting PR take the lock.
		 * If multiple processes request upconvert to PR, the first one
		 * here will take the lock. The others will have to go thru the
		 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending
		 * downconvert request.
		 */
		if (level <= lockres->l_level)
			goto update_holders;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* the lock is currently blocked on behalf of
		 * another node */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		if (noqueue_attempted > 0) {
			ret = -EAGAIN;
			goto unlock;
		}
		if (lkm_flags & DLM_LKF_NOQUEUE)
			noqueue_attempted = 1;

		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
			lockres->l_action = OCFS2_AST_ATTACH;
			lkm_flags &= ~DLM_LKF_CONVERT;
		} else {
			lockres->l_action = OCFS2_AST_CONVERT;
			lkm_flags |= DLM_LKF_CONVERT;
		}

		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		gen = lockres_set_pending(lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == DLM_LOCK_IV);
		BUG_ON(level == DLM_LOCK_NL);

		mlog(ML_BASTS, "lockres %s, convert from %d to %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		ret = ocfs2_dlm_lock(osb->cconn,
				     level,
				     &lockres->l_lksb,
				     lkm_flags,
				     lockres->l_name,
				     OCFS2_LOCK_ID_MAX_LEN - 1);
		lockres_clear_pending(lockres, gen, osb);
		if (ret) {
			if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
			    (ret != -EAGAIN)) {
				ocfs2_log_dlm_error("ocfs2_dlm_lock",
						    ret, lockres);
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			goto out;
		}

		mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

update_holders:
	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);

	spin_unlock_irqrestore(&lockres->l_lock, flags);
out:
	/*
	 * This is helping work around a lock inversion between the page lock
	 * and dlm locks. One path holds the page lock while calling aops
	 * which block acquiring dlm locks. The voting thread holds dlm
	 * locks while acquiring page locks while down converting data locks.
	 * This block is helping an aop path notice the inversion and back
	 * off to unlock its page lock before trying the dlm lock again.
	 */
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		if (lockres_remove_mask_waiter(lockres, &mw))
			ret = -EAGAIN;
		else
			goto again;
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}
	ocfs2_update_lock_stats(lockres, level, &mw, ret);

#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (!ret && lockres->l_lockdep_map.key != NULL) {
		if (level == DLM_LOCK_PR)
			rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
				caller_ip);
		else
			rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
				caller_ip);
	}
#endif
	mlog_exit(ret);
	return ret;
}

static inline int ocfs2_cluster_lock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres,
				     int level,
				     u32 lkm_flags,
				     int arg_flags)
{
	return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags,
				    0, _RET_IP_);
}


static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
				   struct ocfs2_lock_res *lockres,
				   int level,
				   unsigned long caller_ip)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	ocfs2_dec_holders(lockres, level);
	ocfs2_downconvert_on_unlock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (lockres->l_lockdep_map.key != NULL)
		rwsem_release(&lockres->l_lockdep_map, 1, caller_ip);
#endif
	mlog_exit_void();
}
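
/*
 * Only for brand-new lock resources: the lockres is pre-marked
 * OCFS2_LOCK_LOCAL so that the attach ast skips the
 * LOCK_TYPE_REQUIRES_REFRESH handling - no other node can have state
 * for a lock we just invented.
 */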
static int ocfs2_create_new_lock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int ex,
				 int local)
{
	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	unsigned long flags;
	u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
}

/* Grants us an EX lock on the data and metadata resources, skipping
 * the normal cluster directory lookup. Use this ONLY on newly created
 * inodes which other nodes can't possibly see, and which haven't been
 * hashed in the inode hash yet. This can give us a good performance
 * increase as it'll skip the network broadcast normally associated
 * with creating a new lock resource. */
int ocfs2_create_new_inode_locks(struct inode *inode)
{
	int ret;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);
	BUG_ON(!ocfs2_inode_is_new(inode));

	mlog_entry_void();

	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);

	/* NOTE: That we don't increment any of the holder counts, nor
	 * do we add anything to a journal handle. Since this is
	 * supposed to be a new inode which the cluster doesn't know
	 * about yet, there is no need to. As far as the LVB handling
	 * is concerned, this is basically like acquiring an EX lock
	 * on a resource which has an invalid one -- we'll set it
	 * valid when we release the EX. */

	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	/*
	 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they
	 * don't use a generation in their lock names.
	 */
	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

bail:
	mlog_exit(ret);
	return ret;
}

int ocfs2_rw_lock(struct inode *inode, int write)
{
	int status, level;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu take %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (ocfs2_mount_local(osb)) {
		mlog_exit(0);
		return 0;
	}

	lockres = &OCFS2_I(inode)->ip_rw_lockres;

	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;

	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
				    0);
	if (status < 0)
		mlog_errno(status);

	mlog_exit(status);
	return status;
}

void ocfs2_rw_unlock(struct inode *inode, int write)
{
	int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	mlog(0, "inode %llu drop %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (!ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);

	mlog_exit_void();
}

/*
 * ocfs2_open_lock always gets a PR mode lock.
 */
int ocfs2_open_lock(struct inode *inode)
{
	int status = 0;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu take PRMODE open lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno);

	if (ocfs2_mount_local(osb))
		goto out;

	lockres = &OCFS2_I(inode)->ip_open_lockres;

	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
				    DLM_LOCK_PR, 0, 0);
	if (status < 0)
		mlog_errno(status);

out:
	mlog_exit(status);
	return status;
}
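
/*
 * The trylock variant below is what the inode delete path uses to ask
 * "does any other node still have this inode open?" without blocking.
 */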
1794 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on
1795 * other nodes and the -EAGAIN will indicate to the caller that
1796 * this inode is still in use.
1797 */
1798 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1799 level, DLM_LKF_NOQUEUE, 0);
1800 
1801 out:
1802 mlog_exit(status);
1803 return status;
1804 }
1805 
1806 /*
1807 * ocfs2_open_unlock unlocks PR and EX mode open locks.
1808 */
1809 void ocfs2_open_unlock(struct inode *inode)
1810 {
1811 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
1812 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1813 
1814 mlog_entry_void();
1815 
1816 mlog(0, "inode %llu drop open lock\n",
1817 (unsigned long long)OCFS2_I(inode)->ip_blkno);
1818 
1819 if (ocfs2_mount_local(osb))
1820 goto out;
1821 
1822 if (lockres->l_ro_holders)
1823 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1824 DLM_LOCK_PR);
1825 if (lockres->l_ex_holders)
1826 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1827 DLM_LOCK_EX);
1828 
1829 out:
1830 mlog_exit_void();
1831 }
1832 
1833 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
1834 int level)
1835 {
1836 int ret;
1837 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1838 unsigned long flags;
1839 struct ocfs2_mask_waiter mw;
1840 
1841 ocfs2_init_mask_waiter(&mw);
1842 
1843 retry_cancel:
1844 spin_lock_irqsave(&lockres->l_lock, flags);
1845 if (lockres->l_flags & OCFS2_LOCK_BUSY) {
1846 ret = ocfs2_prepare_cancel_convert(osb, lockres);
1847 if (ret) {
1848 spin_unlock_irqrestore(&lockres->l_lock, flags);
1849 ret = ocfs2_cancel_convert(osb, lockres);
1850 if (ret < 0) {
1851 mlog_errno(ret);
1852 goto out;
1853 }
1854 goto retry_cancel;
1855 }
1856 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1857 spin_unlock_irqrestore(&lockres->l_lock, flags);
1858 
1859 ocfs2_wait_for_mask(&mw);
1860 goto retry_cancel;
1861 }
1862 
1863 ret = -ERESTARTSYS;
1864 /*
1865 * We may still have gotten the lock, in which case there's no
1866 * point to restarting the syscall.
1867 */
1868 if (lockres->l_level == level)
1869 ret = 0;
1870 
1871 mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
1872 lockres->l_flags, lockres->l_level, lockres->l_action);
1873 
1874 spin_unlock_irqrestore(&lockres->l_lock, flags);
1875 
1876 out:
1877 return ret;
1878 }
1879 
1880 /*
1881 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
1882 * flock() calls. The locking approach this requires is sufficiently
1883 * different from all other cluster lock types that we implement a
1884 * separate path to the "low-level" dlm calls. In particular:
1885 *
1886 * - No optimization of lock levels is done - we take exactly
1887 * what's been requested.
1888 *
1889 * - No lock caching is employed. We immediately downconvert to
1890 * no-lock at unlock time. (This also means flock locks never go on
1891 * the blocking list.)
1892 *
1893 * - Since userspace can trivially deadlock itself with flock, we make
1894 * sure to allow cancellation of a misbehaving application's flock()
1895 * request.
1896 *
1897 * - Access to any flock lockres doesn't require concurrency, so we
1898 * can simplify the code by requiring the caller to guarantee
1899 * serialization of dlmglue flock calls.
1900 */
1901 int ocfs2_file_lock(struct file *file, int ex, int trylock)
1902 {
1903 int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1904 unsigned int lkm_flags = trylock ?
DLM_LKF_NOQUEUE : 0;
1905 unsigned long flags;
1906 struct ocfs2_file_private *fp = file->private_data;
1907 struct ocfs2_lock_res *lockres = &fp->fp_flock;
1908 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1909 struct ocfs2_mask_waiter mw;
1910 
1911 ocfs2_init_mask_waiter(&mw);
1912 
1913 if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
1914 (lockres->l_level > DLM_LOCK_NL)) {
1915 mlog(ML_ERROR,
1916 "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
1917 "level: %u\n", lockres->l_name, lockres->l_flags,
1918 lockres->l_level);
1919 return -EINVAL;
1920 }
1921 
1922 spin_lock_irqsave(&lockres->l_lock, flags);
1923 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1924 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1925 spin_unlock_irqrestore(&lockres->l_lock, flags);
1926 
1927 /*
1928 * Get the lock at NLMODE to start - that way we
1929 * can cancel the upconvert request if need be.
1930 */
1931 ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
1932 if (ret < 0) {
1933 mlog_errno(ret);
1934 goto out;
1935 }
1936 
1937 ret = ocfs2_wait_for_mask(&mw);
1938 if (ret) {
1939 mlog_errno(ret);
1940 goto out;
1941 }
1942 spin_lock_irqsave(&lockres->l_lock, flags);
1943 }
1944 
1945 lockres->l_action = OCFS2_AST_CONVERT;
1946 lkm_flags |= DLM_LKF_CONVERT;
1947 lockres->l_requested = level;
1948 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1949 
1950 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1951 spin_unlock_irqrestore(&lockres->l_lock, flags);
1952 
1953 ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
1954 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
1955 if (ret) {
1956 if (!trylock || (ret != -EAGAIN)) {
1957 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
1958 ret = -EINVAL;
1959 }
1960 
1961 ocfs2_recover_from_dlm_error(lockres, 1);
1962 lockres_remove_mask_waiter(lockres, &mw);
1963 goto out;
1964 }
1965 
1966 ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
1967 if (ret == -ERESTARTSYS) {
1968 /*
1969 * Userspace can deadlock itself with
1970 * flock(). Current behavior locally is to allow the
1971 * deadlock, but abort the system call if a signal is
1972 * received. We follow this example, otherwise a
1973 * poorly written program could sit in the kernel until
1974 * reboot.
1975 *
1976 * Handling this is a bit more complicated for Ocfs2
1977 * though. We can't exit this function with an
1978 * outstanding lock request, so a cancel convert is
1979 * required. We intentionally overwrite 'ret' - if the
1980 * cancel fails and the lock was granted, it's easier
1981 * to just bubble success back up to the user.
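 *
 * (The cancel itself is the two-step sequence implemented above in
 * ocfs2_flock_handle_signal(): ocfs2_prepare_cancel_convert() under the
 * lockres spinlock, then ocfs2_cancel_convert(), which issues
 * ocfs2_dlm_unlock() with DLM_LKF_CANCEL.)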
1982 */
1983 ret = ocfs2_flock_handle_signal(lockres, level);
1984 } else if (!ret && (level > lockres->l_level)) {
1985 /* Trylock failed asynchronously */
1986 BUG_ON(!trylock);
1987 ret = -EAGAIN;
1988 }
1989 
1990 out:
1991 
1992 mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
1993 lockres->l_name, ex, trylock, ret);
1994 return ret;
1995 }
1996 
1997 void ocfs2_file_unlock(struct file *file)
1998 {
1999 int ret;
2000 unsigned int gen;
2001 unsigned long flags;
2002 struct ocfs2_file_private *fp = file->private_data;
2003 struct ocfs2_lock_res *lockres = &fp->fp_flock;
2004 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
2005 struct ocfs2_mask_waiter mw;
2006 
2007 ocfs2_init_mask_waiter(&mw);
2008 
2009 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
2010 return;
2011 
2012 if (lockres->l_level == DLM_LOCK_NL)
2013 return;
2014 
2015 mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
2016 lockres->l_name, lockres->l_flags, lockres->l_level,
2017 lockres->l_action);
2018 
2019 spin_lock_irqsave(&lockres->l_lock, flags);
2020 /*
2021 * Fake a blocking ast for the downconvert code.
2022 */
2023 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
2024 lockres->l_blocking = DLM_LOCK_EX;
2025 
2026 gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL);
2027 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
2028 spin_unlock_irqrestore(&lockres->l_lock, flags);
2029 
2030 ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen);
2031 if (ret) {
2032 mlog_errno(ret);
2033 return;
2034 }
2035 
2036 ret = ocfs2_wait_for_mask(&mw);
2037 if (ret)
2038 mlog_errno(ret);
2039 }
2040 
2041 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
2042 struct ocfs2_lock_res *lockres)
2043 {
2044 int kick = 0;
2045 
2046 mlog_entry_void();
2047 
2048 /* If we know that another node is waiting on our lock, kick
2049 * the downconvert thread pre-emptively when we reach a release
2050 * condition. */
2051 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
2052 switch (lockres->l_blocking) {
2053 case DLM_LOCK_EX:
2054 if (!lockres->l_ex_holders && !lockres->l_ro_holders)
2055 kick = 1;
2056 break;
2057 case DLM_LOCK_PR:
2058 if (!lockres->l_ex_holders)
2059 kick = 1;
2060 break;
2061 default:
2062 BUG();
2063 }
2064 }
2065 
2066 if (kick)
2067 ocfs2_wake_downconvert_thread(osb);
2068 
2069 mlog_exit_void();
2070 }
2071 
2072 #define OCFS2_SEC_BITS 34
2073 #define OCFS2_SEC_SHIFT (64 - 34)
2074 #define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1)
2075 
2076 /* The LVB only has room for 64 bits of time here, so we pack it for
2077 * now: seconds go in the high 34 bits, nanoseconds in the low 30. */
2078 static u64 ocfs2_pack_timespec(struct timespec *spec)
2079 {
2080 u64 res;
2081 u64 sec = spec->tv_sec;
2082 u32 nsec = spec->tv_nsec;
2083 
2084 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
2085 
2086 return res;
2087 }
2088 
2089 /* Call this with the lockres locked. I am reasonably sure we don't
2090 * need ip_lock in this function as anyone who would be changing those
2091 * values is supposed to be blocked in ocfs2_inode_lock right now. */
2092 static void __ocfs2_stuff_meta_lvb(struct inode *inode)
2093 {
2094 struct ocfs2_inode_info *oi = OCFS2_I(inode);
2095 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2096 struct ocfs2_meta_lvb *lvb;
2097 
2098 mlog_entry_void();
2099 
2100 lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2101 
2102 /*
2103 * Invalidate the LVB of a deleted inode - this way other
2104 * nodes are forced to go to disk and discover the new inode
2105 * status.
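 * (Setting lvb_version to 0 below is sufficient: ocfs2_meta_lvb_is_trustable()
 * only trusts an LVB whose version equals OCFS2_LVB_VERSION. As a worked
 * example of the packing helper above, {tv_sec = 1, tv_nsec = 1} packs to
 * (1ULL << 30) | 1 = 0x40000001.)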
2106 */ 2107 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2108 lvb->lvb_version = 0; 2109 goto out; 2110 } 2111 2112 lvb->lvb_version = OCFS2_LVB_VERSION; 2113 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 2114 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 2115 lvb->lvb_iuid = cpu_to_be32(inode->i_uid); 2116 lvb->lvb_igid = cpu_to_be32(inode->i_gid); 2117 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 2118 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 2119 lvb->lvb_iatime_packed = 2120 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime)); 2121 lvb->lvb_ictime_packed = 2122 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime)); 2123 lvb->lvb_imtime_packed = 2124 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime)); 2125 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 2126 lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features); 2127 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 2128 2129 out: 2130 mlog_meta_lvb(0, lockres); 2131 2132 mlog_exit_void(); 2133 } 2134 2135 static void ocfs2_unpack_timespec(struct timespec *spec, 2136 u64 packed_time) 2137 { 2138 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 2139 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 2140 } 2141 2142 static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 2143 { 2144 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2145 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2146 struct ocfs2_meta_lvb *lvb; 2147 2148 mlog_entry_void(); 2149 2150 mlog_meta_lvb(0, lockres); 2151 2152 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2153 2154 /* We're safe here without the lockres lock... */ 2155 spin_lock(&oi->ip_lock); 2156 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 2157 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 2158 2159 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 2160 oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures); 2161 ocfs2_set_inode_flags(inode); 2162 2163 /* fast-symlinks are a special case */ 2164 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 2165 inode->i_blocks = 0; 2166 else 2167 inode->i_blocks = ocfs2_inode_sector_count(inode); 2168 2169 inode->i_uid = be32_to_cpu(lvb->lvb_iuid); 2170 inode->i_gid = be32_to_cpu(lvb->lvb_igid); 2171 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 2172 inode->i_nlink = be16_to_cpu(lvb->lvb_inlink); 2173 ocfs2_unpack_timespec(&inode->i_atime, 2174 be64_to_cpu(lvb->lvb_iatime_packed)); 2175 ocfs2_unpack_timespec(&inode->i_mtime, 2176 be64_to_cpu(lvb->lvb_imtime_packed)); 2177 ocfs2_unpack_timespec(&inode->i_ctime, 2178 be64_to_cpu(lvb->lvb_ictime_packed)); 2179 spin_unlock(&oi->ip_lock); 2180 2181 mlog_exit_void(); 2182 } 2183 2184 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 2185 struct ocfs2_lock_res *lockres) 2186 { 2187 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2188 2189 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) 2190 && lvb->lvb_version == OCFS2_LVB_VERSION 2191 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 2192 return 1; 2193 return 0; 2194 } 2195 2196 /* Determine whether a lock resource needs to be refreshed, and 2197 * arbitrate who gets to refresh it. 2198 * 2199 * 0 means no refresh needed. 2200 * 2201 * > 0 means you need to refresh this and you MUST call 2202 * ocfs2_complete_lock_res_refresh afterwards. 
*/
2203 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
2204 {
2205 unsigned long flags;
2206 int status = 0;
2207 
2208 mlog_entry_void();
2209 
2210 refresh_check:
2211 spin_lock_irqsave(&lockres->l_lock, flags);
2212 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
2213 spin_unlock_irqrestore(&lockres->l_lock, flags);
2214 goto bail;
2215 }
2216 
2217 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2218 spin_unlock_irqrestore(&lockres->l_lock, flags);
2219 
2220 ocfs2_wait_on_refreshing_lock(lockres);
2221 goto refresh_check;
2222 }
2223 
2224 /* Ok, I'll be the one to refresh this lock. */
2225 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
2226 spin_unlock_irqrestore(&lockres->l_lock, flags);
2227 
2228 status = 1;
2229 bail:
2230 mlog_exit(status);
2231 return status;
2232 }
2233 
2234 /* If status is nonzero, I'll mark it as not being in refresh
2235 * anymore, but I won't clear the needs-refresh flag. */
2236 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
2237 int status)
2238 {
2239 unsigned long flags;
2240 mlog_entry_void();
2241 
2242 spin_lock_irqsave(&lockres->l_lock, flags);
2243 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
2244 if (!status)
2245 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
2246 spin_unlock_irqrestore(&lockres->l_lock, flags);
2247 
2248 wake_up(&lockres->l_event);
2249 
2250 mlog_exit_void();
2251 }
2252 
2253 /* May or may not return a bh if it went to disk. */
2254 static int ocfs2_inode_lock_update(struct inode *inode,
2255 struct buffer_head **bh)
2256 {
2257 int status = 0;
2258 struct ocfs2_inode_info *oi = OCFS2_I(inode);
2259 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2260 struct ocfs2_dinode *fe;
2261 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2262 
2263 mlog_entry_void();
2264 
2265 if (ocfs2_mount_local(osb))
2266 goto bail;
2267 
2268 spin_lock(&oi->ip_lock);
2269 if (oi->ip_flags & OCFS2_INODE_DELETED) {
2270 mlog(0, "Orphaned inode %llu was deleted while we "
2271 "were waiting on a lock. ip_flags = 0x%x\n",
2272 (unsigned long long)oi->ip_blkno, oi->ip_flags);
2273 spin_unlock(&oi->ip_lock);
2274 status = -ENOENT;
2275 goto bail;
2276 }
2277 spin_unlock(&oi->ip_lock);
2278 
2279 if (!ocfs2_should_refresh_lock_res(lockres))
2280 goto bail;
2281 
2282 /* This will discard any caching information we might have had
2283 * for the inode metadata. */
2284 ocfs2_metadata_cache_purge(INODE_CACHE(inode));
2285 
2286 ocfs2_extent_map_trunc(inode, 0);
2287 
2288 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
2289 mlog(0, "Trusting LVB on inode %llu\n",
2290 (unsigned long long)oi->ip_blkno);
2291 ocfs2_refresh_inode_from_lvb(inode);
2292 } else {
2293 /* Boo, we have to go to disk. */
2294 /* read bh, cast, ocfs2_refresh_inode */
2295 status = ocfs2_read_inode_block(inode, bh);
2296 if (status < 0) {
2297 mlog_errno(status);
2298 goto bail_refresh;
2299 }
2300 fe = (struct ocfs2_dinode *) (*bh)->b_data;
2301 
2302 /* This is a good chance to make sure we're not
2303 * locking an invalid object. ocfs2_read_inode_block()
2304 * already checked that the inode block is sane.
2305 *
2306 * We bug on a stale inode here because we checked
2307 * above whether it was wiped from disk. The wiping
2308 * node provides a guarantee that we receive that
2309 * message and can mark the inode before dropping any
2310 * locks associated with it.
*/ 2311 mlog_bug_on_msg(inode->i_generation != 2312 le32_to_cpu(fe->i_generation), 2313 "Invalid dinode %llu disk generation: %u " 2314 "inode->i_generation: %u\n", 2315 (unsigned long long)oi->ip_blkno, 2316 le32_to_cpu(fe->i_generation), 2317 inode->i_generation); 2318 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 2319 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 2320 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 2321 (unsigned long long)oi->ip_blkno, 2322 (unsigned long long)le64_to_cpu(fe->i_dtime), 2323 le32_to_cpu(fe->i_flags)); 2324 2325 ocfs2_refresh_inode(inode, fe); 2326 ocfs2_track_lock_refresh(lockres); 2327 } 2328 2329 status = 0; 2330 bail_refresh: 2331 ocfs2_complete_lock_res_refresh(lockres, status); 2332 bail: 2333 mlog_exit(status); 2334 return status; 2335 } 2336 2337 static int ocfs2_assign_bh(struct inode *inode, 2338 struct buffer_head **ret_bh, 2339 struct buffer_head *passed_bh) 2340 { 2341 int status; 2342 2343 if (passed_bh) { 2344 /* Ok, the update went to disk for us, use the 2345 * returned bh. */ 2346 *ret_bh = passed_bh; 2347 get_bh(*ret_bh); 2348 2349 return 0; 2350 } 2351 2352 status = ocfs2_read_inode_block(inode, ret_bh); 2353 if (status < 0) 2354 mlog_errno(status); 2355 2356 return status; 2357 } 2358 2359 /* 2360 * returns < 0 error if the callback will never be called, otherwise 2361 * the result of the lock will be communicated via the callback. 2362 */ 2363 int ocfs2_inode_lock_full_nested(struct inode *inode, 2364 struct buffer_head **ret_bh, 2365 int ex, 2366 int arg_flags, 2367 int subclass) 2368 { 2369 int status, level, acquired; 2370 u32 dlm_flags; 2371 struct ocfs2_lock_res *lockres = NULL; 2372 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2373 struct buffer_head *local_bh = NULL; 2374 2375 BUG_ON(!inode); 2376 2377 mlog_entry_void(); 2378 2379 mlog(0, "inode %llu, take %s META lock\n", 2380 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2381 ex ? "EXMODE" : "PRMODE"); 2382 2383 status = 0; 2384 acquired = 0; 2385 /* We'll allow faking a readonly metadata lock for 2386 * rodevices. */ 2387 if (ocfs2_is_hard_readonly(osb)) { 2388 if (ex) 2389 status = -EROFS; 2390 goto bail; 2391 } 2392 2393 if (ocfs2_mount_local(osb)) 2394 goto local; 2395 2396 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2397 ocfs2_wait_for_recovery(osb); 2398 2399 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2400 level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2401 dlm_flags = 0; 2402 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 2403 dlm_flags |= DLM_LKF_NOQUEUE; 2404 2405 status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags, 2406 arg_flags, subclass, _RET_IP_); 2407 if (status < 0) { 2408 if (status != -EAGAIN && status != -EIOCBRETRY) 2409 mlog_errno(status); 2410 goto bail; 2411 } 2412 2413 /* Notify the error cleanup path to drop the cluster lock. */ 2414 acquired = 1; 2415 2416 /* We wait twice because a node may have died while we were in 2417 * the lower dlm layers. The second time though, we've 2418 * committed to owning this lock so we don't allow signals to 2419 * abort the operation. */ 2420 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2421 ocfs2_wait_for_recovery(osb); 2422 2423 local: 2424 /* 2425 * We only see this flag if we're being called from 2426 * ocfs2_read_locked_inode(). It means we're locking an inode 2427 * which hasn't been populated yet, so clear the refresh flag 2428 * and let the caller handle it. 
*/
2430 if (inode->i_state & I_NEW) {
2431 status = 0;
2432 if (lockres)
2433 ocfs2_complete_lock_res_refresh(lockres, 0);
2434 goto bail;
2435 }
2436 
2437 /* This is fun. The caller may want a bh back, or it may
2438 * not. ocfs2_inode_lock_update definitely wants one in, but
2439 * may or may not read one, depending on what's in the
2440 * LVB. The result of all of this is that we've *only* gone to
2441 * disk if we have to, so the complexity is worthwhile. */
2442 status = ocfs2_inode_lock_update(inode, &local_bh);
2443 if (status < 0) {
2444 if (status != -ENOENT)
2445 mlog_errno(status);
2446 goto bail;
2447 }
2448 
2449 if (ret_bh) {
2450 status = ocfs2_assign_bh(inode, ret_bh, local_bh);
2451 if (status < 0) {
2452 mlog_errno(status);
2453 goto bail;
2454 }
2455 }
2456 
2457 bail:
2458 if (status < 0) {
2459 if (ret_bh && (*ret_bh)) {
2460 brelse(*ret_bh);
2461 *ret_bh = NULL;
2462 }
2463 if (acquired)
2464 ocfs2_inode_unlock(inode, ex);
2465 }
2466 
2467 if (local_bh)
2468 brelse(local_bh);
2469 
2470 mlog_exit(status);
2471 return status;
2472 }
2473 
2474 /*
2475 * This is working around a lock inversion between tasks acquiring DLM
2476 * locks while holding a page lock and the downconvert thread which
2477 * blocks dlm lock acquiry while acquiring page locks.
2478 *
2479 * ** These _with_page variants are only intended to be called from aop
2480 * methods that hold page locks and return a very specific *positive* error
2481 * code that aop methods pass up to the VFS -- test for errors with != 0. **
2482 *
2483 * The DLM is called such that it returns -EAGAIN if it would have
2484 * blocked waiting for the downconvert thread. In that case we unlock
2485 * our page so the downconvert thread can make progress. Once we've
2486 * done this we have to return AOP_TRUNCATED_PAGE so the aop method
2487 * that called us can bubble that back up into the VFS, which will then
2488 * immediately retry the aop call.
2489 *
2490 * We do a blocking lock and immediate unlock before returning, though, so that
2491 * the lock has a good chance of being cached on this node by the time the VFS
2492 * calls back to retry the aop. This has the potential to livelock as nodes
2493 * ping locks back and forth, but that's a risk we're willing to take to keep
2494 * the fix for the lock inversion simple.
2495 */
2496 int ocfs2_inode_lock_with_page(struct inode *inode,
2497 struct buffer_head **ret_bh,
2498 int ex,
2499 struct page *page)
2500 {
2501 int ret;
2502 
2503 ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
2504 if (ret == -EAGAIN) {
2505 unlock_page(page);
2506 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
2507 ocfs2_inode_unlock(inode, ex);
2508 ret = AOP_TRUNCATED_PAGE;
2509 }
2510 
2511 return ret;
2512 }
2513 
2514 int ocfs2_inode_lock_atime(struct inode *inode,
2515 struct vfsmount *vfsmnt,
2516 int *level)
2517 {
2518 int ret;
2519 
2520 mlog_entry_void();
2521 ret = ocfs2_inode_lock(inode, NULL, 0);
2522 if (ret < 0) {
2523 mlog_errno(ret);
2524 return ret;
2525 }
2526 
2527 /*
2528 * If we should update atime, we take an EX lock;
2529 * otherwise we just take a PR lock.
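 * (Note the second ocfs2_should_update_atime() check below: atime may
 * have been brought up to date by another node while we briefly held
 * no lock during the PR -> EX re-take.)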
2530 */ 2531 if (ocfs2_should_update_atime(inode, vfsmnt)) { 2532 struct buffer_head *bh = NULL; 2533 2534 ocfs2_inode_unlock(inode, 0); 2535 ret = ocfs2_inode_lock(inode, &bh, 1); 2536 if (ret < 0) { 2537 mlog_errno(ret); 2538 return ret; 2539 } 2540 *level = 1; 2541 if (ocfs2_should_update_atime(inode, vfsmnt)) 2542 ocfs2_update_inode_atime(inode, bh); 2543 if (bh) 2544 brelse(bh); 2545 } else 2546 *level = 0; 2547 2548 mlog_exit(ret); 2549 return ret; 2550 } 2551 2552 void ocfs2_inode_unlock(struct inode *inode, 2553 int ex) 2554 { 2555 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2556 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2557 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2558 2559 mlog_entry_void(); 2560 2561 mlog(0, "inode %llu drop %s META lock\n", 2562 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2563 ex ? "EXMODE" : "PRMODE"); 2564 2565 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) && 2566 !ocfs2_mount_local(osb)) 2567 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 2568 2569 mlog_exit_void(); 2570 } 2571 2572 int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno) 2573 { 2574 struct ocfs2_lock_res *lockres; 2575 struct ocfs2_orphan_scan_lvb *lvb; 2576 int status = 0; 2577 2578 if (ocfs2_is_hard_readonly(osb)) 2579 return -EROFS; 2580 2581 if (ocfs2_mount_local(osb)) 2582 return 0; 2583 2584 lockres = &osb->osb_orphan_scan.os_lockres; 2585 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2586 if (status < 0) 2587 return status; 2588 2589 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2590 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2591 lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION) 2592 *seqno = be32_to_cpu(lvb->lvb_os_seqno); 2593 else 2594 *seqno = osb->osb_orphan_scan.os_seqno + 1; 2595 2596 return status; 2597 } 2598 2599 void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno) 2600 { 2601 struct ocfs2_lock_res *lockres; 2602 struct ocfs2_orphan_scan_lvb *lvb; 2603 2604 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) { 2605 lockres = &osb->osb_orphan_scan.os_lockres; 2606 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2607 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION; 2608 lvb->lvb_os_seqno = cpu_to_be32(seqno); 2609 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2610 } 2611 } 2612 2613 int ocfs2_super_lock(struct ocfs2_super *osb, 2614 int ex) 2615 { 2616 int status = 0; 2617 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2618 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2619 2620 mlog_entry_void(); 2621 2622 if (ocfs2_is_hard_readonly(osb)) 2623 return -EROFS; 2624 2625 if (ocfs2_mount_local(osb)) 2626 goto bail; 2627 2628 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 2629 if (status < 0) { 2630 mlog_errno(status); 2631 goto bail; 2632 } 2633 2634 /* The super block lock path is really in the best position to 2635 * know when resources covered by the lock need to be 2636 * refreshed, so we do it here. Of course, making sense of 2637 * everything is up to the caller :) */ 2638 status = ocfs2_should_refresh_lock_res(lockres); 2639 if (status < 0) { 2640 mlog_errno(status); 2641 goto bail; 2642 } 2643 if (status) { 2644 status = ocfs2_refresh_slot_info(osb); 2645 2646 ocfs2_complete_lock_res_refresh(lockres, status); 2647 2648 if (status < 0) 2649 mlog_errno(status); 2650 ocfs2_track_lock_refresh(lockres); 2651 } 2652 bail: 2653 mlog_exit(status); 2654 return status; 2655 } 2656 2657 void ocfs2_super_unlock(struct ocfs2_super *osb, 2658 int ex) 2659 { 2660 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR;
2661 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2662 
2663 if (!ocfs2_mount_local(osb))
2664 ocfs2_cluster_unlock(osb, lockres, level);
2665 }
2666 
2667 int ocfs2_rename_lock(struct ocfs2_super *osb)
2668 {
2669 int status;
2670 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2671 
2672 if (ocfs2_is_hard_readonly(osb))
2673 return -EROFS;
2674 
2675 if (ocfs2_mount_local(osb))
2676 return 0;
2677 
2678 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2679 if (status < 0)
2680 mlog_errno(status);
2681 
2682 return status;
2683 }
2684 
2685 void ocfs2_rename_unlock(struct ocfs2_super *osb)
2686 {
2687 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2688 
2689 if (!ocfs2_mount_local(osb))
2690 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2691 }
2692 
2693 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex)
2694 {
2695 int status;
2696 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2697 
2698 if (ocfs2_is_hard_readonly(osb))
2699 return -EROFS;
2700 
2701 if (ocfs2_mount_local(osb))
2702 return 0;
2703 
2704 status = ocfs2_cluster_lock(osb, lockres, ex ? DLM_LOCK_EX : DLM_LOCK_PR,
2705 0, 0);
2706 if (status < 0)
2707 mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status);
2708 
2709 return status;
2710 }
2711 
2712 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex)
2713 {
2714 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2715 
2716 if (!ocfs2_mount_local(osb))
2717 ocfs2_cluster_unlock(osb, lockres,
2718 ex ? DLM_LOCK_EX : DLM_LOCK_PR);
2719 }
2720 
2721 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2722 {
2723 int ret;
2724 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2725 struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2726 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2727 
2728 BUG_ON(!dl);
2729 
2730 if (ocfs2_is_hard_readonly(osb))
2731 return -EROFS;
2732 
2733 if (ocfs2_mount_local(osb))
2734 return 0;
2735 
2736 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2737 if (ret < 0)
2738 mlog_errno(ret);
2739 
2740 return ret;
2741 }
2742 
2743 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2744 {
2745 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2746 struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2747 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2748 
2749 if (!ocfs2_mount_local(osb))
2750 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
2751 }
2752 
2753 /* Reference counting of the dlm debug structure. We want this because
2754 * open references on the debug inodes can outlive a mount, so
2755 * we can't rely on the ocfs2_super to always exist.
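 *
 * (This is the standard kref pattern: ocfs2_get_dlm_debug() and
 * ocfs2_put_dlm_debug() below wrap kref_get()/kref_put(), with
 * ocfs2_dlm_debug_free() as the release callback that finally kfree()s
 * the structure once the last reference is dropped.)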
*/ 2756 static void ocfs2_dlm_debug_free(struct kref *kref) 2757 { 2758 struct ocfs2_dlm_debug *dlm_debug; 2759 2760 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2761 2762 kfree(dlm_debug); 2763 } 2764 2765 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2766 { 2767 if (dlm_debug) 2768 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2769 } 2770 2771 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2772 { 2773 kref_get(&debug->d_refcnt); 2774 } 2775 2776 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2777 { 2778 struct ocfs2_dlm_debug *dlm_debug; 2779 2780 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 2781 if (!dlm_debug) { 2782 mlog_errno(-ENOMEM); 2783 goto out; 2784 } 2785 2786 kref_init(&dlm_debug->d_refcnt); 2787 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 2788 dlm_debug->d_locking_state = NULL; 2789 out: 2790 return dlm_debug; 2791 } 2792 2793 /* Access to this is arbitrated for us via seq_file->sem. */ 2794 struct ocfs2_dlm_seq_priv { 2795 struct ocfs2_dlm_debug *p_dlm_debug; 2796 struct ocfs2_lock_res p_iter_res; 2797 struct ocfs2_lock_res p_tmp_res; 2798 }; 2799 2800 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 2801 struct ocfs2_dlm_seq_priv *priv) 2802 { 2803 struct ocfs2_lock_res *iter, *ret = NULL; 2804 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 2805 2806 assert_spin_locked(&ocfs2_dlm_tracking_lock); 2807 2808 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 2809 /* discover the head of the list */ 2810 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 2811 mlog(0, "End of list found, %p\n", ret); 2812 break; 2813 } 2814 2815 /* We track our "dummy" iteration lockres' by a NULL 2816 * l_ops field. */ 2817 if (iter->l_ops != NULL) { 2818 ret = iter; 2819 break; 2820 } 2821 } 2822 2823 return ret; 2824 } 2825 2826 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 2827 { 2828 struct ocfs2_dlm_seq_priv *priv = m->private; 2829 struct ocfs2_lock_res *iter; 2830 2831 spin_lock(&ocfs2_dlm_tracking_lock); 2832 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 2833 if (iter) { 2834 /* Since lockres' have the lifetime of their container 2835 * (which can be inodes, ocfs2_supers, etc) we want to 2836 * copy this out to a temporary lockres while still 2837 * under the spinlock. Obviously after this we can't 2838 * trust any pointers on the copy returned, but that's 2839 * ok as the information we want isn't typically held 2840 * in them. 
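 * (The copy-out works together with the dummy cursor trick: p_iter_res
 * is a fake lockres with a NULL l_ops that merely marks our position in
 * d_lockres_tracking. ocfs2_dlm_seq_next() below re-links it behind the
 * element just returned, so iteration can resume correctly even though
 * the tracking spinlock is dropped between calls.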
*/ 2841 priv->p_tmp_res = *iter; 2842 iter = &priv->p_tmp_res; 2843 } 2844 spin_unlock(&ocfs2_dlm_tracking_lock); 2845 2846 return iter; 2847 } 2848 2849 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 2850 { 2851 } 2852 2853 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 2854 { 2855 struct ocfs2_dlm_seq_priv *priv = m->private; 2856 struct ocfs2_lock_res *iter = v; 2857 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 2858 2859 spin_lock(&ocfs2_dlm_tracking_lock); 2860 iter = ocfs2_dlm_next_res(iter, priv); 2861 list_del_init(&dummy->l_debug_list); 2862 if (iter) { 2863 list_add(&dummy->l_debug_list, &iter->l_debug_list); 2864 priv->p_tmp_res = *iter; 2865 iter = &priv->p_tmp_res; 2866 } 2867 spin_unlock(&ocfs2_dlm_tracking_lock); 2868 2869 return iter; 2870 } 2871 2872 /* So that debugfs.ocfs2 can determine which format is being used */ 2873 #define OCFS2_DLM_DEBUG_STR_VERSION 2 2874 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 2875 { 2876 int i; 2877 char *lvb; 2878 struct ocfs2_lock_res *lockres = v; 2879 2880 if (!lockres) 2881 return -EINVAL; 2882 2883 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 2884 2885 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 2886 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 2887 lockres->l_name, 2888 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 2889 else 2890 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 2891 2892 seq_printf(m, "%d\t" 2893 "0x%lx\t" 2894 "0x%x\t" 2895 "0x%x\t" 2896 "%u\t" 2897 "%u\t" 2898 "%d\t" 2899 "%d\t", 2900 lockres->l_level, 2901 lockres->l_flags, 2902 lockres->l_action, 2903 lockres->l_unlock_action, 2904 lockres->l_ro_holders, 2905 lockres->l_ex_holders, 2906 lockres->l_requested, 2907 lockres->l_blocking); 2908 2909 /* Dump the raw LVB */ 2910 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2911 for(i = 0; i < DLM_LVB_LEN; i++) 2912 seq_printf(m, "0x%x\t", lvb[i]); 2913 2914 #ifdef CONFIG_OCFS2_FS_STATS 2915 # define lock_num_prmode(_l) (_l)->l_lock_num_prmode 2916 # define lock_num_exmode(_l) (_l)->l_lock_num_exmode 2917 # define lock_num_prmode_failed(_l) (_l)->l_lock_num_prmode_failed 2918 # define lock_num_exmode_failed(_l) (_l)->l_lock_num_exmode_failed 2919 # define lock_total_prmode(_l) (_l)->l_lock_total_prmode 2920 # define lock_total_exmode(_l) (_l)->l_lock_total_exmode 2921 # define lock_max_prmode(_l) (_l)->l_lock_max_prmode 2922 # define lock_max_exmode(_l) (_l)->l_lock_max_exmode 2923 # define lock_refresh(_l) (_l)->l_lock_refresh 2924 #else 2925 # define lock_num_prmode(_l) (0ULL) 2926 # define lock_num_exmode(_l) (0ULL) 2927 # define lock_num_prmode_failed(_l) (0) 2928 # define lock_num_exmode_failed(_l) (0) 2929 # define lock_total_prmode(_l) (0ULL) 2930 # define lock_total_exmode(_l) (0ULL) 2931 # define lock_max_prmode(_l) (0) 2932 # define lock_max_exmode(_l) (0) 2933 # define lock_refresh(_l) (0) 2934 #endif 2935 /* The following seq_print was added in version 2 of this output */ 2936 seq_printf(m, "%llu\t" 2937 "%llu\t" 2938 "%u\t" 2939 "%u\t" 2940 "%llu\t" 2941 "%llu\t" 2942 "%u\t" 2943 "%u\t" 2944 "%u\t", 2945 lock_num_prmode(lockres), 2946 lock_num_exmode(lockres), 2947 lock_num_prmode_failed(lockres), 2948 lock_num_exmode_failed(lockres), 2949 lock_total_prmode(lockres), 2950 lock_total_exmode(lockres), 2951 lock_max_prmode(lockres), 2952 lock_max_exmode(lockres), 2953 lock_refresh(lockres)); 2954 2955 /* End the line */ 2956 seq_printf(m, "\n"); 2957 return 0; 2958 } 2959 2960 static const struct seq_operations 
ocfs2_dlm_seq_ops = { 2961 .start = ocfs2_dlm_seq_start, 2962 .stop = ocfs2_dlm_seq_stop, 2963 .next = ocfs2_dlm_seq_next, 2964 .show = ocfs2_dlm_seq_show, 2965 }; 2966 2967 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 2968 { 2969 struct seq_file *seq = file->private_data; 2970 struct ocfs2_dlm_seq_priv *priv = seq->private; 2971 struct ocfs2_lock_res *res = &priv->p_iter_res; 2972 2973 ocfs2_remove_lockres_tracking(res); 2974 ocfs2_put_dlm_debug(priv->p_dlm_debug); 2975 return seq_release_private(inode, file); 2976 } 2977 2978 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 2979 { 2980 int ret; 2981 struct ocfs2_dlm_seq_priv *priv; 2982 struct seq_file *seq; 2983 struct ocfs2_super *osb; 2984 2985 priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL); 2986 if (!priv) { 2987 ret = -ENOMEM; 2988 mlog_errno(ret); 2989 goto out; 2990 } 2991 osb = inode->i_private; 2992 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 2993 priv->p_dlm_debug = osb->osb_dlm_debug; 2994 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 2995 2996 ret = seq_open(file, &ocfs2_dlm_seq_ops); 2997 if (ret) { 2998 kfree(priv); 2999 mlog_errno(ret); 3000 goto out; 3001 } 3002 3003 seq = file->private_data; 3004 seq->private = priv; 3005 3006 ocfs2_add_lockres_tracking(&priv->p_iter_res, 3007 priv->p_dlm_debug); 3008 3009 out: 3010 return ret; 3011 } 3012 3013 static const struct file_operations ocfs2_dlm_debug_fops = { 3014 .open = ocfs2_dlm_debug_open, 3015 .release = ocfs2_dlm_debug_release, 3016 .read = seq_read, 3017 .llseek = seq_lseek, 3018 }; 3019 3020 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 3021 { 3022 int ret = 0; 3023 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 3024 3025 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 3026 S_IFREG|S_IRUSR, 3027 osb->osb_debug_root, 3028 osb, 3029 &ocfs2_dlm_debug_fops); 3030 if (!dlm_debug->d_locking_state) { 3031 ret = -EINVAL; 3032 mlog(ML_ERROR, 3033 "Unable to create locking state debugfs file.\n"); 3034 goto out; 3035 } 3036 3037 ocfs2_get_dlm_debug(dlm_debug); 3038 out: 3039 return ret; 3040 } 3041 3042 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 3043 { 3044 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 3045 3046 if (dlm_debug) { 3047 debugfs_remove(dlm_debug->d_locking_state); 3048 ocfs2_put_dlm_debug(dlm_debug); 3049 } 3050 } 3051 3052 int ocfs2_dlm_init(struct ocfs2_super *osb) 3053 { 3054 int status = 0; 3055 struct ocfs2_cluster_connection *conn = NULL; 3056 3057 mlog_entry_void(); 3058 3059 if (ocfs2_mount_local(osb)) { 3060 osb->node_num = 0; 3061 goto local; 3062 } 3063 3064 status = ocfs2_dlm_init_debug(osb); 3065 if (status < 0) { 3066 mlog_errno(status); 3067 goto bail; 3068 } 3069 3070 /* launch downconvert thread */ 3071 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc"); 3072 if (IS_ERR(osb->dc_task)) { 3073 status = PTR_ERR(osb->dc_task); 3074 osb->dc_task = NULL; 3075 mlog_errno(status); 3076 goto bail; 3077 } 3078 3079 /* for now, uuid == domain */ 3080 status = ocfs2_cluster_connect(osb->osb_cluster_stack, 3081 osb->uuid_str, 3082 strlen(osb->uuid_str), 3083 &lproto, ocfs2_do_node_down, osb, 3084 &conn); 3085 if (status) { 3086 mlog_errno(status); 3087 goto bail; 3088 } 3089 3090 status = ocfs2_cluster_this_node(&osb->node_num); 3091 if (status < 0) { 3092 mlog_errno(status); 3093 mlog(ML_ERROR, 3094 "could not find this host's node number\n"); 3095 ocfs2_cluster_disconnect(conn, 0); 3096 goto bail; 3097 } 3098 
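	/* Local mounts take none of the cluster machinery above: no debugfs
	 * file, no downconvert thread, no cluster connection. They jump
	 * straight here and only initialize the per-osb lock resources. */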
3099 local: 3100 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 3101 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 3102 ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb); 3103 ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb); 3104 3105 osb->cconn = conn; 3106 3107 status = 0; 3108 bail: 3109 if (status < 0) { 3110 ocfs2_dlm_shutdown_debug(osb); 3111 if (osb->dc_task) 3112 kthread_stop(osb->dc_task); 3113 } 3114 3115 mlog_exit(status); 3116 return status; 3117 } 3118 3119 void ocfs2_dlm_shutdown(struct ocfs2_super *osb, 3120 int hangup_pending) 3121 { 3122 mlog_entry_void(); 3123 3124 ocfs2_drop_osb_locks(osb); 3125 3126 /* 3127 * Now that we have dropped all locks and ocfs2_dismount_volume() 3128 * has disabled recovery, the DLM won't be talking to us. It's 3129 * safe to tear things down before disconnecting the cluster. 3130 */ 3131 3132 if (osb->dc_task) { 3133 kthread_stop(osb->dc_task); 3134 osb->dc_task = NULL; 3135 } 3136 3137 ocfs2_lock_res_free(&osb->osb_super_lockres); 3138 ocfs2_lock_res_free(&osb->osb_rename_lockres); 3139 ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres); 3140 ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres); 3141 3142 ocfs2_cluster_disconnect(osb->cconn, hangup_pending); 3143 osb->cconn = NULL; 3144 3145 ocfs2_dlm_shutdown_debug(osb); 3146 3147 mlog_exit_void(); 3148 } 3149 3150 static int ocfs2_drop_lock(struct ocfs2_super *osb, 3151 struct ocfs2_lock_res *lockres) 3152 { 3153 int ret; 3154 unsigned long flags; 3155 u32 lkm_flags = 0; 3156 3157 /* We didn't get anywhere near actually using this lockres. */ 3158 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 3159 goto out; 3160 3161 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 3162 lkm_flags |= DLM_LKF_VALBLK; 3163 3164 spin_lock_irqsave(&lockres->l_lock, flags); 3165 3166 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 3167 "lockres %s, flags 0x%lx\n", 3168 lockres->l_name, lockres->l_flags); 3169 3170 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 3171 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 3172 "%u, unlock_action = %u\n", 3173 lockres->l_name, lockres->l_flags, lockres->l_action, 3174 lockres->l_unlock_action); 3175 3176 spin_unlock_irqrestore(&lockres->l_lock, flags); 3177 3178 /* XXX: Today we just wait on any busy 3179 * locks... Perhaps we need to cancel converts in the 3180 * future? */ 3181 ocfs2_wait_on_busy_lock(lockres); 3182 3183 spin_lock_irqsave(&lockres->l_lock, flags); 3184 } 3185 3186 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3187 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 3188 lockres->l_level == DLM_LOCK_EX && 3189 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3190 lockres->l_ops->set_lvb(lockres); 3191 } 3192 3193 if (lockres->l_flags & OCFS2_LOCK_BUSY) 3194 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 3195 lockres->l_name); 3196 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 3197 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 3198 3199 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 3200 spin_unlock_irqrestore(&lockres->l_lock, flags); 3201 goto out; 3202 } 3203 3204 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 3205 3206 /* make sure we never get here while waiting for an ast to 3207 * fire. */ 3208 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 3209 3210 /* is this necessary? 
*/ 3211 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3212 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 3213 spin_unlock_irqrestore(&lockres->l_lock, flags); 3214 3215 mlog(0, "lock %s\n", lockres->l_name); 3216 3217 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags); 3218 if (ret) { 3219 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3220 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 3221 ocfs2_dlm_dump_lksb(&lockres->l_lksb); 3222 BUG(); 3223 } 3224 mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n", 3225 lockres->l_name); 3226 3227 ocfs2_wait_on_busy_lock(lockres); 3228 out: 3229 mlog_exit(0); 3230 return 0; 3231 } 3232 3233 /* Mark the lockres as being dropped. It will no longer be 3234 * queued if blocking, but we still may have to wait on it 3235 * being dequeued from the downconvert thread before we can consider 3236 * it safe to drop. 3237 * 3238 * You can *not* attempt to call cluster_lock on this lockres anymore. */ 3239 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres) 3240 { 3241 int status; 3242 struct ocfs2_mask_waiter mw; 3243 unsigned long flags; 3244 3245 ocfs2_init_mask_waiter(&mw); 3246 3247 spin_lock_irqsave(&lockres->l_lock, flags); 3248 lockres->l_flags |= OCFS2_LOCK_FREEING; 3249 while (lockres->l_flags & OCFS2_LOCK_QUEUED) { 3250 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); 3251 spin_unlock_irqrestore(&lockres->l_lock, flags); 3252 3253 mlog(0, "Waiting on lockres %s\n", lockres->l_name); 3254 3255 status = ocfs2_wait_for_mask(&mw); 3256 if (status) 3257 mlog_errno(status); 3258 3259 spin_lock_irqsave(&lockres->l_lock, flags); 3260 } 3261 spin_unlock_irqrestore(&lockres->l_lock, flags); 3262 } 3263 3264 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, 3265 struct ocfs2_lock_res *lockres) 3266 { 3267 int ret; 3268 3269 ocfs2_mark_lockres_freeing(lockres); 3270 ret = ocfs2_drop_lock(osb, lockres); 3271 if (ret) 3272 mlog_errno(ret); 3273 } 3274 3275 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 3276 { 3277 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 3278 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 3279 ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres); 3280 ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres); 3281 } 3282 3283 int ocfs2_drop_inode_locks(struct inode *inode) 3284 { 3285 int status, err; 3286 3287 mlog_entry_void(); 3288 3289 /* No need to call ocfs2_mark_lockres_freeing here - 3290 * ocfs2_clear_inode has done it for us. 
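 *
 * (We drop all three inode lock resources here -- open, inode, then rw --
 * logging each failure but pressing on regardless, and return the first
 * error encountered.)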
*/ 3291 3292 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3293 &OCFS2_I(inode)->ip_open_lockres); 3294 if (err < 0) 3295 mlog_errno(err); 3296 3297 status = err; 3298 3299 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3300 &OCFS2_I(inode)->ip_inode_lockres); 3301 if (err < 0) 3302 mlog_errno(err); 3303 if (err < 0 && !status) 3304 status = err; 3305 3306 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3307 &OCFS2_I(inode)->ip_rw_lockres); 3308 if (err < 0) 3309 mlog_errno(err); 3310 if (err < 0 && !status) 3311 status = err; 3312 3313 mlog_exit(status); 3314 return status; 3315 } 3316 3317 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 3318 int new_level) 3319 { 3320 assert_spin_locked(&lockres->l_lock); 3321 3322 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 3323 3324 if (lockres->l_level <= new_level) { 3325 mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, " 3326 "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, " 3327 "block %d, pgen %d\n", lockres->l_name, lockres->l_level, 3328 new_level, list_empty(&lockres->l_blocked_list), 3329 list_empty(&lockres->l_mask_waiters), lockres->l_type, 3330 lockres->l_flags, lockres->l_ro_holders, 3331 lockres->l_ex_holders, lockres->l_action, 3332 lockres->l_unlock_action, lockres->l_requested, 3333 lockres->l_blocking, lockres->l_pending_gen); 3334 BUG(); 3335 } 3336 3337 mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n", 3338 lockres->l_name, lockres->l_level, new_level, lockres->l_blocking); 3339 3340 lockres->l_action = OCFS2_AST_DOWNCONVERT; 3341 lockres->l_requested = new_level; 3342 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3343 return lockres_set_pending(lockres); 3344 } 3345 3346 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 3347 struct ocfs2_lock_res *lockres, 3348 int new_level, 3349 int lvb, 3350 unsigned int generation) 3351 { 3352 int ret; 3353 u32 dlm_flags = DLM_LKF_CONVERT; 3354 3355 mlog_entry_void(); 3356 3357 mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name, 3358 lockres->l_level, new_level); 3359 3360 if (lvb) 3361 dlm_flags |= DLM_LKF_VALBLK; 3362 3363 ret = ocfs2_dlm_lock(osb->cconn, 3364 new_level, 3365 &lockres->l_lksb, 3366 dlm_flags, 3367 lockres->l_name, 3368 OCFS2_LOCK_ID_MAX_LEN - 1); 3369 lockres_clear_pending(lockres, generation, osb); 3370 if (ret) { 3371 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 3372 ocfs2_recover_from_dlm_error(lockres, 1); 3373 goto bail; 3374 } 3375 3376 ret = 0; 3377 bail: 3378 mlog_exit(ret); 3379 return ret; 3380 } 3381 3382 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */ 3383 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 3384 struct ocfs2_lock_res *lockres) 3385 { 3386 assert_spin_locked(&lockres->l_lock); 3387 3388 mlog_entry_void(); 3389 3390 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 3391 /* If we're already trying to cancel a lock conversion 3392 * then just drop the spinlock and allow the caller to 3393 * requeue this lock. */ 3394 mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name); 3395 return 0; 3396 } 3397 3398 /* were we in a convert when we got the bast fire? */ 3399 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT && 3400 lockres->l_action != OCFS2_AST_DOWNCONVERT); 3401 /* set things up for the unlockast to know to just 3402 * clear out the ast_action and unset busy, etc. 
*/
3403 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
3404 
3405 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
3406 "lock %s, invalid flags: 0x%lx\n",
3407 lockres->l_name, lockres->l_flags);
3408 
3409 mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3410 
3411 return 1;
3412 }
3413 
3414 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
3415 struct ocfs2_lock_res *lockres)
3416 {
3417 int ret;
3418 
3419 mlog_entry_void();
3420 
3421 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
3422 DLM_LKF_CANCEL);
3423 if (ret) {
3424 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3425 ocfs2_recover_from_dlm_error(lockres, 0);
3426 }
3427 
3428 mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3429 
3430 mlog_exit(ret);
3431 return ret;
3432 }
3433 
3434 static int ocfs2_unblock_lock(struct ocfs2_super *osb,
3435 struct ocfs2_lock_res *lockres,
3436 struct ocfs2_unblock_ctl *ctl)
3437 {
3438 unsigned long flags;
3439 int blocking;
3440 int new_level;
3441 int level;
3442 int ret = 0;
3443 int set_lvb = 0;
3444 unsigned int gen;
3445 
3446 mlog_entry_void();
3447 
3448 spin_lock_irqsave(&lockres->l_lock, flags);
3449 
3450 recheck:
3451 /*
3452 * Is it still blocking? If not, we have no more work to do.
3453 */
3454 if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
3455 BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
3456 spin_unlock_irqrestore(&lockres->l_lock, flags);
3457 ret = 0;
3458 goto leave;
3459 }
3460 
3461 if (lockres->l_flags & OCFS2_LOCK_BUSY) {
3462 /* XXX
3463 * This is a *big* race. The OCFS2_LOCK_PENDING flag
3464 * exists entirely for one reason - another thread has set
3465 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
3466 *
3467 * If we do ocfs2_cancel_convert() before the other thread
3468 * calls dlm_lock(), our cancel will do nothing. We will
3469 * get no ast, and we will have no way of knowing the
3470 * cancel failed. Meanwhile, the other thread will call
3471 * into dlm_lock() and wait...forever.
3472 *
3473 * Why forever? Because another node has asked for the
3474 * lock first; that's why we're here in unblock_lock().
3475 *
3476 * The solution is OCFS2_LOCK_PENDING. When PENDING is
3477 * set, we just requeue the unblock. Only when the other
3478 * thread has called dlm_lock() and cleared PENDING will
3479 * we then cancel their request.
3480 *
3481 * All callers of dlm_lock() must set OCFS2_LOCK_PENDING
3482 * at the same time they set OCFS2_LOCK_BUSY. They must
3483 * clear OCFS2_LOCK_PENDING after dlm_lock() returns.
3484 */
3485 if (lockres->l_flags & OCFS2_LOCK_PENDING) {
3486 mlog(ML_BASTS, "lockres %s, ReQ: Pending\n",
3487 lockres->l_name);
3488 goto leave_requeue;
3489 }
3490 
3491 ctl->requeue = 1;
3492 ret = ocfs2_prepare_cancel_convert(osb, lockres);
3493 spin_unlock_irqrestore(&lockres->l_lock, flags);
3494 if (ret) {
3495 ret = ocfs2_cancel_convert(osb, lockres);
3496 if (ret < 0)
3497 mlog_errno(ret);
3498 }
3499 goto leave;
3500 }
3501 
3502 /*
3503 * This prevents livelocks. The OCFS2_LOCK_UPCONVERT_FINISHING flag is
3504 * set when the ast is received for an upconvert just before the
3505 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast
3506 * on the heels of the ast, we want to delay the downconvert just
3507 * enough to allow the upconvert requestor to do its task. Because this
3508 * lock is in the blocked queue, the lock will be downconverted
3509 * as soon as the requestor is done with the lock.
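 *
 * (Timeline of the race this guards against: our upconvert ast fires and
 * OCFS2_LOCK_UPCONVERT_FINISHING is set just before OCFS2_LOCK_BUSY is
 * cleared; a bast from another node lands right on its heels. Without
 * the requeue below, we could downconvert before the waiting upconvert
 * requestor ever got to use the level it was just granted.)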
3510 */ 3511 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) 3512 goto leave_requeue; 3513 3514 /* 3515 * How can we block and yet be at NL? We were trying to upconvert 3516 * from NL and got canceled. The code comes back here, and now 3517 * we notice and clear BLOCKING. 3518 */ 3519 if (lockres->l_level == DLM_LOCK_NL) { 3520 BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders); 3521 mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name); 3522 lockres->l_blocking = DLM_LOCK_NL; 3523 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 3524 spin_unlock_irqrestore(&lockres->l_lock, flags); 3525 goto leave; 3526 } 3527 3528 /* if we're blocking an exclusive and we have *any* holders, 3529 * then requeue. */ 3530 if ((lockres->l_blocking == DLM_LOCK_EX) 3531 && (lockres->l_ex_holders || lockres->l_ro_holders)) { 3532 mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n", 3533 lockres->l_name, lockres->l_ex_holders, 3534 lockres->l_ro_holders); 3535 goto leave_requeue; 3536 } 3537 3538 /* If it's a PR we're blocking, then only 3539 * requeue if we've got any EX holders */ 3540 if (lockres->l_blocking == DLM_LOCK_PR && 3541 lockres->l_ex_holders) { 3542 mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n", 3543 lockres->l_name, lockres->l_ex_holders); 3544 goto leave_requeue; 3545 } 3546 3547 /* 3548 * Can we get a lock in this state if the holder counts are 3549 * zero? The meta data unblock code used to check this. 3550 */ 3551 if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 3552 && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) { 3553 mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n", 3554 lockres->l_name); 3555 goto leave_requeue; 3556 } 3557 3558 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 3559 3560 if (lockres->l_ops->check_downconvert 3561 && !lockres->l_ops->check_downconvert(lockres, new_level)) { 3562 mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n", 3563 lockres->l_name); 3564 goto leave_requeue; 3565 } 3566 3567 /* If we get here, then we know that there are no more 3568 * incompatible holders (and anyone asking for an incompatible 3569 * lock is blocked). We can now downconvert the lock */ 3570 if (!lockres->l_ops->downconvert_worker) 3571 goto downconvert; 3572 3573 /* Some lockres types want to do a bit of work before 3574 * downconverting a lock. Allow that here. The worker function 3575 * may sleep, so we save off a copy of what we're blocking as 3576 * it may change while we're not holding the spin lock. */ 3577 blocking = lockres->l_blocking; 3578 level = lockres->l_level; 3579 spin_unlock_irqrestore(&lockres->l_lock, flags); 3580 3581 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); 3582 3583 if (ctl->unblock_action == UNBLOCK_STOP_POST) { 3584 mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n", 3585 lockres->l_name); 3586 goto leave; 3587 } 3588 3589 spin_lock_irqsave(&lockres->l_lock, flags); 3590 if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) { 3591 /* If this changed underneath us, then we can't drop 3592 * it just yet. 
*/
3593 mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, "
3594 "Recheck\n", lockres->l_name, blocking,
3595 lockres->l_blocking, level, lockres->l_level);
3596 goto recheck;
3597 }
3598 
3599 downconvert:
3600 ctl->requeue = 0;
3601 
3602 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3603 if (lockres->l_level == DLM_LOCK_EX)
3604 set_lvb = 1;
3605 
3606 /*
3607 * We only set the lvb if the lock has been fully
3608 * refreshed - otherwise we risk setting stale
3609 * data. If we don't set it, there's no need to clear
3610 * out the lvb here, as its value is still valid.
3611 */
3612 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3613 lockres->l_ops->set_lvb(lockres);
3614 }
3615 
3616 gen = ocfs2_prepare_downconvert(lockres, new_level);
3617 spin_unlock_irqrestore(&lockres->l_lock, flags);
3618 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
3619 gen);
3620 
3621 leave:
3622 mlog_exit(ret);
3623 return ret;
3624 
3625 leave_requeue:
3626 spin_unlock_irqrestore(&lockres->l_lock, flags);
3627 ctl->requeue = 1;
3628 
3629 mlog_exit(0);
3630 return 0;
3631 }
3632 
3633 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3634 int blocking)
3635 {
3636 struct inode *inode;
3637 struct address_space *mapping;
3638 struct ocfs2_inode_info *oi;
3639 
3640 inode = ocfs2_lock_res_inode(lockres);
3641 mapping = inode->i_mapping;
3642 
3643 if (S_ISDIR(inode->i_mode)) {
3644 oi = OCFS2_I(inode);
3645 oi->ip_dir_lock_gen++;
3646 mlog(0, "generation: %u\n", oi->ip_dir_lock_gen);
3647 goto out;
3648 }
3649 
3650 if (!S_ISREG(inode->i_mode))
3651 goto out;
3652 
3653 /*
3654 * We need this before the filemap_fdatawrite() so that it can
3655 * transfer the dirty bit from the PTE to the
3656 * page. Unfortunately this means that even for EX->PR
3657 * downconverts, we'll lose our mappings and have to build
3658 * them up again.
3659 */
3660 unmap_mapping_range(mapping, 0, 0, 0);
3661 
3662 if (filemap_fdatawrite(mapping)) {
3663 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
3664 (unsigned long long)OCFS2_I(inode)->ip_blkno);
3665 }
3666 sync_mapping_buffers(mapping);
3667 if (blocking == DLM_LOCK_EX) {
3668 truncate_inode_pages(mapping, 0);
3669 } else {
3670 /* We only need to wait on the I/O if we're not also
3671 * truncating pages because truncate_inode_pages waits
3672 * for us above. We don't truncate pages if we're
3673 * blocking anything < EXMODE because we want to keep
3674 * them around in that case.
3681
3682 static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci,
3683 				 struct ocfs2_lock_res *lockres,
3684 				 int new_level)
3685 {
3686 	int checkpointed = ocfs2_ci_fully_checkpointed(ci);
3687
3688 	BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
3689 	BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
3690
3691 	if (checkpointed)
3692 		return 1;
3693
3694 	ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci)));
3695 	return 0;
3696 }
3697
3698 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
3699 					int new_level)
3700 {
3701 	struct inode *inode = ocfs2_lock_res_inode(lockres);
3702
3703 	return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level);
3704 }
3705
3706 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
3707 {
3708 	struct inode *inode = ocfs2_lock_res_inode(lockres);
3709
3710 	__ocfs2_stuff_meta_lvb(inode);
3711 }
3712
3713 /*
3714  * Does the final reference drop on our dentry lock. Right now this
3715  * happens in the downconvert thread, but we could choose to simplify the
3716  * dlmglue API and push these off to the ocfs2_wq in the future.
3717  */
3718 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
3719 				     struct ocfs2_lock_res *lockres)
3720 {
3721 	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3722 	ocfs2_dentry_lock_put(osb, dl);
3723 }
3724
3725 /*
3726  * d_delete() matching dentries before the lock downconvert.
3727  *
3728  * At this point, any process waiting to destroy the
3729  * dentry_lock due to last ref count is stopped by the
3730  * OCFS2_LOCK_QUEUED flag.
3731  *
3732  * We have two potential problems:
3733  *
3734  * 1) If we do the last reference drop on our dentry_lock (via dput)
3735  *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
3736  *    the downconvert to finish. Instead we take an elevated
3737  *    reference and push the drop until after we've completed our
3738  *    unblock processing.
3739  *
3740  * 2) There might be another process with a final reference,
3741  *    waiting on us to finish processing. If this is the case, we
3742  *    detect it and exit out - there are no more dentries anyway.
3743  */
3744 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
3745 				       int blocking)
3746 {
3747 	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3748 	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
3749 	struct dentry *dentry;
3750 	unsigned long flags;
3751 	int extra_ref = 0;
3752
3753 	/*
3754 	 * This node is blocking another node from getting a read
3755 	 * lock. This happens when we've renamed within a
3756 	 * directory. We've forced the other nodes to d_delete(), but
3757 	 * we never actually dropped our lock because it's still
3758 	 * valid. The downconvert code will retain a PR for this node,
3759 	 * so there's no further work to do.
3760 	 */
3761 	if (blocking == DLM_LOCK_PR)
3762 		return UNBLOCK_CONTINUE;
3763
3764 	/*
3765 	 * Mark this inode as potentially orphaned. The code in
3766 	 * ocfs2_delete_inode() will figure out whether it actually
3767 	 * needs to be freed or not.
3768 	 */
3769 	spin_lock(&oi->ip_lock);
3770 	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
3771 	spin_unlock(&oi->ip_lock);
3772
3773 	/*
3774 	 * We need to make sure, however, that the check of
3775 	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
3776 	 * respect to a reference decrement or the setting of that
3777 	 * flag.
3778 	 */
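	/*
	 * Note (added): the nesting below is l_lock -> dentry_attach_lock;
	 * holding both makes the OCFS2_LOCK_FREEING test and the dl_count
	 * bump atomic against a concurrent final ocfs2_dentry_lock_put().
	 */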
3779 	spin_lock_irqsave(&lockres->l_lock, flags);
3780 	spin_lock(&dentry_attach_lock);
3781 	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
3782 	    && dl->dl_count) {
3783 		dl->dl_count++;
3784 		extra_ref = 1;
3785 	}
3786 	spin_unlock(&dentry_attach_lock);
3787 	spin_unlock_irqrestore(&lockres->l_lock, flags);
3788
3789 	mlog(0, "extra_ref = %d\n", extra_ref);
3790
3791 	/*
3792 	 * We have a process waiting on us in ocfs2_dentry_iput(),
3793 	 * which means we can't have any more outstanding
3794 	 * aliases. There's no need to do any more work.
3795 	 */
3796 	if (!extra_ref)
3797 		return UNBLOCK_CONTINUE;
3798
3799 	spin_lock(&dentry_attach_lock);
3800 	while (1) {
3801 		dentry = ocfs2_find_local_alias(dl->dl_inode,
3802 						dl->dl_parent_blkno, 1);
3803 		if (!dentry)
3804 			break;
3805 		spin_unlock(&dentry_attach_lock);
3806
3807 		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
3808 		     dentry->d_name.name);
3809
3810 		/*
3811 		 * The following dcache calls may do an
3812 		 * iput(). Normally we don't want that from the
3813 		 * downconverting thread, but in this case it's ok
3814 		 * because the requesting node already has an
3815 		 * exclusive lock on the inode, so it can't be queued
3816 		 * for a downconvert.
3817 		 */
3818 		d_delete(dentry);
3819 		dput(dentry);
3820
3821 		spin_lock(&dentry_attach_lock);
3822 	}
3823 	spin_unlock(&dentry_attach_lock);
3824
3825 	/*
3826 	 * If we are the last holder of this dentry lock, there is no
3827 	 * reason to downconvert so skip straight to the unlock.
3828 	 */
3829 	if (dl->dl_count == 1)
3830 		return UNBLOCK_STOP_POST;
3831
3832 	return UNBLOCK_CONTINUE_POST;
3833 }
3834
3835 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
3836 					    int new_level)
3837 {
3838 	struct ocfs2_refcount_tree *tree =
3839 				ocfs2_lock_res_refcount_tree(lockres);
3840
3841 	return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level);
3842 }
3843
3844 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
3845 					 int blocking)
3846 {
3847 	struct ocfs2_refcount_tree *tree =
3848 				ocfs2_lock_res_refcount_tree(lockres);
3849
3850 	ocfs2_metadata_cache_purge(&tree->rf_ci);
3851
3852 	return UNBLOCK_CONTINUE;
3853 }
3854
3855 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres)
3856 {
3857 	struct ocfs2_qinfo_lvb *lvb;
3858 	struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres);
3859 	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
3860 					    oinfo->dqi_gi.dqi_type);
3861
3862 	mlog_entry_void();
3863
3864 	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
3865 	lvb->lvb_version = OCFS2_QINFO_LVB_VERSION;
3866 	lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace);
3867 	lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace);
3868 	lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms);
3869 	lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks);
3870 	lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk);
3871 	lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry);
3872
3873 	mlog_exit_void();
3874 }
3875
3876 void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex)
3877 {
3878 	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3879 	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
3880 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3881
3882 	mlog_entry_void();
3883 	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
3884 		ocfs2_cluster_unlock(osb, lockres, level);
3885 	mlog_exit_void();
3886 }
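/*
 * Minimal usage sketch (added; not part of the original file): how a
 * hypothetical caller would pair ocfs2_qinfo_lock()/ocfs2_qinfo_unlock()
 * (both declared in dlmglue.h) to read the cached quota info. A shared
 * (PR) lock is enough for reading; the lock call itself refreshes the
 * dqi_* fields from the LVB or from disk as needed.
 */
static int __maybe_unused example_read_quota_grace(struct ocfs2_mem_dqinfo *oinfo)
{
	int status;

	status = ocfs2_qinfo_lock(oinfo, 0);	/* 0 == shared */
	if (status < 0) {
		mlog_errno(status);
		return status;
	}

	/* ... read oinfo->dqi_* fields under the cluster lock ... */

	ocfs2_qinfo_unlock(oinfo, 0);
	return 0;
}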
3887
3888 static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo)
3889 {
3890 	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
3891 					    oinfo->dqi_gi.dqi_type);
3892 	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3893 	struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
3894 	struct buffer_head *bh = NULL;
3895 	struct ocfs2_global_disk_dqinfo *gdinfo;
3896 	int status = 0;
3897
3898 	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
3899 	    lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) {
3900 		info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace);
3901 		info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace);
3902 		oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms);
3903 		oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks);
3904 		oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk);
3905 		oinfo->dqi_gi.dqi_free_entry =
3906 					be32_to_cpu(lvb->lvb_free_entry);
3907 	} else {
3908 		status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode,
3909 						     oinfo->dqi_giblk, &bh);
3910 		if (status) {
3911 			mlog_errno(status);
3912 			goto bail;
3913 		}
3914 		gdinfo = (struct ocfs2_global_disk_dqinfo *)
3915 					(bh->b_data + OCFS2_GLOBAL_INFO_OFF);
3916 		info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace);
3917 		info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace);
3918 		oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms);
3919 		oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks);
3920 		oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk);
3921 		oinfo->dqi_gi.dqi_free_entry =
3922 					le32_to_cpu(gdinfo->dqi_free_entry);
3923 		brelse(bh);
3924 		ocfs2_track_lock_refresh(lockres);
3925 	}
3926
3927 bail:
3928 	return status;
3929 }
3930
3931 /* Lock quota info; this function expects at least a shared lock on the
3932  * quota file so that we can safely refresh the quota info from disk. */
3933 int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex)
3934 {
3935 	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3936 	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
3937 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3938 	int status = 0;
3939
3940 	mlog_entry_void();
3941
3942 	/* On RO devices, locking really isn't needed... */
3943 	if (ocfs2_is_hard_readonly(osb)) {
3944 		if (ex)
3945 			status = -EROFS;
3946 		goto bail;
3947 	}
3948 	if (ocfs2_mount_local(osb))
3949 		goto bail;
3950
3951 	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
3952 	if (status < 0) {
3953 		mlog_errno(status);
3954 		goto bail;
3955 	}
3956 	if (!ocfs2_should_refresh_lock_res(lockres))
3957 		goto bail;
3958 	/* OK, we have the lock but we need to refresh the quota info */
3959 	status = ocfs2_refresh_qinfo(oinfo);
3960 	if (status)
3961 		ocfs2_qinfo_unlock(oinfo, ex);
3962 	ocfs2_complete_lock_res_refresh(lockres, status);
3963 bail:
3964 	mlog_exit(status);
3965 	return status;
3966 }
3967
3968 int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex)
3969 {
3970 	int status;
3971 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3972 	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
3973 	struct ocfs2_super *osb = lockres->l_priv;
3974
3975
3976 	if (ocfs2_is_hard_readonly(osb))
3977 		return -EROFS;
3978
3979 	if (ocfs2_mount_local(osb))
3980 		return 0;
3981
3982 	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
3983 	if (status < 0)
3984 		mlog_errno(status);
3985
3986 	return status;
3987 }
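/*
 * Note (added): ocfs2_refcount_lock() above returns 0 with no DLM
 * traffic on a local mount and fails with -EROFS on a hard-readonly
 * one, so a caller reaches ocfs2_refcount_unlock() below only with a
 * real cluster lock held or on a local mount -- which is why the
 * unlock path needs just the ocfs2_mount_local() test.
 */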
3988
3989 void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
3990 {
3991 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3992 	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
3993 	struct ocfs2_super *osb = lockres->l_priv;
3994
3995 	if (!ocfs2_mount_local(osb))
3996 		ocfs2_cluster_unlock(osb, lockres, level);
3997 }
3998
3999 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
4000 				       struct ocfs2_lock_res *lockres)
4001 {
4002 	int status;
4003 	struct ocfs2_unblock_ctl ctl = {0, 0,};
4004 	unsigned long flags;
4005
4006 	/* Our reference to the lockres in this function can be
4007 	 * considered valid until we remove the OCFS2_LOCK_QUEUED
4008 	 * flag. */
4009
4010 	mlog_entry_void();
4011
4012 	BUG_ON(!lockres);
4013 	BUG_ON(!lockres->l_ops);
4014
4015 	mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name);
4016
4017 	/* Detect whether a lock has been marked as going away while
4018 	 * the downconvert thread was processing other things. A lock can
4019 	 * still be marked with OCFS2_LOCK_FREEING after this check,
4020 	 * but short-circuiting here still saves us
4021 	 * some work. */
4022 	spin_lock_irqsave(&lockres->l_lock, flags);
4023 	if (lockres->l_flags & OCFS2_LOCK_FREEING)
4024 		goto unqueue;
4025 	spin_unlock_irqrestore(&lockres->l_lock, flags);
4026
4027 	status = ocfs2_unblock_lock(osb, lockres, &ctl);
4028 	if (status < 0)
4029 		mlog_errno(status);
4030
4031 	spin_lock_irqsave(&lockres->l_lock, flags);
4032 unqueue:
4033 	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
4034 		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
4035 	} else
4036 		ocfs2_schedule_blocked_lock(osb, lockres);
4037
4038 	mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name,
4039 	     ctl.requeue ? "yes" : "no");
4040 	spin_unlock_irqrestore(&lockres->l_lock, flags);
4041
4042 	if (ctl.unblock_action != UNBLOCK_CONTINUE
4043 	    && lockres->l_ops->post_unlock)
4044 		lockres->l_ops->post_unlock(osb, lockres);
4045
4046 	mlog_exit_void();
4047 }
4048
4049 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
4050 					struct ocfs2_lock_res *lockres)
4051 {
4052 	mlog_entry_void();
4053
4054 	assert_spin_locked(&lockres->l_lock);
4055
4056 	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
4057 		/* Do not schedule a lock for downconvert when it's on
4058 		 * the way to destruction - any nodes wanting access
4059 		 * to the resource will get it soon. */
4060 		mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n",
4061 		     lockres->l_name, lockres->l_flags);
4062 		return;
4063 	}
4064
4065 	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
4066
4067 	spin_lock(&osb->dc_task_lock);
4068 	if (list_empty(&lockres->l_blocked_list)) {
4069 		list_add_tail(&lockres->l_blocked_list,
4070 			      &osb->blocked_lock_list);
4071 		osb->blocked_lock_count++;
4072 	}
4073 	spin_unlock(&osb->dc_task_lock);
4074
4075 	mlog_exit_void();
4076 }
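/*
 * Note (added): the list_empty() check above means a lockres sits on
 * blocked_lock_list at most once, no matter how many BASTs fire while
 * it is queued; blocked_lock_count mirrors the list length and is
 * only ever updated under dc_task_lock.
 */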
4077
4078 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
4079 {
4080 	unsigned long processed;
4081 	struct ocfs2_lock_res *lockres;
4082
4083 	mlog_entry_void();
4084
4085 	spin_lock(&osb->dc_task_lock);
4086 	/* grab this early so we know to try again if a state change and
4087 	 * wake happen part-way through our work */
4088 	osb->dc_work_sequence = osb->dc_wake_sequence;
4089
4090 	processed = osb->blocked_lock_count;
4091 	while (processed) {
4092 		BUG_ON(list_empty(&osb->blocked_lock_list));
4093
4094 		lockres = list_entry(osb->blocked_lock_list.next,
4095 				     struct ocfs2_lock_res, l_blocked_list);
4096 		list_del_init(&lockres->l_blocked_list);
4097 		osb->blocked_lock_count--;
4098 		spin_unlock(&osb->dc_task_lock);
4099
4100 		BUG_ON(!processed);
4101 		processed--;
4102
4103 		ocfs2_process_blocked_lock(osb, lockres);
4104
4105 		spin_lock(&osb->dc_task_lock);
4106 	}
4107 	spin_unlock(&osb->dc_task_lock);
4108
4109 	mlog_exit_void();
4110 }
4111
4112 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
4113 {
4114 	int empty = 0;
4115
4116 	spin_lock(&osb->dc_task_lock);
4117 	if (list_empty(&osb->blocked_lock_list))
4118 		empty = 1;
4119
4120 	spin_unlock(&osb->dc_task_lock);
4121 	return empty;
4122 }
4123
4124 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
4125 {
4126 	int should_wake = 0;
4127
4128 	spin_lock(&osb->dc_task_lock);
4129 	if (osb->dc_work_sequence != osb->dc_wake_sequence)
4130 		should_wake = 1;
4131 	spin_unlock(&osb->dc_task_lock);
4132
4133 	return should_wake;
4134 }
4135
4136 static int ocfs2_downconvert_thread(void *arg)
4137 {
4138 	int status = 0;
4139 	struct ocfs2_super *osb = arg;
4140
4141 	/* only quit once we've been asked to stop and there is no more
4142 	 * work available */
4143 	while (!(kthread_should_stop() &&
4144 		ocfs2_downconvert_thread_lists_empty(osb))) {
4145
4146 		wait_event_interruptible(osb->dc_event,
4147 					 ocfs2_downconvert_thread_should_wake(osb) ||
4148 					 kthread_should_stop());
4149
4150 		mlog(0, "downconvert_thread: awoken\n");
4151
4152 		ocfs2_downconvert_thread_do_work(osb);
4153 	}
4154
4155 	osb->dc_task = NULL;
4156 	return status;
4157 }
4158
4159 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
4160 {
4161 	spin_lock(&osb->dc_task_lock);
4162 	/* make sure the downconvert thread gets a swipe at whatever changes
4163 	 * the caller may have made to the blocked lock lists */
4164 	osb->dc_wake_sequence++;
4165 	spin_unlock(&osb->dc_task_lock);
4166 	wake_up(&osb->dc_event);
4167 }
4168
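/*
 * Illustrative aside (added; not part of dlmglue.c): a minimal
 * userspace analogue of the dc_wake_sequence/dc_work_sequence
 * handshake above, assuming POSIX threads. The worker snapshots the
 * wake sequence before it starts a pass; a waker bumps the sequence
 * under the same lock before signalling. A wakeup that lands while
 * the worker is mid-pass is therefore never lost -- the sequences
 * compare unequal and the worker simply runs another pass. All names
 * below (wake_seq, work_seq, example_*) are invented for the sketch,
 * which is deliberately kept out of the build.
 */
#if 0	/* sketch only -- not kernel code */
#include <pthread.h>

static pthread_mutex_t example_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t example_event = PTHREAD_COND_INITIALIZER;
static unsigned long wake_seq;	/* analogue of osb->dc_wake_sequence */
static unsigned long work_seq;	/* analogue of osb->dc_work_sequence */

/* Analogue of ocfs2_wake_downconvert_thread(). */
static void example_wake_worker(void)
{
	pthread_mutex_lock(&example_lock);
	wake_seq++;		/* record that new work may exist */
	pthread_mutex_unlock(&example_lock);
	pthread_cond_signal(&example_event);
}

/* Analogue of the ocfs2_downconvert_thread() main loop. */
static void *example_worker(void *unused)
{
	pthread_mutex_lock(&example_lock);
	for (;;) {
		/* Snapshot first; any wake after this point re-runs us. */
		work_seq = wake_seq;
		pthread_mutex_unlock(&example_lock);

		/* ... drain the queued work with the lock dropped ... */

		pthread_mutex_lock(&example_lock);
		while (work_seq == wake_seq)
			pthread_cond_wait(&example_event, &example_lock);
	}
	return NULL;
}
#endif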