1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmglue.c 5 * 6 * Code which implements an OCFS2 specific interface to our DLM. 7 * 8 * Copyright (C) 2003, 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 */ 25 26 #include <linux/types.h> 27 #include <linux/slab.h> 28 #include <linux/highmem.h> 29 #include <linux/mm.h> 30 #include <linux/kthread.h> 31 #include <linux/pagemap.h> 32 #include <linux/debugfs.h> 33 #include <linux/seq_file.h> 34 #include <linux/time.h> 35 #include <linux/quotaops.h> 36 37 #define MLOG_MASK_PREFIX ML_DLM_GLUE 38 #include <cluster/masklog.h> 39 40 #include "ocfs2.h" 41 #include "ocfs2_lockingver.h" 42 43 #include "alloc.h" 44 #include "dcache.h" 45 #include "dlmglue.h" 46 #include "extent_map.h" 47 #include "file.h" 48 #include "heartbeat.h" 49 #include "inode.h" 50 #include "journal.h" 51 #include "stackglue.h" 52 #include "slot_map.h" 53 #include "super.h" 54 #include "uptodate.h" 55 #include "quota.h" 56 #include "refcounttree.h" 57 58 #include "buffer_head_io.h" 59 60 struct ocfs2_mask_waiter { 61 struct list_head mw_item; 62 int mw_status; 63 struct completion mw_complete; 64 unsigned long mw_mask; 65 unsigned long mw_goal; 66 #ifdef CONFIG_OCFS2_FS_STATS 67 ktime_t mw_lock_start; 68 #endif 69 }; 70 71 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres); 72 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres); 73 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres); 74 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres); 75 76 /* 77 * Return value from ->downconvert_worker functions. 78 * 79 * These control the precise actions of ocfs2_unblock_lock() 80 * and ocfs2_process_blocked_lock() 81 * 82 */ 83 enum ocfs2_unblock_action { 84 UNBLOCK_CONTINUE = 0, /* Continue downconvert */ 85 UNBLOCK_CONTINUE_POST = 1, /* Continue downconvert, fire 86 * ->post_unlock callback */ 87 UNBLOCK_STOP_POST = 2, /* Do not downconvert, fire 88 * ->post_unlock() callback. */ 89 }; 90 91 struct ocfs2_unblock_ctl { 92 int requeue; 93 enum ocfs2_unblock_action unblock_action; 94 }; 95 96 /* Lockdep class keys */ 97 struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES]; 98 99 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 100 int new_level); 101 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres); 102 103 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 104 int blocking); 105 106 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 107 int blocking); 108 109 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 110 struct ocfs2_lock_res *lockres); 111 112 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres); 113 114 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres, 115 int new_level); 116 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres, 117 int blocking); 118 119 #define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres) 120 121 /* This aids in debugging situations where a bad LVB might be involved. */ 122 static void ocfs2_dump_meta_lvb_info(u64 level, 123 const char *function, 124 unsigned int line, 125 struct ocfs2_lock_res *lockres) 126 { 127 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 128 129 mlog(level, "LVB information for %s (called from %s:%u):\n", 130 lockres->l_name, function, line); 131 mlog(level, "version: %u, clusters: %u, generation: 0x%x\n", 132 lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters), 133 be32_to_cpu(lvb->lvb_igeneration)); 134 mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n", 135 (unsigned long long)be64_to_cpu(lvb->lvb_isize), 136 be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid), 137 be16_to_cpu(lvb->lvb_imode)); 138 mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, " 139 "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink), 140 (long long)be64_to_cpu(lvb->lvb_iatime_packed), 141 (long long)be64_to_cpu(lvb->lvb_ictime_packed), 142 (long long)be64_to_cpu(lvb->lvb_imtime_packed), 143 be32_to_cpu(lvb->lvb_iattr)); 144 } 145 146 147 /* 148 * OCFS2 Lock Resource Operations 149 * 150 * These fine tune the behavior of the generic dlmglue locking infrastructure. 151 * 152 * The most basic of lock types can point ->l_priv to their respective 153 * struct ocfs2_super and allow the default actions to manage things. 154 * 155 * Right now, each lock type also needs to implement an init function, 156 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres() 157 * should be called when the lock is no longer needed (i.e., object 158 * destruction time). 159 */ 160 struct ocfs2_lock_res_ops { 161 /* 162 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define 163 * this callback if ->l_priv is not an ocfs2_super pointer 164 */ 165 struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *); 166 167 /* 168 * Optionally called in the downconvert thread after a 169 * successful downconvert. The lockres will not be referenced 170 * after this callback is called, so it is safe to free 171 * memory, etc. 172 * 173 * The exact semantics of when this is called are controlled 174 * by ->downconvert_worker() 175 */ 176 void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *); 177 178 /* 179 * Allow a lock type to add checks to determine whether it is 180 * safe to downconvert a lock. Return 0 to re-queue the 181 * downconvert at a later time, nonzero to continue. 182 * 183 * For most locks, the default checks that there are no 184 * incompatible holders are sufficient. 185 * 186 * Called with the lockres spinlock held. 187 */ 188 int (*check_downconvert)(struct ocfs2_lock_res *, int); 189 190 /* 191 * Allows a lock type to populate the lock value block. This 192 * is called on downconvert, and when we drop a lock. 193 * 194 * Locks that want to use this should set LOCK_TYPE_USES_LVB 195 * in the flags field. 196 * 197 * Called with the lockres spinlock held. 198 */ 199 void (*set_lvb)(struct ocfs2_lock_res *); 200 201 /* 202 * Called from the downconvert thread when it is determined 203 * that a lock will be downconverted. This is called without 204 * any locks held so the function can do work that might 205 * schedule (syncing out data, etc). 206 * 207 * This should return any one of the ocfs2_unblock_action 208 * values, depending on what it wants the thread to do. 209 */ 210 int (*downconvert_worker)(struct ocfs2_lock_res *, int); 211 212 /* 213 * LOCK_TYPE_* flags which describe the specific requirements 214 * of a lock type. Descriptions of each individual flag follow. 215 */ 216 int flags; 217 }; 218 219 /* 220 * Some locks want to "refresh" potentially stale data when a 221 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this 222 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the 223 * individual lockres l_flags member from the ast function. It is 224 * expected that the locking wrapper will clear the 225 * OCFS2_LOCK_NEEDS_REFRESH flag when done. 226 */ 227 #define LOCK_TYPE_REQUIRES_REFRESH 0x1 228 229 /* 230 * Indicate that a lock type makes use of the lock value block. The 231 * ->set_lvb lock type callback must be defined. 232 */ 233 #define LOCK_TYPE_USES_LVB 0x2 234 235 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { 236 .get_osb = ocfs2_get_inode_osb, 237 .flags = 0, 238 }; 239 240 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = { 241 .get_osb = ocfs2_get_inode_osb, 242 .check_downconvert = ocfs2_check_meta_downconvert, 243 .set_lvb = ocfs2_set_meta_lvb, 244 .downconvert_worker = ocfs2_data_convert_worker, 245 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 246 }; 247 248 static struct ocfs2_lock_res_ops ocfs2_super_lops = { 249 .flags = LOCK_TYPE_REQUIRES_REFRESH, 250 }; 251 252 static struct ocfs2_lock_res_ops ocfs2_rename_lops = { 253 .flags = 0, 254 }; 255 256 static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = { 257 .flags = 0, 258 }; 259 260 static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = { 261 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 262 }; 263 264 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { 265 .get_osb = ocfs2_get_dentry_osb, 266 .post_unlock = ocfs2_dentry_post_unlock, 267 .downconvert_worker = ocfs2_dentry_convert_worker, 268 .flags = 0, 269 }; 270 271 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = { 272 .get_osb = ocfs2_get_inode_osb, 273 .flags = 0, 274 }; 275 276 static struct ocfs2_lock_res_ops ocfs2_flock_lops = { 277 .get_osb = ocfs2_get_file_osb, 278 .flags = 0, 279 }; 280 281 static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = { 282 .set_lvb = ocfs2_set_qinfo_lvb, 283 .get_osb = ocfs2_get_qinfo_osb, 284 .flags = LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB, 285 }; 286 287 static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = { 288 .check_downconvert = ocfs2_check_refcount_downconvert, 289 .downconvert_worker = ocfs2_refcount_convert_worker, 290 .flags = 0, 291 }; 292 293 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 294 { 295 return lockres->l_type == OCFS2_LOCK_TYPE_META || 296 lockres->l_type == OCFS2_LOCK_TYPE_RW || 297 lockres->l_type == OCFS2_LOCK_TYPE_OPEN; 298 } 299 300 static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb) 301 { 302 return container_of(lksb, struct ocfs2_lock_res, l_lksb); 303 } 304 305 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) 306 { 307 BUG_ON(!ocfs2_is_inode_lock(lockres)); 308 309 return (struct inode *) lockres->l_priv; 310 } 311 312 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres) 313 { 314 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY); 315 316 return (struct ocfs2_dentry_lock *)lockres->l_priv; 317 } 318 319 static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres) 320 { 321 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO); 322 323 return (struct ocfs2_mem_dqinfo *)lockres->l_priv; 324 } 325 326 static inline struct ocfs2_refcount_tree * 327 ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res) 328 { 329 return container_of(res, struct ocfs2_refcount_tree, rf_lockres); 330 } 331 332 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres) 333 { 334 if (lockres->l_ops->get_osb) 335 return lockres->l_ops->get_osb(lockres); 336 337 return (struct ocfs2_super *)lockres->l_priv; 338 } 339 340 static int ocfs2_lock_create(struct ocfs2_super *osb, 341 struct ocfs2_lock_res *lockres, 342 int level, 343 u32 dlm_flags); 344 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 345 int wanted); 346 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb, 347 struct ocfs2_lock_res *lockres, 348 int level, unsigned long caller_ip); 349 static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb, 350 struct ocfs2_lock_res *lockres, 351 int level) 352 { 353 __ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_); 354 } 355 356 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres); 357 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres); 358 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres); 359 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level); 360 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 361 struct ocfs2_lock_res *lockres); 362 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 363 int convert); 364 #define ocfs2_log_dlm_error(_func, _err, _lockres) do { \ 365 if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY) \ 366 mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \ 367 _err, _func, _lockres->l_name); \ 368 else \ 369 mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n", \ 370 _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name, \ 371 (unsigned int)ocfs2_get_dentry_lock_ino(_lockres)); \ 372 } while (0) 373 static int ocfs2_downconvert_thread(void *arg); 374 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 375 struct ocfs2_lock_res *lockres); 376 static int ocfs2_inode_lock_update(struct inode *inode, 377 struct buffer_head **bh); 378 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 379 static inline int ocfs2_highest_compat_lock_level(int level); 380 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 381 int new_level); 382 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 383 struct ocfs2_lock_res *lockres, 384 int new_level, 385 int lvb, 386 unsigned int generation); 387 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 388 struct ocfs2_lock_res *lockres); 389 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 390 struct ocfs2_lock_res *lockres); 391 392 393 static void ocfs2_build_lock_name(enum ocfs2_lock_type type, 394 u64 blkno, 395 u32 generation, 396 char *name) 397 { 398 int len; 399 400 BUG_ON(type >= OCFS2_NUM_LOCK_TYPES); 401 402 len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", 403 ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD, 404 (long long)blkno, generation); 405 406 BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1)); 407 408 mlog(0, "built lock resource with name: %s\n", name); 409 } 410 411 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock); 412 413 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res, 414 struct ocfs2_dlm_debug *dlm_debug) 415 { 416 mlog(0, "Add tracking for lockres %s\n", res->l_name); 417 418 spin_lock(&ocfs2_dlm_tracking_lock); 419 list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking); 420 spin_unlock(&ocfs2_dlm_tracking_lock); 421 } 422 423 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) 424 { 425 spin_lock(&ocfs2_dlm_tracking_lock); 426 if (!list_empty(&res->l_debug_list)) 427 list_del_init(&res->l_debug_list); 428 spin_unlock(&ocfs2_dlm_tracking_lock); 429 } 430 431 #ifdef CONFIG_OCFS2_FS_STATS 432 static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 433 { 434 res->l_lock_refresh = 0; 435 memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats)); 436 memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats)); 437 } 438 439 static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level, 440 struct ocfs2_mask_waiter *mw, int ret) 441 { 442 u32 usec; 443 ktime_t kt; 444 struct ocfs2_lock_stats *stats; 445 446 if (level == LKM_PRMODE) 447 stats = &res->l_lock_prmode; 448 else if (level == LKM_EXMODE) 449 stats = &res->l_lock_exmode; 450 else 451 return; 452 453 kt = ktime_sub(ktime_get(), mw->mw_lock_start); 454 usec = ktime_to_us(kt); 455 456 stats->ls_gets++; 457 stats->ls_total += ktime_to_ns(kt); 458 /* overflow */ 459 if (unlikely(stats->ls_gets == 0)) { 460 stats->ls_gets++; 461 stats->ls_total = ktime_to_ns(kt); 462 } 463 464 if (stats->ls_max < usec) 465 stats->ls_max = usec; 466 467 if (ret) 468 stats->ls_fail++; 469 } 470 471 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 472 { 473 lockres->l_lock_refresh++; 474 } 475 476 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 477 { 478 mw->mw_lock_start = ktime_get(); 479 } 480 #else 481 static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 482 { 483 } 484 static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, 485 int level, struct ocfs2_mask_waiter *mw, int ret) 486 { 487 } 488 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 489 { 490 } 491 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 492 { 493 } 494 #endif 495 496 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, 497 struct ocfs2_lock_res *res, 498 enum ocfs2_lock_type type, 499 struct ocfs2_lock_res_ops *ops, 500 void *priv) 501 { 502 res->l_type = type; 503 res->l_ops = ops; 504 res->l_priv = priv; 505 506 res->l_level = DLM_LOCK_IV; 507 res->l_requested = DLM_LOCK_IV; 508 res->l_blocking = DLM_LOCK_IV; 509 res->l_action = OCFS2_AST_INVALID; 510 res->l_unlock_action = OCFS2_UNLOCK_INVALID; 511 512 res->l_flags = OCFS2_LOCK_INITIALIZED; 513 514 ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug); 515 516 ocfs2_init_lock_stats(res); 517 #ifdef CONFIG_DEBUG_LOCK_ALLOC 518 if (type != OCFS2_LOCK_TYPE_OPEN) 519 lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type], 520 &lockdep_keys[type], 0); 521 else 522 res->l_lockdep_map.key = NULL; 523 #endif 524 } 525 526 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res) 527 { 528 /* This also clears out the lock status block */ 529 memset(res, 0, sizeof(struct ocfs2_lock_res)); 530 spin_lock_init(&res->l_lock); 531 init_waitqueue_head(&res->l_event); 532 INIT_LIST_HEAD(&res->l_blocked_list); 533 INIT_LIST_HEAD(&res->l_mask_waiters); 534 } 535 536 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, 537 enum ocfs2_lock_type type, 538 unsigned int generation, 539 struct inode *inode) 540 { 541 struct ocfs2_lock_res_ops *ops; 542 543 switch(type) { 544 case OCFS2_LOCK_TYPE_RW: 545 ops = &ocfs2_inode_rw_lops; 546 break; 547 case OCFS2_LOCK_TYPE_META: 548 ops = &ocfs2_inode_inode_lops; 549 break; 550 case OCFS2_LOCK_TYPE_OPEN: 551 ops = &ocfs2_inode_open_lops; 552 break; 553 default: 554 mlog_bug_on_msg(1, "type: %d\n", type); 555 ops = NULL; /* thanks, gcc */ 556 break; 557 }; 558 559 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, 560 generation, res->l_name); 561 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); 562 } 563 564 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres) 565 { 566 struct inode *inode = ocfs2_lock_res_inode(lockres); 567 568 return OCFS2_SB(inode->i_sb); 569 } 570 571 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres) 572 { 573 struct ocfs2_mem_dqinfo *info = lockres->l_priv; 574 575 return OCFS2_SB(info->dqi_gi.dqi_sb); 576 } 577 578 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres) 579 { 580 struct ocfs2_file_private *fp = lockres->l_priv; 581 582 return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb); 583 } 584 585 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) 586 { 587 __be64 inode_blkno_be; 588 589 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], 590 sizeof(__be64)); 591 592 return be64_to_cpu(inode_blkno_be); 593 } 594 595 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres) 596 { 597 struct ocfs2_dentry_lock *dl = lockres->l_priv; 598 599 return OCFS2_SB(dl->dl_inode->i_sb); 600 } 601 602 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, 603 u64 parent, struct inode *inode) 604 { 605 int len; 606 u64 inode_blkno = OCFS2_I(inode)->ip_blkno; 607 __be64 inode_blkno_be = cpu_to_be64(inode_blkno); 608 struct ocfs2_lock_res *lockres = &dl->dl_lockres; 609 610 ocfs2_lock_res_init_once(lockres); 611 612 /* 613 * Unfortunately, the standard lock naming scheme won't work 614 * here because we have two 16 byte values to use. Instead, 615 * we'll stuff the inode number as a binary value. We still 616 * want error prints to show something without garbling the 617 * display, so drop a null byte in there before the inode 618 * number. A future version of OCFS2 will likely use all 619 * binary lock names. The stringified names have been a 620 * tremendous aid in debugging, but now that the debugfs 621 * interface exists, we can mangle things there if need be. 622 * 623 * NOTE: We also drop the standard "pad" value (the total lock 624 * name size stays the same though - the last part is all 625 * zeros due to the memset in ocfs2_lock_res_init_once() 626 */ 627 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, 628 "%c%016llx", 629 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), 630 (long long)parent); 631 632 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); 633 634 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, 635 sizeof(__be64)); 636 637 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 638 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, 639 dl); 640 } 641 642 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 643 struct ocfs2_super *osb) 644 { 645 /* Superblock lockres doesn't come from a slab so we call init 646 * once on it manually. */ 647 ocfs2_lock_res_init_once(res); 648 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, 649 0, res->l_name); 650 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 651 &ocfs2_super_lops, osb); 652 } 653 654 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, 655 struct ocfs2_super *osb) 656 { 657 /* Rename lockres doesn't come from a slab so we call init 658 * once on it manually. */ 659 ocfs2_lock_res_init_once(res); 660 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); 661 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 662 &ocfs2_rename_lops, osb); 663 } 664 665 static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res, 666 struct ocfs2_super *osb) 667 { 668 /* nfs_sync lockres doesn't come from a slab so we call init 669 * once on it manually. */ 670 ocfs2_lock_res_init_once(res); 671 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name); 672 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC, 673 &ocfs2_nfs_sync_lops, osb); 674 } 675 676 static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res, 677 struct ocfs2_super *osb) 678 { 679 ocfs2_lock_res_init_once(res); 680 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name); 681 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN, 682 &ocfs2_orphan_scan_lops, osb); 683 } 684 685 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, 686 struct ocfs2_file_private *fp) 687 { 688 struct inode *inode = fp->fp_file->f_mapping->host; 689 struct ocfs2_inode_info *oi = OCFS2_I(inode); 690 691 ocfs2_lock_res_init_once(lockres); 692 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno, 693 inode->i_generation, lockres->l_name); 694 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 695 OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops, 696 fp); 697 lockres->l_flags |= OCFS2_LOCK_NOCACHE; 698 } 699 700 void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres, 701 struct ocfs2_mem_dqinfo *info) 702 { 703 ocfs2_lock_res_init_once(lockres); 704 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type, 705 0, lockres->l_name); 706 ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres, 707 OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops, 708 info); 709 } 710 711 void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres, 712 struct ocfs2_super *osb, u64 ref_blkno, 713 unsigned int generation) 714 { 715 ocfs2_lock_res_init_once(lockres); 716 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno, 717 generation, lockres->l_name); 718 ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT, 719 &ocfs2_refcount_block_lops, osb); 720 } 721 722 void ocfs2_lock_res_free(struct ocfs2_lock_res *res) 723 { 724 if (!(res->l_flags & OCFS2_LOCK_INITIALIZED)) 725 return; 726 727 ocfs2_remove_lockres_tracking(res); 728 729 mlog_bug_on_msg(!list_empty(&res->l_blocked_list), 730 "Lockres %s is on the blocked list\n", 731 res->l_name); 732 mlog_bug_on_msg(!list_empty(&res->l_mask_waiters), 733 "Lockres %s has mask waiters pending\n", 734 res->l_name); 735 mlog_bug_on_msg(spin_is_locked(&res->l_lock), 736 "Lockres %s is locked\n", 737 res->l_name); 738 mlog_bug_on_msg(res->l_ro_holders, 739 "Lockres %s has %u ro holders\n", 740 res->l_name, res->l_ro_holders); 741 mlog_bug_on_msg(res->l_ex_holders, 742 "Lockres %s has %u ex holders\n", 743 res->l_name, res->l_ex_holders); 744 745 /* Need to clear out the lock status block for the dlm */ 746 memset(&res->l_lksb, 0, sizeof(res->l_lksb)); 747 748 res->l_flags = 0UL; 749 } 750 751 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, 752 int level) 753 { 754 BUG_ON(!lockres); 755 756 switch(level) { 757 case DLM_LOCK_EX: 758 lockres->l_ex_holders++; 759 break; 760 case DLM_LOCK_PR: 761 lockres->l_ro_holders++; 762 break; 763 default: 764 BUG(); 765 } 766 } 767 768 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, 769 int level) 770 { 771 BUG_ON(!lockres); 772 773 switch(level) { 774 case DLM_LOCK_EX: 775 BUG_ON(!lockres->l_ex_holders); 776 lockres->l_ex_holders--; 777 break; 778 case DLM_LOCK_PR: 779 BUG_ON(!lockres->l_ro_holders); 780 lockres->l_ro_holders--; 781 break; 782 default: 783 BUG(); 784 } 785 } 786 787 /* WARNING: This function lives in a world where the only three lock 788 * levels are EX, PR, and NL. It *will* have to be adjusted when more 789 * lock types are added. */ 790 static inline int ocfs2_highest_compat_lock_level(int level) 791 { 792 int new_level = DLM_LOCK_EX; 793 794 if (level == DLM_LOCK_EX) 795 new_level = DLM_LOCK_NL; 796 else if (level == DLM_LOCK_PR) 797 new_level = DLM_LOCK_PR; 798 return new_level; 799 } 800 801 static void lockres_set_flags(struct ocfs2_lock_res *lockres, 802 unsigned long newflags) 803 { 804 struct ocfs2_mask_waiter *mw, *tmp; 805 806 assert_spin_locked(&lockres->l_lock); 807 808 lockres->l_flags = newflags; 809 810 list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) { 811 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 812 continue; 813 814 list_del_init(&mw->mw_item); 815 mw->mw_status = 0; 816 complete(&mw->mw_complete); 817 } 818 } 819 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or) 820 { 821 lockres_set_flags(lockres, lockres->l_flags | or); 822 } 823 static void lockres_clear_flags(struct ocfs2_lock_res *lockres, 824 unsigned long clear) 825 { 826 lockres_set_flags(lockres, lockres->l_flags & ~clear); 827 } 828 829 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres) 830 { 831 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 832 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 833 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 834 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 835 836 lockres->l_level = lockres->l_requested; 837 if (lockres->l_level <= 838 ocfs2_highest_compat_lock_level(lockres->l_blocking)) { 839 lockres->l_blocking = DLM_LOCK_NL; 840 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 841 } 842 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 843 } 844 845 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres) 846 { 847 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 848 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 849 850 /* Convert from RO to EX doesn't really need anything as our 851 * information is already up to data. Convert from NL to 852 * *anything* however should mark ourselves as needing an 853 * update */ 854 if (lockres->l_level == DLM_LOCK_NL && 855 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 856 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 857 858 lockres->l_level = lockres->l_requested; 859 860 /* 861 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing 862 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from 863 * downconverting the lock before the upconvert has fully completed. 864 * Do not prevent the dc thread from downconverting if NONBLOCK lock 865 * had already returned. 866 */ 867 if (!(lockres->l_flags & OCFS2_LOCK_NONBLOCK_FINISHED)) 868 lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 869 else 870 lockres_clear_flags(lockres, OCFS2_LOCK_NONBLOCK_FINISHED); 871 872 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 873 } 874 875 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres) 876 { 877 BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY))); 878 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 879 880 if (lockres->l_requested > DLM_LOCK_NL && 881 !(lockres->l_flags & OCFS2_LOCK_LOCAL) && 882 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 883 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 884 885 lockres->l_level = lockres->l_requested; 886 lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED); 887 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 888 } 889 890 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, 891 int level) 892 { 893 int needs_downconvert = 0; 894 895 assert_spin_locked(&lockres->l_lock); 896 897 if (level > lockres->l_blocking) { 898 /* only schedule a downconvert if we haven't already scheduled 899 * one that goes low enough to satisfy the level we're 900 * blocking. this also catches the case where we get 901 * duplicate BASTs */ 902 if (ocfs2_highest_compat_lock_level(level) < 903 ocfs2_highest_compat_lock_level(lockres->l_blocking)) 904 needs_downconvert = 1; 905 906 lockres->l_blocking = level; 907 } 908 909 mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n", 910 lockres->l_name, level, lockres->l_level, lockres->l_blocking, 911 needs_downconvert); 912 913 if (needs_downconvert) 914 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 915 mlog(0, "needs_downconvert = %d\n", needs_downconvert); 916 return needs_downconvert; 917 } 918 919 /* 920 * OCFS2_LOCK_PENDING and l_pending_gen. 921 * 922 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting 923 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock() 924 * for more details on the race. 925 * 926 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces 927 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock() 928 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear 929 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns, 930 * the caller is going to try to clear PENDING again. If nothing else is 931 * happening, __lockres_clear_pending() sees PENDING is unset and does 932 * nothing. 933 * 934 * But what if another path (eg downconvert thread) has just started a 935 * new locking action? The other path has re-set PENDING. Our path 936 * cannot clear PENDING, because that will re-open the original race 937 * window. 938 * 939 * [Example] 940 * 941 * ocfs2_meta_lock() 942 * ocfs2_cluster_lock() 943 * set BUSY 944 * set PENDING 945 * drop l_lock 946 * ocfs2_dlm_lock() 947 * ocfs2_locking_ast() ocfs2_downconvert_thread() 948 * clear PENDING ocfs2_unblock_lock() 949 * take_l_lock 950 * !BUSY 951 * ocfs2_prepare_downconvert() 952 * set BUSY 953 * set PENDING 954 * drop l_lock 955 * take l_lock 956 * clear PENDING 957 * drop l_lock 958 * <window> 959 * ocfs2_dlm_lock() 960 * 961 * So as you can see, we now have a window where l_lock is not held, 962 * PENDING is not set, and ocfs2_dlm_lock() has not been called. 963 * 964 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING 965 * set by ocfs2_prepare_downconvert(). That wasn't nice. 966 * 967 * To solve this we introduce l_pending_gen. A call to 968 * lockres_clear_pending() will only do so when it is passed a generation 969 * number that matches the lockres. lockres_set_pending() will return the 970 * current generation number. When ocfs2_cluster_lock() goes to clear 971 * PENDING, it passes the generation it got from set_pending(). In our 972 * example above, the generation numbers will *not* match. Thus, 973 * ocfs2_cluster_lock() will not clear the PENDING set by 974 * ocfs2_prepare_downconvert(). 975 */ 976 977 /* Unlocked version for ocfs2_locking_ast() */ 978 static void __lockres_clear_pending(struct ocfs2_lock_res *lockres, 979 unsigned int generation, 980 struct ocfs2_super *osb) 981 { 982 assert_spin_locked(&lockres->l_lock); 983 984 /* 985 * The ast and locking functions can race us here. The winner 986 * will clear pending, the loser will not. 987 */ 988 if (!(lockres->l_flags & OCFS2_LOCK_PENDING) || 989 (lockres->l_pending_gen != generation)) 990 return; 991 992 lockres_clear_flags(lockres, OCFS2_LOCK_PENDING); 993 lockres->l_pending_gen++; 994 995 /* 996 * The downconvert thread may have skipped us because we 997 * were PENDING. Wake it up. 998 */ 999 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 1000 ocfs2_wake_downconvert_thread(osb); 1001 } 1002 1003 /* Locked version for callers of ocfs2_dlm_lock() */ 1004 static void lockres_clear_pending(struct ocfs2_lock_res *lockres, 1005 unsigned int generation, 1006 struct ocfs2_super *osb) 1007 { 1008 unsigned long flags; 1009 1010 spin_lock_irqsave(&lockres->l_lock, flags); 1011 __lockres_clear_pending(lockres, generation, osb); 1012 spin_unlock_irqrestore(&lockres->l_lock, flags); 1013 } 1014 1015 static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres) 1016 { 1017 assert_spin_locked(&lockres->l_lock); 1018 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 1019 1020 lockres_or_flags(lockres, OCFS2_LOCK_PENDING); 1021 1022 return lockres->l_pending_gen; 1023 } 1024 1025 static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level) 1026 { 1027 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1028 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1029 int needs_downconvert; 1030 unsigned long flags; 1031 1032 BUG_ON(level <= DLM_LOCK_NL); 1033 1034 mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, " 1035 "type %s\n", lockres->l_name, level, lockres->l_level, 1036 ocfs2_lock_type_string(lockres->l_type)); 1037 1038 /* 1039 * We can skip the bast for locks which don't enable caching - 1040 * they'll be dropped at the earliest possible time anyway. 1041 */ 1042 if (lockres->l_flags & OCFS2_LOCK_NOCACHE) 1043 return; 1044 1045 spin_lock_irqsave(&lockres->l_lock, flags); 1046 needs_downconvert = ocfs2_generic_handle_bast(lockres, level); 1047 if (needs_downconvert) 1048 ocfs2_schedule_blocked_lock(osb, lockres); 1049 spin_unlock_irqrestore(&lockres->l_lock, flags); 1050 1051 wake_up(&lockres->l_event); 1052 1053 ocfs2_wake_downconvert_thread(osb); 1054 } 1055 1056 static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb) 1057 { 1058 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1059 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1060 unsigned long flags; 1061 int status; 1062 1063 spin_lock_irqsave(&lockres->l_lock, flags); 1064 1065 status = ocfs2_dlm_lock_status(&lockres->l_lksb); 1066 1067 if (status == -EAGAIN) { 1068 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1069 goto out; 1070 } 1071 1072 if (status) { 1073 mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n", 1074 lockres->l_name, status); 1075 spin_unlock_irqrestore(&lockres->l_lock, flags); 1076 return; 1077 } 1078 1079 mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, " 1080 "level %d => %d\n", lockres->l_name, lockres->l_action, 1081 lockres->l_unlock_action, lockres->l_level, lockres->l_requested); 1082 1083 switch(lockres->l_action) { 1084 case OCFS2_AST_ATTACH: 1085 ocfs2_generic_handle_attach_action(lockres); 1086 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL); 1087 break; 1088 case OCFS2_AST_CONVERT: 1089 ocfs2_generic_handle_convert_action(lockres); 1090 break; 1091 case OCFS2_AST_DOWNCONVERT: 1092 ocfs2_generic_handle_downconvert_action(lockres); 1093 break; 1094 default: 1095 mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, " 1096 "flags 0x%lx, unlock: %u\n", 1097 lockres->l_name, lockres->l_action, lockres->l_flags, 1098 lockres->l_unlock_action); 1099 BUG(); 1100 } 1101 out: 1102 /* set it to something invalid so if we get called again we 1103 * can catch it. */ 1104 lockres->l_action = OCFS2_AST_INVALID; 1105 1106 /* Did we try to cancel this lock? Clear that state */ 1107 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) 1108 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1109 1110 /* 1111 * We may have beaten the locking functions here. We certainly 1112 * know that dlm_lock() has been called :-) 1113 * Because we can't have two lock calls in flight at once, we 1114 * can use lockres->l_pending_gen. 1115 */ 1116 __lockres_clear_pending(lockres, lockres->l_pending_gen, osb); 1117 1118 wake_up(&lockres->l_event); 1119 spin_unlock_irqrestore(&lockres->l_lock, flags); 1120 } 1121 1122 static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error) 1123 { 1124 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1125 unsigned long flags; 1126 1127 mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n", 1128 lockres->l_name, lockres->l_unlock_action); 1129 1130 spin_lock_irqsave(&lockres->l_lock, flags); 1131 if (error) { 1132 mlog(ML_ERROR, "Dlm passes error %d for lock %s, " 1133 "unlock_action %d\n", error, lockres->l_name, 1134 lockres->l_unlock_action); 1135 spin_unlock_irqrestore(&lockres->l_lock, flags); 1136 return; 1137 } 1138 1139 switch(lockres->l_unlock_action) { 1140 case OCFS2_UNLOCK_CANCEL_CONVERT: 1141 mlog(0, "Cancel convert success for %s\n", lockres->l_name); 1142 lockres->l_action = OCFS2_AST_INVALID; 1143 /* Downconvert thread may have requeued this lock, we 1144 * need to wake it. */ 1145 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 1146 ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres)); 1147 break; 1148 case OCFS2_UNLOCK_DROP_LOCK: 1149 lockres->l_level = DLM_LOCK_IV; 1150 break; 1151 default: 1152 BUG(); 1153 } 1154 1155 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1156 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1157 wake_up(&lockres->l_event); 1158 spin_unlock_irqrestore(&lockres->l_lock, flags); 1159 } 1160 1161 /* 1162 * This is the filesystem locking protocol. It provides the lock handling 1163 * hooks for the underlying DLM. It has a maximum version number. 1164 * The version number allows interoperability with systems running at 1165 * the same major number and an equal or smaller minor number. 1166 * 1167 * Whenever the filesystem does new things with locks (adds or removes a 1168 * lock, orders them differently, does different things underneath a lock), 1169 * the version must be changed. The protocol is negotiated when joining 1170 * the dlm domain. A node may join the domain if its major version is 1171 * identical to all other nodes and its minor version is greater than 1172 * or equal to all other nodes. When its minor version is greater than 1173 * the other nodes, it will run at the minor version specified by the 1174 * other nodes. 1175 * 1176 * If a locking change is made that will not be compatible with older 1177 * versions, the major number must be increased and the minor version set 1178 * to zero. If a change merely adds a behavior that can be disabled when 1179 * speaking to older versions, the minor version must be increased. If a 1180 * change adds a fully backwards compatible change (eg, LVB changes that 1181 * are just ignored by older versions), the version does not need to be 1182 * updated. 1183 */ 1184 static struct ocfs2_locking_protocol lproto = { 1185 .lp_max_version = { 1186 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, 1187 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, 1188 }, 1189 .lp_lock_ast = ocfs2_locking_ast, 1190 .lp_blocking_ast = ocfs2_blocking_ast, 1191 .lp_unlock_ast = ocfs2_unlock_ast, 1192 }; 1193 1194 void ocfs2_set_locking_protocol(void) 1195 { 1196 ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version); 1197 } 1198 1199 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 1200 int convert) 1201 { 1202 unsigned long flags; 1203 1204 spin_lock_irqsave(&lockres->l_lock, flags); 1205 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1206 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 1207 if (convert) 1208 lockres->l_action = OCFS2_AST_INVALID; 1209 else 1210 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1211 spin_unlock_irqrestore(&lockres->l_lock, flags); 1212 1213 wake_up(&lockres->l_event); 1214 } 1215 1216 /* Note: If we detect another process working on the lock (i.e., 1217 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller 1218 * to do the right thing in that case. 1219 */ 1220 static int ocfs2_lock_create(struct ocfs2_super *osb, 1221 struct ocfs2_lock_res *lockres, 1222 int level, 1223 u32 dlm_flags) 1224 { 1225 int ret = 0; 1226 unsigned long flags; 1227 unsigned int gen; 1228 1229 mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level, 1230 dlm_flags); 1231 1232 spin_lock_irqsave(&lockres->l_lock, flags); 1233 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 1234 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 1235 spin_unlock_irqrestore(&lockres->l_lock, flags); 1236 goto bail; 1237 } 1238 1239 lockres->l_action = OCFS2_AST_ATTACH; 1240 lockres->l_requested = level; 1241 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1242 gen = lockres_set_pending(lockres); 1243 spin_unlock_irqrestore(&lockres->l_lock, flags); 1244 1245 ret = ocfs2_dlm_lock(osb->cconn, 1246 level, 1247 &lockres->l_lksb, 1248 dlm_flags, 1249 lockres->l_name, 1250 OCFS2_LOCK_ID_MAX_LEN - 1); 1251 lockres_clear_pending(lockres, gen, osb); 1252 if (ret) { 1253 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1254 ocfs2_recover_from_dlm_error(lockres, 1); 1255 } 1256 1257 mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name); 1258 1259 bail: 1260 return ret; 1261 } 1262 1263 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, 1264 int flag) 1265 { 1266 unsigned long flags; 1267 int ret; 1268 1269 spin_lock_irqsave(&lockres->l_lock, flags); 1270 ret = lockres->l_flags & flag; 1271 spin_unlock_irqrestore(&lockres->l_lock, flags); 1272 1273 return ret; 1274 } 1275 1276 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 1277 1278 { 1279 wait_event(lockres->l_event, 1280 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 1281 } 1282 1283 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 1284 1285 { 1286 wait_event(lockres->l_event, 1287 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 1288 } 1289 1290 /* predict what lock level we'll be dropping down to on behalf 1291 * of another node, and return true if the currently wanted 1292 * level will be compatible with it. */ 1293 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 1294 int wanted) 1295 { 1296 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 1297 1298 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking); 1299 } 1300 1301 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw) 1302 { 1303 INIT_LIST_HEAD(&mw->mw_item); 1304 init_completion(&mw->mw_complete); 1305 ocfs2_init_start_time(mw); 1306 } 1307 1308 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) 1309 { 1310 wait_for_completion(&mw->mw_complete); 1311 /* Re-arm the completion in case we want to wait on it again */ 1312 reinit_completion(&mw->mw_complete); 1313 return mw->mw_status; 1314 } 1315 1316 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres, 1317 struct ocfs2_mask_waiter *mw, 1318 unsigned long mask, 1319 unsigned long goal) 1320 { 1321 BUG_ON(!list_empty(&mw->mw_item)); 1322 1323 assert_spin_locked(&lockres->l_lock); 1324 1325 list_add_tail(&mw->mw_item, &lockres->l_mask_waiters); 1326 mw->mw_mask = mask; 1327 mw->mw_goal = goal; 1328 } 1329 1330 /* returns 0 if the mw that was removed was already satisfied, -EBUSY 1331 * if the mask still hadn't reached its goal */ 1332 static int __lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1333 struct ocfs2_mask_waiter *mw) 1334 { 1335 int ret = 0; 1336 1337 assert_spin_locked(&lockres->l_lock); 1338 if (!list_empty(&mw->mw_item)) { 1339 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 1340 ret = -EBUSY; 1341 1342 list_del_init(&mw->mw_item); 1343 init_completion(&mw->mw_complete); 1344 } 1345 1346 return ret; 1347 } 1348 1349 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1350 struct ocfs2_mask_waiter *mw) 1351 { 1352 unsigned long flags; 1353 int ret = 0; 1354 1355 spin_lock_irqsave(&lockres->l_lock, flags); 1356 ret = __lockres_remove_mask_waiter(lockres, mw); 1357 spin_unlock_irqrestore(&lockres->l_lock, flags); 1358 1359 return ret; 1360 1361 } 1362 1363 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw, 1364 struct ocfs2_lock_res *lockres) 1365 { 1366 int ret; 1367 1368 ret = wait_for_completion_interruptible(&mw->mw_complete); 1369 if (ret) 1370 lockres_remove_mask_waiter(lockres, mw); 1371 else 1372 ret = mw->mw_status; 1373 /* Re-arm the completion in case we want to wait on it again */ 1374 reinit_completion(&mw->mw_complete); 1375 return ret; 1376 } 1377 1378 static int __ocfs2_cluster_lock(struct ocfs2_super *osb, 1379 struct ocfs2_lock_res *lockres, 1380 int level, 1381 u32 lkm_flags, 1382 int arg_flags, 1383 int l_subclass, 1384 unsigned long caller_ip) 1385 { 1386 struct ocfs2_mask_waiter mw; 1387 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); 1388 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 1389 unsigned long flags; 1390 unsigned int gen; 1391 int noqueue_attempted = 0; 1392 int dlm_locked = 0; 1393 int kick_dc = 0; 1394 1395 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) { 1396 mlog_errno(-EINVAL); 1397 return -EINVAL; 1398 } 1399 1400 ocfs2_init_mask_waiter(&mw); 1401 1402 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 1403 lkm_flags |= DLM_LKF_VALBLK; 1404 1405 again: 1406 wait = 0; 1407 1408 spin_lock_irqsave(&lockres->l_lock, flags); 1409 1410 if (catch_signals && signal_pending(current)) { 1411 ret = -ERESTARTSYS; 1412 goto unlock; 1413 } 1414 1415 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, 1416 "Cluster lock called on freeing lockres %s! flags " 1417 "0x%lx\n", lockres->l_name, lockres->l_flags); 1418 1419 /* We only compare against the currently granted level 1420 * here. If the lock is blocked waiting on a downconvert, 1421 * we'll get caught below. */ 1422 if (lockres->l_flags & OCFS2_LOCK_BUSY && 1423 level > lockres->l_level) { 1424 /* is someone sitting in dlm_lock? If so, wait on 1425 * them. */ 1426 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1427 wait = 1; 1428 goto unlock; 1429 } 1430 1431 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) { 1432 /* 1433 * We've upconverted. If the lock now has a level we can 1434 * work with, we take it. If, however, the lock is not at the 1435 * required level, we go thru the full cycle. One way this could 1436 * happen is if a process requesting an upconvert to PR is 1437 * closely followed by another requesting upconvert to an EX. 1438 * If the process requesting EX lands here, we want it to 1439 * continue attempting to upconvert and let the process 1440 * requesting PR take the lock. 1441 * If multiple processes request upconvert to PR, the first one 1442 * here will take the lock. The others will have to go thru the 1443 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending 1444 * downconvert request. 1445 */ 1446 if (level <= lockres->l_level) 1447 goto update_holders; 1448 } 1449 1450 if (lockres->l_flags & OCFS2_LOCK_BLOCKED && 1451 !ocfs2_may_continue_on_blocked_lock(lockres, level)) { 1452 /* is the lock is currently blocked on behalf of 1453 * another node */ 1454 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0); 1455 wait = 1; 1456 goto unlock; 1457 } 1458 1459 if (level > lockres->l_level) { 1460 if (noqueue_attempted > 0) { 1461 ret = -EAGAIN; 1462 goto unlock; 1463 } 1464 if (lkm_flags & DLM_LKF_NOQUEUE) 1465 noqueue_attempted = 1; 1466 1467 if (lockres->l_action != OCFS2_AST_INVALID) 1468 mlog(ML_ERROR, "lockres %s has action %u pending\n", 1469 lockres->l_name, lockres->l_action); 1470 1471 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1472 lockres->l_action = OCFS2_AST_ATTACH; 1473 lkm_flags &= ~DLM_LKF_CONVERT; 1474 } else { 1475 lockres->l_action = OCFS2_AST_CONVERT; 1476 lkm_flags |= DLM_LKF_CONVERT; 1477 } 1478 1479 lockres->l_requested = level; 1480 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1481 gen = lockres_set_pending(lockres); 1482 spin_unlock_irqrestore(&lockres->l_lock, flags); 1483 1484 BUG_ON(level == DLM_LOCK_IV); 1485 BUG_ON(level == DLM_LOCK_NL); 1486 1487 mlog(ML_BASTS, "lockres %s, convert from %d to %d\n", 1488 lockres->l_name, lockres->l_level, level); 1489 1490 /* call dlm_lock to upgrade lock now */ 1491 ret = ocfs2_dlm_lock(osb->cconn, 1492 level, 1493 &lockres->l_lksb, 1494 lkm_flags, 1495 lockres->l_name, 1496 OCFS2_LOCK_ID_MAX_LEN - 1); 1497 lockres_clear_pending(lockres, gen, osb); 1498 if (ret) { 1499 if (!(lkm_flags & DLM_LKF_NOQUEUE) || 1500 (ret != -EAGAIN)) { 1501 ocfs2_log_dlm_error("ocfs2_dlm_lock", 1502 ret, lockres); 1503 } 1504 ocfs2_recover_from_dlm_error(lockres, 1); 1505 goto out; 1506 } 1507 dlm_locked = 1; 1508 1509 mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n", 1510 lockres->l_name); 1511 1512 /* At this point we've gone inside the dlm and need to 1513 * complete our work regardless. */ 1514 catch_signals = 0; 1515 1516 /* wait for busy to clear and carry on */ 1517 goto again; 1518 } 1519 1520 update_holders: 1521 /* Ok, if we get here then we're good to go. */ 1522 ocfs2_inc_holders(lockres, level); 1523 1524 ret = 0; 1525 unlock: 1526 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 1527 1528 /* ocfs2_unblock_lock reques on seeing OCFS2_LOCK_UPCONVERT_FINISHING */ 1529 kick_dc = (lockres->l_flags & OCFS2_LOCK_BLOCKED); 1530 1531 spin_unlock_irqrestore(&lockres->l_lock, flags); 1532 if (kick_dc) 1533 ocfs2_wake_downconvert_thread(osb); 1534 out: 1535 /* 1536 * This is helping work around a lock inversion between the page lock 1537 * and dlm locks. One path holds the page lock while calling aops 1538 * which block acquiring dlm locks. The voting thread holds dlm 1539 * locks while acquiring page locks while down converting data locks. 1540 * This block is helping an aop path notice the inversion and back 1541 * off to unlock its page lock before trying the dlm lock again. 1542 */ 1543 if (wait && arg_flags & OCFS2_LOCK_NONBLOCK && 1544 mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) { 1545 wait = 0; 1546 spin_lock_irqsave(&lockres->l_lock, flags); 1547 if (__lockres_remove_mask_waiter(lockres, &mw)) { 1548 if (dlm_locked) 1549 lockres_or_flags(lockres, 1550 OCFS2_LOCK_NONBLOCK_FINISHED); 1551 spin_unlock_irqrestore(&lockres->l_lock, flags); 1552 ret = -EAGAIN; 1553 } else { 1554 spin_unlock_irqrestore(&lockres->l_lock, flags); 1555 goto again; 1556 } 1557 } 1558 if (wait) { 1559 ret = ocfs2_wait_for_mask(&mw); 1560 if (ret == 0) 1561 goto again; 1562 mlog_errno(ret); 1563 } 1564 ocfs2_update_lock_stats(lockres, level, &mw, ret); 1565 1566 #ifdef CONFIG_DEBUG_LOCK_ALLOC 1567 if (!ret && lockres->l_lockdep_map.key != NULL) { 1568 if (level == DLM_LOCK_PR) 1569 rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass, 1570 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE), 1571 caller_ip); 1572 else 1573 rwsem_acquire(&lockres->l_lockdep_map, l_subclass, 1574 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE), 1575 caller_ip); 1576 } 1577 #endif 1578 return ret; 1579 } 1580 1581 static inline int ocfs2_cluster_lock(struct ocfs2_super *osb, 1582 struct ocfs2_lock_res *lockres, 1583 int level, 1584 u32 lkm_flags, 1585 int arg_flags) 1586 { 1587 return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags, 1588 0, _RET_IP_); 1589 } 1590 1591 1592 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb, 1593 struct ocfs2_lock_res *lockres, 1594 int level, 1595 unsigned long caller_ip) 1596 { 1597 unsigned long flags; 1598 1599 spin_lock_irqsave(&lockres->l_lock, flags); 1600 ocfs2_dec_holders(lockres, level); 1601 ocfs2_downconvert_on_unlock(osb, lockres); 1602 spin_unlock_irqrestore(&lockres->l_lock, flags); 1603 #ifdef CONFIG_DEBUG_LOCK_ALLOC 1604 if (lockres->l_lockdep_map.key != NULL) 1605 rwsem_release(&lockres->l_lockdep_map, 1, caller_ip); 1606 #endif 1607 } 1608 1609 static int ocfs2_create_new_lock(struct ocfs2_super *osb, 1610 struct ocfs2_lock_res *lockres, 1611 int ex, 1612 int local) 1613 { 1614 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1615 unsigned long flags; 1616 u32 lkm_flags = local ? DLM_LKF_LOCAL : 0; 1617 1618 spin_lock_irqsave(&lockres->l_lock, flags); 1619 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 1620 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL); 1621 spin_unlock_irqrestore(&lockres->l_lock, flags); 1622 1623 return ocfs2_lock_create(osb, lockres, level, lkm_flags); 1624 } 1625 1626 /* Grants us an EX lock on the data and metadata resources, skipping 1627 * the normal cluster directory lookup. Use this ONLY on newly created 1628 * inodes which other nodes can't possibly see, and which haven't been 1629 * hashed in the inode hash yet. This can give us a good performance 1630 * increase as it'll skip the network broadcast normally associated 1631 * with creating a new lock resource. */ 1632 int ocfs2_create_new_inode_locks(struct inode *inode) 1633 { 1634 int ret; 1635 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1636 1637 BUG_ON(!inode); 1638 BUG_ON(!ocfs2_inode_is_new(inode)); 1639 1640 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); 1641 1642 /* NOTE: That we don't increment any of the holder counts, nor 1643 * do we add anything to a journal handle. Since this is 1644 * supposed to be a new inode which the cluster doesn't know 1645 * about yet, there is no need to. As far as the LVB handling 1646 * is concerned, this is basically like acquiring an EX lock 1647 * on a resource which has an invalid one -- we'll set it 1648 * valid when we release the EX. */ 1649 1650 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1); 1651 if (ret) { 1652 mlog_errno(ret); 1653 goto bail; 1654 } 1655 1656 /* 1657 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they 1658 * don't use a generation in their lock names. 1659 */ 1660 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); 1661 if (ret) { 1662 mlog_errno(ret); 1663 goto bail; 1664 } 1665 1666 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0); 1667 if (ret) { 1668 mlog_errno(ret); 1669 goto bail; 1670 } 1671 1672 bail: 1673 return ret; 1674 } 1675 1676 int ocfs2_rw_lock(struct inode *inode, int write) 1677 { 1678 int status, level; 1679 struct ocfs2_lock_res *lockres; 1680 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1681 1682 BUG_ON(!inode); 1683 1684 mlog(0, "inode %llu take %s RW lock\n", 1685 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1686 write ? "EXMODE" : "PRMODE"); 1687 1688 if (ocfs2_mount_local(osb)) 1689 return 0; 1690 1691 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1692 1693 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1694 1695 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0, 1696 0); 1697 if (status < 0) 1698 mlog_errno(status); 1699 1700 return status; 1701 } 1702 1703 void ocfs2_rw_unlock(struct inode *inode, int write) 1704 { 1705 int level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1706 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1707 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1708 1709 mlog(0, "inode %llu drop %s RW lock\n", 1710 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1711 write ? "EXMODE" : "PRMODE"); 1712 1713 if (!ocfs2_mount_local(osb)) 1714 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 1715 } 1716 1717 /* 1718 * ocfs2_open_lock always get PR mode lock. 1719 */ 1720 int ocfs2_open_lock(struct inode *inode) 1721 { 1722 int status = 0; 1723 struct ocfs2_lock_res *lockres; 1724 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1725 1726 BUG_ON(!inode); 1727 1728 mlog(0, "inode %llu take PRMODE open lock\n", 1729 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1730 1731 if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb)) 1732 goto out; 1733 1734 lockres = &OCFS2_I(inode)->ip_open_lockres; 1735 1736 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1737 DLM_LOCK_PR, 0, 0); 1738 if (status < 0) 1739 mlog_errno(status); 1740 1741 out: 1742 return status; 1743 } 1744 1745 int ocfs2_try_open_lock(struct inode *inode, int write) 1746 { 1747 int status = 0, level; 1748 struct ocfs2_lock_res *lockres; 1749 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1750 1751 BUG_ON(!inode); 1752 1753 mlog(0, "inode %llu try to take %s open lock\n", 1754 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1755 write ? "EXMODE" : "PRMODE"); 1756 1757 if (ocfs2_is_hard_readonly(osb)) { 1758 if (write) 1759 status = -EROFS; 1760 goto out; 1761 } 1762 1763 if (ocfs2_mount_local(osb)) 1764 goto out; 1765 1766 lockres = &OCFS2_I(inode)->ip_open_lockres; 1767 1768 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1769 1770 /* 1771 * The file system may already holding a PRMODE/EXMODE open lock. 1772 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on 1773 * other nodes and the -EAGAIN will indicate to the caller that 1774 * this inode is still in use. 1775 */ 1776 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1777 level, DLM_LKF_NOQUEUE, 0); 1778 1779 out: 1780 return status; 1781 } 1782 1783 /* 1784 * ocfs2_open_unlock unlock PR and EX mode open locks. 1785 */ 1786 void ocfs2_open_unlock(struct inode *inode) 1787 { 1788 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres; 1789 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1790 1791 mlog(0, "inode %llu drop open lock\n", 1792 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1793 1794 if (ocfs2_mount_local(osb)) 1795 goto out; 1796 1797 if(lockres->l_ro_holders) 1798 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1799 DLM_LOCK_PR); 1800 if(lockres->l_ex_holders) 1801 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1802 DLM_LOCK_EX); 1803 1804 out: 1805 return; 1806 } 1807 1808 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres, 1809 int level) 1810 { 1811 int ret; 1812 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1813 unsigned long flags; 1814 struct ocfs2_mask_waiter mw; 1815 1816 ocfs2_init_mask_waiter(&mw); 1817 1818 retry_cancel: 1819 spin_lock_irqsave(&lockres->l_lock, flags); 1820 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 1821 ret = ocfs2_prepare_cancel_convert(osb, lockres); 1822 if (ret) { 1823 spin_unlock_irqrestore(&lockres->l_lock, flags); 1824 ret = ocfs2_cancel_convert(osb, lockres); 1825 if (ret < 0) { 1826 mlog_errno(ret); 1827 goto out; 1828 } 1829 goto retry_cancel; 1830 } 1831 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1832 spin_unlock_irqrestore(&lockres->l_lock, flags); 1833 1834 ocfs2_wait_for_mask(&mw); 1835 goto retry_cancel; 1836 } 1837 1838 ret = -ERESTARTSYS; 1839 /* 1840 * We may still have gotten the lock, in which case there's no 1841 * point to restarting the syscall. 1842 */ 1843 if (lockres->l_level == level) 1844 ret = 0; 1845 1846 mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret, 1847 lockres->l_flags, lockres->l_level, lockres->l_action); 1848 1849 spin_unlock_irqrestore(&lockres->l_lock, flags); 1850 1851 out: 1852 return ret; 1853 } 1854 1855 /* 1856 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of 1857 * flock() calls. The locking approach this requires is sufficiently 1858 * different from all other cluster lock types that we implement a 1859 * separate path to the "low-level" dlm calls. In particular: 1860 * 1861 * - No optimization of lock levels is done - we take at exactly 1862 * what's been requested. 1863 * 1864 * - No lock caching is employed. We immediately downconvert to 1865 * no-lock at unlock time. This also means flock locks never go on 1866 * the blocking list). 1867 * 1868 * - Since userspace can trivially deadlock itself with flock, we make 1869 * sure to allow cancellation of a misbehaving applications flock() 1870 * request. 1871 * 1872 * - Access to any flock lockres doesn't require concurrency, so we 1873 * can simplify the code by requiring the caller to guarantee 1874 * serialization of dlmglue flock calls. 1875 */ 1876 int ocfs2_file_lock(struct file *file, int ex, int trylock) 1877 { 1878 int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1879 unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0; 1880 unsigned long flags; 1881 struct ocfs2_file_private *fp = file->private_data; 1882 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1883 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1884 struct ocfs2_mask_waiter mw; 1885 1886 ocfs2_init_mask_waiter(&mw); 1887 1888 if ((lockres->l_flags & OCFS2_LOCK_BUSY) || 1889 (lockres->l_level > DLM_LOCK_NL)) { 1890 mlog(ML_ERROR, 1891 "File lock \"%s\" has busy or locked state: flags: 0x%lx, " 1892 "level: %u\n", lockres->l_name, lockres->l_flags, 1893 lockres->l_level); 1894 return -EINVAL; 1895 } 1896 1897 spin_lock_irqsave(&lockres->l_lock, flags); 1898 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1899 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1900 spin_unlock_irqrestore(&lockres->l_lock, flags); 1901 1902 /* 1903 * Get the lock at NLMODE to start - that way we 1904 * can cancel the upconvert request if need be. 1905 */ 1906 ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0); 1907 if (ret < 0) { 1908 mlog_errno(ret); 1909 goto out; 1910 } 1911 1912 ret = ocfs2_wait_for_mask(&mw); 1913 if (ret) { 1914 mlog_errno(ret); 1915 goto out; 1916 } 1917 spin_lock_irqsave(&lockres->l_lock, flags); 1918 } 1919 1920 lockres->l_action = OCFS2_AST_CONVERT; 1921 lkm_flags |= DLM_LKF_CONVERT; 1922 lockres->l_requested = level; 1923 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1924 1925 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1926 spin_unlock_irqrestore(&lockres->l_lock, flags); 1927 1928 ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags, 1929 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1); 1930 if (ret) { 1931 if (!trylock || (ret != -EAGAIN)) { 1932 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1933 ret = -EINVAL; 1934 } 1935 1936 ocfs2_recover_from_dlm_error(lockres, 1); 1937 lockres_remove_mask_waiter(lockres, &mw); 1938 goto out; 1939 } 1940 1941 ret = ocfs2_wait_for_mask_interruptible(&mw, lockres); 1942 if (ret == -ERESTARTSYS) { 1943 /* 1944 * Userspace can cause deadlock itself with 1945 * flock(). Current behavior locally is to allow the 1946 * deadlock, but abort the system call if a signal is 1947 * received. We follow this example, otherwise a 1948 * poorly written program could sit in kernel until 1949 * reboot. 1950 * 1951 * Handling this is a bit more complicated for Ocfs2 1952 * though. We can't exit this function with an 1953 * outstanding lock request, so a cancel convert is 1954 * required. We intentionally overwrite 'ret' - if the 1955 * cancel fails and the lock was granted, it's easier 1956 * to just bubble success back up to the user. 1957 */ 1958 ret = ocfs2_flock_handle_signal(lockres, level); 1959 } else if (!ret && (level > lockres->l_level)) { 1960 /* Trylock failed asynchronously */ 1961 BUG_ON(!trylock); 1962 ret = -EAGAIN; 1963 } 1964 1965 out: 1966 1967 mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n", 1968 lockres->l_name, ex, trylock, ret); 1969 return ret; 1970 } 1971 1972 void ocfs2_file_unlock(struct file *file) 1973 { 1974 int ret; 1975 unsigned int gen; 1976 unsigned long flags; 1977 struct ocfs2_file_private *fp = file->private_data; 1978 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1979 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1980 struct ocfs2_mask_waiter mw; 1981 1982 ocfs2_init_mask_waiter(&mw); 1983 1984 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) 1985 return; 1986 1987 if (lockres->l_level == DLM_LOCK_NL) 1988 return; 1989 1990 mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n", 1991 lockres->l_name, lockres->l_flags, lockres->l_level, 1992 lockres->l_action); 1993 1994 spin_lock_irqsave(&lockres->l_lock, flags); 1995 /* 1996 * Fake a blocking ast for the downconvert code. 1997 */ 1998 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 1999 lockres->l_blocking = DLM_LOCK_EX; 2000 2001 gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL); 2002 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 2003 spin_unlock_irqrestore(&lockres->l_lock, flags); 2004 2005 ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen); 2006 if (ret) { 2007 mlog_errno(ret); 2008 return; 2009 } 2010 2011 ret = ocfs2_wait_for_mask(&mw); 2012 if (ret) 2013 mlog_errno(ret); 2014 } 2015 2016 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 2017 struct ocfs2_lock_res *lockres) 2018 { 2019 int kick = 0; 2020 2021 /* If we know that another node is waiting on our lock, kick 2022 * the downconvert thread * pre-emptively when we reach a release 2023 * condition. */ 2024 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 2025 switch(lockres->l_blocking) { 2026 case DLM_LOCK_EX: 2027 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 2028 kick = 1; 2029 break; 2030 case DLM_LOCK_PR: 2031 if (!lockres->l_ex_holders) 2032 kick = 1; 2033 break; 2034 default: 2035 BUG(); 2036 } 2037 } 2038 2039 if (kick) 2040 ocfs2_wake_downconvert_thread(osb); 2041 } 2042 2043 #define OCFS2_SEC_BITS 34 2044 #define OCFS2_SEC_SHIFT (64 - 34) 2045 #define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1) 2046 2047 /* LVB only has room for 64 bits of time here so we pack it for 2048 * now. */ 2049 static u64 ocfs2_pack_timespec(struct timespec *spec) 2050 { 2051 u64 res; 2052 u64 sec = spec->tv_sec; 2053 u32 nsec = spec->tv_nsec; 2054 2055 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK); 2056 2057 return res; 2058 } 2059 2060 /* Call this with the lockres locked. I am reasonably sure we don't 2061 * need ip_lock in this function as anyone who would be changing those 2062 * values is supposed to be blocked in ocfs2_inode_lock right now. */ 2063 static void __ocfs2_stuff_meta_lvb(struct inode *inode) 2064 { 2065 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2066 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2067 struct ocfs2_meta_lvb *lvb; 2068 2069 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2070 2071 /* 2072 * Invalidate the LVB of a deleted inode - this way other 2073 * nodes are forced to go to disk and discover the new inode 2074 * status. 2075 */ 2076 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2077 lvb->lvb_version = 0; 2078 goto out; 2079 } 2080 2081 lvb->lvb_version = OCFS2_LVB_VERSION; 2082 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 2083 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 2084 lvb->lvb_iuid = cpu_to_be32(i_uid_read(inode)); 2085 lvb->lvb_igid = cpu_to_be32(i_gid_read(inode)); 2086 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 2087 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 2088 lvb->lvb_iatime_packed = 2089 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime)); 2090 lvb->lvb_ictime_packed = 2091 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime)); 2092 lvb->lvb_imtime_packed = 2093 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime)); 2094 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 2095 lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features); 2096 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 2097 2098 out: 2099 mlog_meta_lvb(0, lockres); 2100 } 2101 2102 static void ocfs2_unpack_timespec(struct timespec *spec, 2103 u64 packed_time) 2104 { 2105 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 2106 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 2107 } 2108 2109 static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 2110 { 2111 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2112 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2113 struct ocfs2_meta_lvb *lvb; 2114 2115 mlog_meta_lvb(0, lockres); 2116 2117 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2118 2119 /* We're safe here without the lockres lock... */ 2120 spin_lock(&oi->ip_lock); 2121 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 2122 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 2123 2124 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 2125 oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures); 2126 ocfs2_set_inode_flags(inode); 2127 2128 /* fast-symlinks are a special case */ 2129 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 2130 inode->i_blocks = 0; 2131 else 2132 inode->i_blocks = ocfs2_inode_sector_count(inode); 2133 2134 i_uid_write(inode, be32_to_cpu(lvb->lvb_iuid)); 2135 i_gid_write(inode, be32_to_cpu(lvb->lvb_igid)); 2136 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 2137 set_nlink(inode, be16_to_cpu(lvb->lvb_inlink)); 2138 ocfs2_unpack_timespec(&inode->i_atime, 2139 be64_to_cpu(lvb->lvb_iatime_packed)); 2140 ocfs2_unpack_timespec(&inode->i_mtime, 2141 be64_to_cpu(lvb->lvb_imtime_packed)); 2142 ocfs2_unpack_timespec(&inode->i_ctime, 2143 be64_to_cpu(lvb->lvb_ictime_packed)); 2144 spin_unlock(&oi->ip_lock); 2145 } 2146 2147 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 2148 struct ocfs2_lock_res *lockres) 2149 { 2150 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2151 2152 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) 2153 && lvb->lvb_version == OCFS2_LVB_VERSION 2154 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 2155 return 1; 2156 return 0; 2157 } 2158 2159 /* Determine whether a lock resource needs to be refreshed, and 2160 * arbitrate who gets to refresh it. 2161 * 2162 * 0 means no refresh needed. 2163 * 2164 * > 0 means you need to refresh this and you MUST call 2165 * ocfs2_complete_lock_res_refresh afterwards. */ 2166 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres) 2167 { 2168 unsigned long flags; 2169 int status = 0; 2170 2171 refresh_check: 2172 spin_lock_irqsave(&lockres->l_lock, flags); 2173 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 2174 spin_unlock_irqrestore(&lockres->l_lock, flags); 2175 goto bail; 2176 } 2177 2178 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 2179 spin_unlock_irqrestore(&lockres->l_lock, flags); 2180 2181 ocfs2_wait_on_refreshing_lock(lockres); 2182 goto refresh_check; 2183 } 2184 2185 /* Ok, I'll be the one to refresh this lock. */ 2186 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING); 2187 spin_unlock_irqrestore(&lockres->l_lock, flags); 2188 2189 status = 1; 2190 bail: 2191 mlog(0, "status %d\n", status); 2192 return status; 2193 } 2194 2195 /* If status is non zero, I'll mark it as not being in refresh 2196 * anymroe, but i won't clear the needs refresh flag. */ 2197 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres, 2198 int status) 2199 { 2200 unsigned long flags; 2201 2202 spin_lock_irqsave(&lockres->l_lock, flags); 2203 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING); 2204 if (!status) 2205 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 2206 spin_unlock_irqrestore(&lockres->l_lock, flags); 2207 2208 wake_up(&lockres->l_event); 2209 } 2210 2211 /* may or may not return a bh if it went to disk. */ 2212 static int ocfs2_inode_lock_update(struct inode *inode, 2213 struct buffer_head **bh) 2214 { 2215 int status = 0; 2216 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2217 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2218 struct ocfs2_dinode *fe; 2219 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2220 2221 if (ocfs2_mount_local(osb)) 2222 goto bail; 2223 2224 spin_lock(&oi->ip_lock); 2225 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2226 mlog(0, "Orphaned inode %llu was deleted while we " 2227 "were waiting on a lock. ip_flags = 0x%x\n", 2228 (unsigned long long)oi->ip_blkno, oi->ip_flags); 2229 spin_unlock(&oi->ip_lock); 2230 status = -ENOENT; 2231 goto bail; 2232 } 2233 spin_unlock(&oi->ip_lock); 2234 2235 if (!ocfs2_should_refresh_lock_res(lockres)) 2236 goto bail; 2237 2238 /* This will discard any caching information we might have had 2239 * for the inode metadata. */ 2240 ocfs2_metadata_cache_purge(INODE_CACHE(inode)); 2241 2242 ocfs2_extent_map_trunc(inode, 0); 2243 2244 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) { 2245 mlog(0, "Trusting LVB on inode %llu\n", 2246 (unsigned long long)oi->ip_blkno); 2247 ocfs2_refresh_inode_from_lvb(inode); 2248 } else { 2249 /* Boo, we have to go to disk. */ 2250 /* read bh, cast, ocfs2_refresh_inode */ 2251 status = ocfs2_read_inode_block(inode, bh); 2252 if (status < 0) { 2253 mlog_errno(status); 2254 goto bail_refresh; 2255 } 2256 fe = (struct ocfs2_dinode *) (*bh)->b_data; 2257 2258 /* This is a good chance to make sure we're not 2259 * locking an invalid object. ocfs2_read_inode_block() 2260 * already checked that the inode block is sane. 2261 * 2262 * We bug on a stale inode here because we checked 2263 * above whether it was wiped from disk. The wiping 2264 * node provides a guarantee that we receive that 2265 * message and can mark the inode before dropping any 2266 * locks associated with it. */ 2267 mlog_bug_on_msg(inode->i_generation != 2268 le32_to_cpu(fe->i_generation), 2269 "Invalid dinode %llu disk generation: %u " 2270 "inode->i_generation: %u\n", 2271 (unsigned long long)oi->ip_blkno, 2272 le32_to_cpu(fe->i_generation), 2273 inode->i_generation); 2274 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 2275 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 2276 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 2277 (unsigned long long)oi->ip_blkno, 2278 (unsigned long long)le64_to_cpu(fe->i_dtime), 2279 le32_to_cpu(fe->i_flags)); 2280 2281 ocfs2_refresh_inode(inode, fe); 2282 ocfs2_track_lock_refresh(lockres); 2283 } 2284 2285 status = 0; 2286 bail_refresh: 2287 ocfs2_complete_lock_res_refresh(lockres, status); 2288 bail: 2289 return status; 2290 } 2291 2292 static int ocfs2_assign_bh(struct inode *inode, 2293 struct buffer_head **ret_bh, 2294 struct buffer_head *passed_bh) 2295 { 2296 int status; 2297 2298 if (passed_bh) { 2299 /* Ok, the update went to disk for us, use the 2300 * returned bh. */ 2301 *ret_bh = passed_bh; 2302 get_bh(*ret_bh); 2303 2304 return 0; 2305 } 2306 2307 status = ocfs2_read_inode_block(inode, ret_bh); 2308 if (status < 0) 2309 mlog_errno(status); 2310 2311 return status; 2312 } 2313 2314 /* 2315 * returns < 0 error if the callback will never be called, otherwise 2316 * the result of the lock will be communicated via the callback. 2317 */ 2318 int ocfs2_inode_lock_full_nested(struct inode *inode, 2319 struct buffer_head **ret_bh, 2320 int ex, 2321 int arg_flags, 2322 int subclass) 2323 { 2324 int status, level, acquired; 2325 u32 dlm_flags; 2326 struct ocfs2_lock_res *lockres = NULL; 2327 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2328 struct buffer_head *local_bh = NULL; 2329 2330 BUG_ON(!inode); 2331 2332 mlog(0, "inode %llu, take %s META lock\n", 2333 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2334 ex ? "EXMODE" : "PRMODE"); 2335 2336 status = 0; 2337 acquired = 0; 2338 /* We'll allow faking a readonly metadata lock for 2339 * rodevices. */ 2340 if (ocfs2_is_hard_readonly(osb)) { 2341 if (ex) 2342 status = -EROFS; 2343 goto getbh; 2344 } 2345 2346 if (ocfs2_mount_local(osb)) 2347 goto local; 2348 2349 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2350 ocfs2_wait_for_recovery(osb); 2351 2352 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2353 level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2354 dlm_flags = 0; 2355 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 2356 dlm_flags |= DLM_LKF_NOQUEUE; 2357 2358 status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags, 2359 arg_flags, subclass, _RET_IP_); 2360 if (status < 0) { 2361 if (status != -EAGAIN) 2362 mlog_errno(status); 2363 goto bail; 2364 } 2365 2366 /* Notify the error cleanup path to drop the cluster lock. */ 2367 acquired = 1; 2368 2369 /* We wait twice because a node may have died while we were in 2370 * the lower dlm layers. The second time though, we've 2371 * committed to owning this lock so we don't allow signals to 2372 * abort the operation. */ 2373 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2374 ocfs2_wait_for_recovery(osb); 2375 2376 local: 2377 /* 2378 * We only see this flag if we're being called from 2379 * ocfs2_read_locked_inode(). It means we're locking an inode 2380 * which hasn't been populated yet, so clear the refresh flag 2381 * and let the caller handle it. 2382 */ 2383 if (inode->i_state & I_NEW) { 2384 status = 0; 2385 if (lockres) 2386 ocfs2_complete_lock_res_refresh(lockres, 0); 2387 goto bail; 2388 } 2389 2390 /* This is fun. The caller may want a bh back, or it may 2391 * not. ocfs2_inode_lock_update definitely wants one in, but 2392 * may or may not read one, depending on what's in the 2393 * LVB. The result of all of this is that we've *only* gone to 2394 * disk if we have to, so the complexity is worthwhile. */ 2395 status = ocfs2_inode_lock_update(inode, &local_bh); 2396 if (status < 0) { 2397 if (status != -ENOENT) 2398 mlog_errno(status); 2399 goto bail; 2400 } 2401 getbh: 2402 if (ret_bh) { 2403 status = ocfs2_assign_bh(inode, ret_bh, local_bh); 2404 if (status < 0) { 2405 mlog_errno(status); 2406 goto bail; 2407 } 2408 } 2409 2410 bail: 2411 if (status < 0) { 2412 if (ret_bh && (*ret_bh)) { 2413 brelse(*ret_bh); 2414 *ret_bh = NULL; 2415 } 2416 if (acquired) 2417 ocfs2_inode_unlock(inode, ex); 2418 } 2419 2420 if (local_bh) 2421 brelse(local_bh); 2422 2423 return status; 2424 } 2425 2426 /* 2427 * This is working around a lock inversion between tasks acquiring DLM 2428 * locks while holding a page lock and the downconvert thread which 2429 * blocks dlm lock acquiry while acquiring page locks. 2430 * 2431 * ** These _with_page variantes are only intended to be called from aop 2432 * methods that hold page locks and return a very specific *positive* error 2433 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 2434 * 2435 * The DLM is called such that it returns -EAGAIN if it would have 2436 * blocked waiting for the downconvert thread. In that case we unlock 2437 * our page so the downconvert thread can make progress. Once we've 2438 * done this we have to return AOP_TRUNCATED_PAGE so the aop method 2439 * that called us can bubble that back up into the VFS who will then 2440 * immediately retry the aop call. 2441 */ 2442 int ocfs2_inode_lock_with_page(struct inode *inode, 2443 struct buffer_head **ret_bh, 2444 int ex, 2445 struct page *page) 2446 { 2447 int ret; 2448 2449 ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK); 2450 if (ret == -EAGAIN) { 2451 unlock_page(page); 2452 ret = AOP_TRUNCATED_PAGE; 2453 } 2454 2455 return ret; 2456 } 2457 2458 int ocfs2_inode_lock_atime(struct inode *inode, 2459 struct vfsmount *vfsmnt, 2460 int *level) 2461 { 2462 int ret; 2463 2464 ret = ocfs2_inode_lock(inode, NULL, 0); 2465 if (ret < 0) { 2466 mlog_errno(ret); 2467 return ret; 2468 } 2469 2470 /* 2471 * If we should update atime, we will get EX lock, 2472 * otherwise we just get PR lock. 2473 */ 2474 if (ocfs2_should_update_atime(inode, vfsmnt)) { 2475 struct buffer_head *bh = NULL; 2476 2477 ocfs2_inode_unlock(inode, 0); 2478 ret = ocfs2_inode_lock(inode, &bh, 1); 2479 if (ret < 0) { 2480 mlog_errno(ret); 2481 return ret; 2482 } 2483 *level = 1; 2484 if (ocfs2_should_update_atime(inode, vfsmnt)) 2485 ocfs2_update_inode_atime(inode, bh); 2486 if (bh) 2487 brelse(bh); 2488 } else 2489 *level = 0; 2490 2491 return ret; 2492 } 2493 2494 void ocfs2_inode_unlock(struct inode *inode, 2495 int ex) 2496 { 2497 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2498 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2499 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2500 2501 mlog(0, "inode %llu drop %s META lock\n", 2502 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2503 ex ? "EXMODE" : "PRMODE"); 2504 2505 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) && 2506 !ocfs2_mount_local(osb)) 2507 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 2508 } 2509 2510 int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno) 2511 { 2512 struct ocfs2_lock_res *lockres; 2513 struct ocfs2_orphan_scan_lvb *lvb; 2514 int status = 0; 2515 2516 if (ocfs2_is_hard_readonly(osb)) 2517 return -EROFS; 2518 2519 if (ocfs2_mount_local(osb)) 2520 return 0; 2521 2522 lockres = &osb->osb_orphan_scan.os_lockres; 2523 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2524 if (status < 0) 2525 return status; 2526 2527 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2528 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2529 lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION) 2530 *seqno = be32_to_cpu(lvb->lvb_os_seqno); 2531 else 2532 *seqno = osb->osb_orphan_scan.os_seqno + 1; 2533 2534 return status; 2535 } 2536 2537 void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno) 2538 { 2539 struct ocfs2_lock_res *lockres; 2540 struct ocfs2_orphan_scan_lvb *lvb; 2541 2542 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) { 2543 lockres = &osb->osb_orphan_scan.os_lockres; 2544 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2545 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION; 2546 lvb->lvb_os_seqno = cpu_to_be32(seqno); 2547 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2548 } 2549 } 2550 2551 int ocfs2_super_lock(struct ocfs2_super *osb, 2552 int ex) 2553 { 2554 int status = 0; 2555 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2556 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2557 2558 if (ocfs2_is_hard_readonly(osb)) 2559 return -EROFS; 2560 2561 if (ocfs2_mount_local(osb)) 2562 goto bail; 2563 2564 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 2565 if (status < 0) { 2566 mlog_errno(status); 2567 goto bail; 2568 } 2569 2570 /* The super block lock path is really in the best position to 2571 * know when resources covered by the lock need to be 2572 * refreshed, so we do it here. Of course, making sense of 2573 * everything is up to the caller :) */ 2574 status = ocfs2_should_refresh_lock_res(lockres); 2575 if (status) { 2576 status = ocfs2_refresh_slot_info(osb); 2577 2578 ocfs2_complete_lock_res_refresh(lockres, status); 2579 2580 if (status < 0) { 2581 ocfs2_cluster_unlock(osb, lockres, level); 2582 mlog_errno(status); 2583 } 2584 ocfs2_track_lock_refresh(lockres); 2585 } 2586 bail: 2587 return status; 2588 } 2589 2590 void ocfs2_super_unlock(struct ocfs2_super *osb, 2591 int ex) 2592 { 2593 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2594 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2595 2596 if (!ocfs2_mount_local(osb)) 2597 ocfs2_cluster_unlock(osb, lockres, level); 2598 } 2599 2600 int ocfs2_rename_lock(struct ocfs2_super *osb) 2601 { 2602 int status; 2603 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2604 2605 if (ocfs2_is_hard_readonly(osb)) 2606 return -EROFS; 2607 2608 if (ocfs2_mount_local(osb)) 2609 return 0; 2610 2611 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2612 if (status < 0) 2613 mlog_errno(status); 2614 2615 return status; 2616 } 2617 2618 void ocfs2_rename_unlock(struct ocfs2_super *osb) 2619 { 2620 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2621 2622 if (!ocfs2_mount_local(osb)) 2623 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2624 } 2625 2626 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex) 2627 { 2628 int status; 2629 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2630 2631 if (ocfs2_is_hard_readonly(osb)) 2632 return -EROFS; 2633 2634 if (ocfs2_mount_local(osb)) 2635 return 0; 2636 2637 status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE, 2638 0, 0); 2639 if (status < 0) 2640 mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status); 2641 2642 return status; 2643 } 2644 2645 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex) 2646 { 2647 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2648 2649 if (!ocfs2_mount_local(osb)) 2650 ocfs2_cluster_unlock(osb, lockres, 2651 ex ? LKM_EXMODE : LKM_PRMODE); 2652 } 2653 2654 int ocfs2_dentry_lock(struct dentry *dentry, int ex) 2655 { 2656 int ret; 2657 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2658 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2659 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2660 2661 BUG_ON(!dl); 2662 2663 if (ocfs2_is_hard_readonly(osb)) { 2664 if (ex) 2665 return -EROFS; 2666 return 0; 2667 } 2668 2669 if (ocfs2_mount_local(osb)) 2670 return 0; 2671 2672 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 2673 if (ret < 0) 2674 mlog_errno(ret); 2675 2676 return ret; 2677 } 2678 2679 void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 2680 { 2681 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2682 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2683 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2684 2685 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 2686 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 2687 } 2688 2689 /* Reference counting of the dlm debug structure. We want this because 2690 * open references on the debug inodes can live on after a mount, so 2691 * we can't rely on the ocfs2_super to always exist. */ 2692 static void ocfs2_dlm_debug_free(struct kref *kref) 2693 { 2694 struct ocfs2_dlm_debug *dlm_debug; 2695 2696 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2697 2698 kfree(dlm_debug); 2699 } 2700 2701 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2702 { 2703 if (dlm_debug) 2704 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2705 } 2706 2707 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2708 { 2709 kref_get(&debug->d_refcnt); 2710 } 2711 2712 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2713 { 2714 struct ocfs2_dlm_debug *dlm_debug; 2715 2716 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 2717 if (!dlm_debug) { 2718 mlog_errno(-ENOMEM); 2719 goto out; 2720 } 2721 2722 kref_init(&dlm_debug->d_refcnt); 2723 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 2724 dlm_debug->d_locking_state = NULL; 2725 out: 2726 return dlm_debug; 2727 } 2728 2729 /* Access to this is arbitrated for us via seq_file->sem. */ 2730 struct ocfs2_dlm_seq_priv { 2731 struct ocfs2_dlm_debug *p_dlm_debug; 2732 struct ocfs2_lock_res p_iter_res; 2733 struct ocfs2_lock_res p_tmp_res; 2734 }; 2735 2736 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 2737 struct ocfs2_dlm_seq_priv *priv) 2738 { 2739 struct ocfs2_lock_res *iter, *ret = NULL; 2740 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 2741 2742 assert_spin_locked(&ocfs2_dlm_tracking_lock); 2743 2744 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 2745 /* discover the head of the list */ 2746 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 2747 mlog(0, "End of list found, %p\n", ret); 2748 break; 2749 } 2750 2751 /* We track our "dummy" iteration lockres' by a NULL 2752 * l_ops field. */ 2753 if (iter->l_ops != NULL) { 2754 ret = iter; 2755 break; 2756 } 2757 } 2758 2759 return ret; 2760 } 2761 2762 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 2763 { 2764 struct ocfs2_dlm_seq_priv *priv = m->private; 2765 struct ocfs2_lock_res *iter; 2766 2767 spin_lock(&ocfs2_dlm_tracking_lock); 2768 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 2769 if (iter) { 2770 /* Since lockres' have the lifetime of their container 2771 * (which can be inodes, ocfs2_supers, etc) we want to 2772 * copy this out to a temporary lockres while still 2773 * under the spinlock. Obviously after this we can't 2774 * trust any pointers on the copy returned, but that's 2775 * ok as the information we want isn't typically held 2776 * in them. */ 2777 priv->p_tmp_res = *iter; 2778 iter = &priv->p_tmp_res; 2779 } 2780 spin_unlock(&ocfs2_dlm_tracking_lock); 2781 2782 return iter; 2783 } 2784 2785 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 2786 { 2787 } 2788 2789 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 2790 { 2791 struct ocfs2_dlm_seq_priv *priv = m->private; 2792 struct ocfs2_lock_res *iter = v; 2793 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 2794 2795 spin_lock(&ocfs2_dlm_tracking_lock); 2796 iter = ocfs2_dlm_next_res(iter, priv); 2797 list_del_init(&dummy->l_debug_list); 2798 if (iter) { 2799 list_add(&dummy->l_debug_list, &iter->l_debug_list); 2800 priv->p_tmp_res = *iter; 2801 iter = &priv->p_tmp_res; 2802 } 2803 spin_unlock(&ocfs2_dlm_tracking_lock); 2804 2805 return iter; 2806 } 2807 2808 /* 2809 * Version is used by debugfs.ocfs2 to determine the format being used 2810 * 2811 * New in version 2 2812 * - Lock stats printed 2813 * New in version 3 2814 * - Max time in lock stats is in usecs (instead of nsecs) 2815 */ 2816 #define OCFS2_DLM_DEBUG_STR_VERSION 3 2817 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 2818 { 2819 int i; 2820 char *lvb; 2821 struct ocfs2_lock_res *lockres = v; 2822 2823 if (!lockres) 2824 return -EINVAL; 2825 2826 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 2827 2828 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 2829 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 2830 lockres->l_name, 2831 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 2832 else 2833 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 2834 2835 seq_printf(m, "%d\t" 2836 "0x%lx\t" 2837 "0x%x\t" 2838 "0x%x\t" 2839 "%u\t" 2840 "%u\t" 2841 "%d\t" 2842 "%d\t", 2843 lockres->l_level, 2844 lockres->l_flags, 2845 lockres->l_action, 2846 lockres->l_unlock_action, 2847 lockres->l_ro_holders, 2848 lockres->l_ex_holders, 2849 lockres->l_requested, 2850 lockres->l_blocking); 2851 2852 /* Dump the raw LVB */ 2853 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2854 for(i = 0; i < DLM_LVB_LEN; i++) 2855 seq_printf(m, "0x%x\t", lvb[i]); 2856 2857 #ifdef CONFIG_OCFS2_FS_STATS 2858 # define lock_num_prmode(_l) ((_l)->l_lock_prmode.ls_gets) 2859 # define lock_num_exmode(_l) ((_l)->l_lock_exmode.ls_gets) 2860 # define lock_num_prmode_failed(_l) ((_l)->l_lock_prmode.ls_fail) 2861 # define lock_num_exmode_failed(_l) ((_l)->l_lock_exmode.ls_fail) 2862 # define lock_total_prmode(_l) ((_l)->l_lock_prmode.ls_total) 2863 # define lock_total_exmode(_l) ((_l)->l_lock_exmode.ls_total) 2864 # define lock_max_prmode(_l) ((_l)->l_lock_prmode.ls_max) 2865 # define lock_max_exmode(_l) ((_l)->l_lock_exmode.ls_max) 2866 # define lock_refresh(_l) ((_l)->l_lock_refresh) 2867 #else 2868 # define lock_num_prmode(_l) (0) 2869 # define lock_num_exmode(_l) (0) 2870 # define lock_num_prmode_failed(_l) (0) 2871 # define lock_num_exmode_failed(_l) (0) 2872 # define lock_total_prmode(_l) (0ULL) 2873 # define lock_total_exmode(_l) (0ULL) 2874 # define lock_max_prmode(_l) (0) 2875 # define lock_max_exmode(_l) (0) 2876 # define lock_refresh(_l) (0) 2877 #endif 2878 /* The following seq_print was added in version 2 of this output */ 2879 seq_printf(m, "%u\t" 2880 "%u\t" 2881 "%u\t" 2882 "%u\t" 2883 "%llu\t" 2884 "%llu\t" 2885 "%u\t" 2886 "%u\t" 2887 "%u\t", 2888 lock_num_prmode(lockres), 2889 lock_num_exmode(lockres), 2890 lock_num_prmode_failed(lockres), 2891 lock_num_exmode_failed(lockres), 2892 lock_total_prmode(lockres), 2893 lock_total_exmode(lockres), 2894 lock_max_prmode(lockres), 2895 lock_max_exmode(lockres), 2896 lock_refresh(lockres)); 2897 2898 /* End the line */ 2899 seq_printf(m, "\n"); 2900 return 0; 2901 } 2902 2903 static const struct seq_operations ocfs2_dlm_seq_ops = { 2904 .start = ocfs2_dlm_seq_start, 2905 .stop = ocfs2_dlm_seq_stop, 2906 .next = ocfs2_dlm_seq_next, 2907 .show = ocfs2_dlm_seq_show, 2908 }; 2909 2910 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 2911 { 2912 struct seq_file *seq = file->private_data; 2913 struct ocfs2_dlm_seq_priv *priv = seq->private; 2914 struct ocfs2_lock_res *res = &priv->p_iter_res; 2915 2916 ocfs2_remove_lockres_tracking(res); 2917 ocfs2_put_dlm_debug(priv->p_dlm_debug); 2918 return seq_release_private(inode, file); 2919 } 2920 2921 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 2922 { 2923 struct ocfs2_dlm_seq_priv *priv; 2924 struct ocfs2_super *osb; 2925 2926 priv = __seq_open_private(file, &ocfs2_dlm_seq_ops, sizeof(*priv)); 2927 if (!priv) { 2928 mlog_errno(-ENOMEM); 2929 return -ENOMEM; 2930 } 2931 2932 osb = inode->i_private; 2933 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 2934 priv->p_dlm_debug = osb->osb_dlm_debug; 2935 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 2936 2937 ocfs2_add_lockres_tracking(&priv->p_iter_res, 2938 priv->p_dlm_debug); 2939 2940 return 0; 2941 } 2942 2943 static const struct file_operations ocfs2_dlm_debug_fops = { 2944 .open = ocfs2_dlm_debug_open, 2945 .release = ocfs2_dlm_debug_release, 2946 .read = seq_read, 2947 .llseek = seq_lseek, 2948 }; 2949 2950 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 2951 { 2952 int ret = 0; 2953 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2954 2955 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 2956 S_IFREG|S_IRUSR, 2957 osb->osb_debug_root, 2958 osb, 2959 &ocfs2_dlm_debug_fops); 2960 if (!dlm_debug->d_locking_state) { 2961 ret = -EINVAL; 2962 mlog(ML_ERROR, 2963 "Unable to create locking state debugfs file.\n"); 2964 goto out; 2965 } 2966 2967 ocfs2_get_dlm_debug(dlm_debug); 2968 out: 2969 return ret; 2970 } 2971 2972 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 2973 { 2974 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2975 2976 if (dlm_debug) { 2977 debugfs_remove(dlm_debug->d_locking_state); 2978 ocfs2_put_dlm_debug(dlm_debug); 2979 } 2980 } 2981 2982 int ocfs2_dlm_init(struct ocfs2_super *osb) 2983 { 2984 int status = 0; 2985 struct ocfs2_cluster_connection *conn = NULL; 2986 2987 if (ocfs2_mount_local(osb)) { 2988 osb->node_num = 0; 2989 goto local; 2990 } 2991 2992 status = ocfs2_dlm_init_debug(osb); 2993 if (status < 0) { 2994 mlog_errno(status); 2995 goto bail; 2996 } 2997 2998 /* launch downconvert thread */ 2999 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc-%s", 3000 osb->uuid_str); 3001 if (IS_ERR(osb->dc_task)) { 3002 status = PTR_ERR(osb->dc_task); 3003 osb->dc_task = NULL; 3004 mlog_errno(status); 3005 goto bail; 3006 } 3007 3008 /* for now, uuid == domain */ 3009 status = ocfs2_cluster_connect(osb->osb_cluster_stack, 3010 osb->osb_cluster_name, 3011 strlen(osb->osb_cluster_name), 3012 osb->uuid_str, 3013 strlen(osb->uuid_str), 3014 &lproto, ocfs2_do_node_down, osb, 3015 &conn); 3016 if (status) { 3017 mlog_errno(status); 3018 goto bail; 3019 } 3020 3021 status = ocfs2_cluster_this_node(conn, &osb->node_num); 3022 if (status < 0) { 3023 mlog_errno(status); 3024 mlog(ML_ERROR, 3025 "could not find this host's node number\n"); 3026 ocfs2_cluster_disconnect(conn, 0); 3027 goto bail; 3028 } 3029 3030 local: 3031 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 3032 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 3033 ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb); 3034 ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb); 3035 3036 osb->cconn = conn; 3037 bail: 3038 if (status < 0) { 3039 ocfs2_dlm_shutdown_debug(osb); 3040 if (osb->dc_task) 3041 kthread_stop(osb->dc_task); 3042 } 3043 3044 return status; 3045 } 3046 3047 void ocfs2_dlm_shutdown(struct ocfs2_super *osb, 3048 int hangup_pending) 3049 { 3050 ocfs2_drop_osb_locks(osb); 3051 3052 /* 3053 * Now that we have dropped all locks and ocfs2_dismount_volume() 3054 * has disabled recovery, the DLM won't be talking to us. It's 3055 * safe to tear things down before disconnecting the cluster. 3056 */ 3057 3058 if (osb->dc_task) { 3059 kthread_stop(osb->dc_task); 3060 osb->dc_task = NULL; 3061 } 3062 3063 ocfs2_lock_res_free(&osb->osb_super_lockres); 3064 ocfs2_lock_res_free(&osb->osb_rename_lockres); 3065 ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres); 3066 ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres); 3067 3068 ocfs2_cluster_disconnect(osb->cconn, hangup_pending); 3069 osb->cconn = NULL; 3070 3071 ocfs2_dlm_shutdown_debug(osb); 3072 } 3073 3074 static int ocfs2_drop_lock(struct ocfs2_super *osb, 3075 struct ocfs2_lock_res *lockres) 3076 { 3077 int ret; 3078 unsigned long flags; 3079 u32 lkm_flags = 0; 3080 3081 /* We didn't get anywhere near actually using this lockres. */ 3082 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 3083 goto out; 3084 3085 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 3086 lkm_flags |= DLM_LKF_VALBLK; 3087 3088 spin_lock_irqsave(&lockres->l_lock, flags); 3089 3090 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 3091 "lockres %s, flags 0x%lx\n", 3092 lockres->l_name, lockres->l_flags); 3093 3094 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 3095 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 3096 "%u, unlock_action = %u\n", 3097 lockres->l_name, lockres->l_flags, lockres->l_action, 3098 lockres->l_unlock_action); 3099 3100 spin_unlock_irqrestore(&lockres->l_lock, flags); 3101 3102 /* XXX: Today we just wait on any busy 3103 * locks... Perhaps we need to cancel converts in the 3104 * future? */ 3105 ocfs2_wait_on_busy_lock(lockres); 3106 3107 spin_lock_irqsave(&lockres->l_lock, flags); 3108 } 3109 3110 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3111 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 3112 lockres->l_level == DLM_LOCK_EX && 3113 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3114 lockres->l_ops->set_lvb(lockres); 3115 } 3116 3117 if (lockres->l_flags & OCFS2_LOCK_BUSY) 3118 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 3119 lockres->l_name); 3120 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 3121 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 3122 3123 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 3124 spin_unlock_irqrestore(&lockres->l_lock, flags); 3125 goto out; 3126 } 3127 3128 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 3129 3130 /* make sure we never get here while waiting for an ast to 3131 * fire. */ 3132 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 3133 3134 /* is this necessary? */ 3135 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3136 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 3137 spin_unlock_irqrestore(&lockres->l_lock, flags); 3138 3139 mlog(0, "lock %s\n", lockres->l_name); 3140 3141 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags); 3142 if (ret) { 3143 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3144 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 3145 ocfs2_dlm_dump_lksb(&lockres->l_lksb); 3146 BUG(); 3147 } 3148 mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n", 3149 lockres->l_name); 3150 3151 ocfs2_wait_on_busy_lock(lockres); 3152 out: 3153 return 0; 3154 } 3155 3156 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3157 struct ocfs2_lock_res *lockres); 3158 3159 /* Mark the lockres as being dropped. It will no longer be 3160 * queued if blocking, but we still may have to wait on it 3161 * being dequeued from the downconvert thread before we can consider 3162 * it safe to drop. 3163 * 3164 * You can *not* attempt to call cluster_lock on this lockres anymore. */ 3165 void ocfs2_mark_lockres_freeing(struct ocfs2_super *osb, 3166 struct ocfs2_lock_res *lockres) 3167 { 3168 int status; 3169 struct ocfs2_mask_waiter mw; 3170 unsigned long flags, flags2; 3171 3172 ocfs2_init_mask_waiter(&mw); 3173 3174 spin_lock_irqsave(&lockres->l_lock, flags); 3175 lockres->l_flags |= OCFS2_LOCK_FREEING; 3176 if (lockres->l_flags & OCFS2_LOCK_QUEUED && current == osb->dc_task) { 3177 /* 3178 * We know the downconvert is queued but not in progress 3179 * because we are the downconvert thread and processing 3180 * different lock. So we can just remove the lock from the 3181 * queue. This is not only an optimization but also a way 3182 * to avoid the following deadlock: 3183 * ocfs2_dentry_post_unlock() 3184 * ocfs2_dentry_lock_put() 3185 * ocfs2_drop_dentry_lock() 3186 * iput() 3187 * ocfs2_evict_inode() 3188 * ocfs2_clear_inode() 3189 * ocfs2_mark_lockres_freeing() 3190 * ... blocks waiting for OCFS2_LOCK_QUEUED 3191 * since we are the downconvert thread which 3192 * should clear the flag. 3193 */ 3194 spin_unlock_irqrestore(&lockres->l_lock, flags); 3195 spin_lock_irqsave(&osb->dc_task_lock, flags2); 3196 list_del_init(&lockres->l_blocked_list); 3197 osb->blocked_lock_count--; 3198 spin_unlock_irqrestore(&osb->dc_task_lock, flags2); 3199 /* 3200 * Warn if we recurse into another post_unlock call. Strictly 3201 * speaking it isn't a problem but we need to be careful if 3202 * that happens (stack overflow, deadlocks, ...) so warn if 3203 * ocfs2 grows a path for which this can happen. 3204 */ 3205 WARN_ON_ONCE(lockres->l_ops->post_unlock); 3206 /* Since the lock is freeing we don't do much in the fn below */ 3207 ocfs2_process_blocked_lock(osb, lockres); 3208 return; 3209 } 3210 while (lockres->l_flags & OCFS2_LOCK_QUEUED) { 3211 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); 3212 spin_unlock_irqrestore(&lockres->l_lock, flags); 3213 3214 mlog(0, "Waiting on lockres %s\n", lockres->l_name); 3215 3216 status = ocfs2_wait_for_mask(&mw); 3217 if (status) 3218 mlog_errno(status); 3219 3220 spin_lock_irqsave(&lockres->l_lock, flags); 3221 } 3222 spin_unlock_irqrestore(&lockres->l_lock, flags); 3223 } 3224 3225 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, 3226 struct ocfs2_lock_res *lockres) 3227 { 3228 int ret; 3229 3230 ocfs2_mark_lockres_freeing(osb, lockres); 3231 ret = ocfs2_drop_lock(osb, lockres); 3232 if (ret) 3233 mlog_errno(ret); 3234 } 3235 3236 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 3237 { 3238 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 3239 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 3240 ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres); 3241 ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres); 3242 } 3243 3244 int ocfs2_drop_inode_locks(struct inode *inode) 3245 { 3246 int status, err; 3247 3248 /* No need to call ocfs2_mark_lockres_freeing here - 3249 * ocfs2_clear_inode has done it for us. */ 3250 3251 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3252 &OCFS2_I(inode)->ip_open_lockres); 3253 if (err < 0) 3254 mlog_errno(err); 3255 3256 status = err; 3257 3258 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3259 &OCFS2_I(inode)->ip_inode_lockres); 3260 if (err < 0) 3261 mlog_errno(err); 3262 if (err < 0 && !status) 3263 status = err; 3264 3265 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3266 &OCFS2_I(inode)->ip_rw_lockres); 3267 if (err < 0) 3268 mlog_errno(err); 3269 if (err < 0 && !status) 3270 status = err; 3271 3272 return status; 3273 } 3274 3275 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 3276 int new_level) 3277 { 3278 assert_spin_locked(&lockres->l_lock); 3279 3280 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 3281 3282 if (lockres->l_level <= new_level) { 3283 mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, " 3284 "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, " 3285 "block %d, pgen %d\n", lockres->l_name, lockres->l_level, 3286 new_level, list_empty(&lockres->l_blocked_list), 3287 list_empty(&lockres->l_mask_waiters), lockres->l_type, 3288 lockres->l_flags, lockres->l_ro_holders, 3289 lockres->l_ex_holders, lockres->l_action, 3290 lockres->l_unlock_action, lockres->l_requested, 3291 lockres->l_blocking, lockres->l_pending_gen); 3292 BUG(); 3293 } 3294 3295 mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n", 3296 lockres->l_name, lockres->l_level, new_level, lockres->l_blocking); 3297 3298 lockres->l_action = OCFS2_AST_DOWNCONVERT; 3299 lockres->l_requested = new_level; 3300 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3301 return lockres_set_pending(lockres); 3302 } 3303 3304 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 3305 struct ocfs2_lock_res *lockres, 3306 int new_level, 3307 int lvb, 3308 unsigned int generation) 3309 { 3310 int ret; 3311 u32 dlm_flags = DLM_LKF_CONVERT; 3312 3313 mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name, 3314 lockres->l_level, new_level); 3315 3316 if (lvb) 3317 dlm_flags |= DLM_LKF_VALBLK; 3318 3319 ret = ocfs2_dlm_lock(osb->cconn, 3320 new_level, 3321 &lockres->l_lksb, 3322 dlm_flags, 3323 lockres->l_name, 3324 OCFS2_LOCK_ID_MAX_LEN - 1); 3325 lockres_clear_pending(lockres, generation, osb); 3326 if (ret) { 3327 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 3328 ocfs2_recover_from_dlm_error(lockres, 1); 3329 goto bail; 3330 } 3331 3332 ret = 0; 3333 bail: 3334 return ret; 3335 } 3336 3337 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */ 3338 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 3339 struct ocfs2_lock_res *lockres) 3340 { 3341 assert_spin_locked(&lockres->l_lock); 3342 3343 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 3344 /* If we're already trying to cancel a lock conversion 3345 * then just drop the spinlock and allow the caller to 3346 * requeue this lock. */ 3347 mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name); 3348 return 0; 3349 } 3350 3351 /* were we in a convert when we got the bast fire? */ 3352 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT && 3353 lockres->l_action != OCFS2_AST_DOWNCONVERT); 3354 /* set things up for the unlockast to know to just 3355 * clear out the ast_action and unset busy, etc. */ 3356 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT; 3357 3358 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY), 3359 "lock %s, invalid flags: 0x%lx\n", 3360 lockres->l_name, lockres->l_flags); 3361 3362 mlog(ML_BASTS, "lockres %s\n", lockres->l_name); 3363 3364 return 1; 3365 } 3366 3367 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 3368 struct ocfs2_lock_res *lockres) 3369 { 3370 int ret; 3371 3372 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, 3373 DLM_LKF_CANCEL); 3374 if (ret) { 3375 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3376 ocfs2_recover_from_dlm_error(lockres, 0); 3377 } 3378 3379 mlog(ML_BASTS, "lockres %s\n", lockres->l_name); 3380 3381 return ret; 3382 } 3383 3384 static int ocfs2_unblock_lock(struct ocfs2_super *osb, 3385 struct ocfs2_lock_res *lockres, 3386 struct ocfs2_unblock_ctl *ctl) 3387 { 3388 unsigned long flags; 3389 int blocking; 3390 int new_level; 3391 int level; 3392 int ret = 0; 3393 int set_lvb = 0; 3394 unsigned int gen; 3395 3396 spin_lock_irqsave(&lockres->l_lock, flags); 3397 3398 recheck: 3399 /* 3400 * Is it still blocking? If not, we have no more work to do. 3401 */ 3402 if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) { 3403 BUG_ON(lockres->l_blocking != DLM_LOCK_NL); 3404 spin_unlock_irqrestore(&lockres->l_lock, flags); 3405 ret = 0; 3406 goto leave; 3407 } 3408 3409 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 3410 /* XXX 3411 * This is a *big* race. The OCFS2_LOCK_PENDING flag 3412 * exists entirely for one reason - another thread has set 3413 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock(). 3414 * 3415 * If we do ocfs2_cancel_convert() before the other thread 3416 * calls dlm_lock(), our cancel will do nothing. We will 3417 * get no ast, and we will have no way of knowing the 3418 * cancel failed. Meanwhile, the other thread will call 3419 * into dlm_lock() and wait...forever. 3420 * 3421 * Why forever? Because another node has asked for the 3422 * lock first; that's why we're here in unblock_lock(). 3423 * 3424 * The solution is OCFS2_LOCK_PENDING. When PENDING is 3425 * set, we just requeue the unblock. Only when the other 3426 * thread has called dlm_lock() and cleared PENDING will 3427 * we then cancel their request. 3428 * 3429 * All callers of dlm_lock() must set OCFS2_DLM_PENDING 3430 * at the same time they set OCFS2_DLM_BUSY. They must 3431 * clear OCFS2_DLM_PENDING after dlm_lock() returns. 3432 */ 3433 if (lockres->l_flags & OCFS2_LOCK_PENDING) { 3434 mlog(ML_BASTS, "lockres %s, ReQ: Pending\n", 3435 lockres->l_name); 3436 goto leave_requeue; 3437 } 3438 3439 ctl->requeue = 1; 3440 ret = ocfs2_prepare_cancel_convert(osb, lockres); 3441 spin_unlock_irqrestore(&lockres->l_lock, flags); 3442 if (ret) { 3443 ret = ocfs2_cancel_convert(osb, lockres); 3444 if (ret < 0) 3445 mlog_errno(ret); 3446 } 3447 goto leave; 3448 } 3449 3450 /* 3451 * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is 3452 * set when the ast is received for an upconvert just before the 3453 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast 3454 * on the heels of the ast, we want to delay the downconvert just 3455 * enough to allow the up requestor to do its task. Because this 3456 * lock is in the blocked queue, the lock will be downconverted 3457 * as soon as the requestor is done with the lock. 3458 */ 3459 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) 3460 goto leave_requeue; 3461 3462 /* 3463 * How can we block and yet be at NL? We were trying to upconvert 3464 * from NL and got canceled. The code comes back here, and now 3465 * we notice and clear BLOCKING. 3466 */ 3467 if (lockres->l_level == DLM_LOCK_NL) { 3468 BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders); 3469 mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name); 3470 lockres->l_blocking = DLM_LOCK_NL; 3471 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 3472 spin_unlock_irqrestore(&lockres->l_lock, flags); 3473 goto leave; 3474 } 3475 3476 /* if we're blocking an exclusive and we have *any* holders, 3477 * then requeue. */ 3478 if ((lockres->l_blocking == DLM_LOCK_EX) 3479 && (lockres->l_ex_holders || lockres->l_ro_holders)) { 3480 mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n", 3481 lockres->l_name, lockres->l_ex_holders, 3482 lockres->l_ro_holders); 3483 goto leave_requeue; 3484 } 3485 3486 /* If it's a PR we're blocking, then only 3487 * requeue if we've got any EX holders */ 3488 if (lockres->l_blocking == DLM_LOCK_PR && 3489 lockres->l_ex_holders) { 3490 mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n", 3491 lockres->l_name, lockres->l_ex_holders); 3492 goto leave_requeue; 3493 } 3494 3495 /* 3496 * Can we get a lock in this state if the holder counts are 3497 * zero? The meta data unblock code used to check this. 3498 */ 3499 if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 3500 && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) { 3501 mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n", 3502 lockres->l_name); 3503 goto leave_requeue; 3504 } 3505 3506 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 3507 3508 if (lockres->l_ops->check_downconvert 3509 && !lockres->l_ops->check_downconvert(lockres, new_level)) { 3510 mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n", 3511 lockres->l_name); 3512 goto leave_requeue; 3513 } 3514 3515 /* If we get here, then we know that there are no more 3516 * incompatible holders (and anyone asking for an incompatible 3517 * lock is blocked). We can now downconvert the lock */ 3518 if (!lockres->l_ops->downconvert_worker) 3519 goto downconvert; 3520 3521 /* Some lockres types want to do a bit of work before 3522 * downconverting a lock. Allow that here. The worker function 3523 * may sleep, so we save off a copy of what we're blocking as 3524 * it may change while we're not holding the spin lock. */ 3525 blocking = lockres->l_blocking; 3526 level = lockres->l_level; 3527 spin_unlock_irqrestore(&lockres->l_lock, flags); 3528 3529 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); 3530 3531 if (ctl->unblock_action == UNBLOCK_STOP_POST) { 3532 mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n", 3533 lockres->l_name); 3534 goto leave; 3535 } 3536 3537 spin_lock_irqsave(&lockres->l_lock, flags); 3538 if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) { 3539 /* If this changed underneath us, then we can't drop 3540 * it just yet. */ 3541 mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, " 3542 "Recheck\n", lockres->l_name, blocking, 3543 lockres->l_blocking, level, lockres->l_level); 3544 goto recheck; 3545 } 3546 3547 downconvert: 3548 ctl->requeue = 0; 3549 3550 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3551 if (lockres->l_level == DLM_LOCK_EX) 3552 set_lvb = 1; 3553 3554 /* 3555 * We only set the lvb if the lock has been fully 3556 * refreshed - otherwise we risk setting stale 3557 * data. Otherwise, there's no need to actually clear 3558 * out the lvb here as it's value is still valid. 3559 */ 3560 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3561 lockres->l_ops->set_lvb(lockres); 3562 } 3563 3564 gen = ocfs2_prepare_downconvert(lockres, new_level); 3565 spin_unlock_irqrestore(&lockres->l_lock, flags); 3566 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb, 3567 gen); 3568 3569 leave: 3570 if (ret) 3571 mlog_errno(ret); 3572 return ret; 3573 3574 leave_requeue: 3575 spin_unlock_irqrestore(&lockres->l_lock, flags); 3576 ctl->requeue = 1; 3577 3578 return 0; 3579 } 3580 3581 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 3582 int blocking) 3583 { 3584 struct inode *inode; 3585 struct address_space *mapping; 3586 struct ocfs2_inode_info *oi; 3587 3588 inode = ocfs2_lock_res_inode(lockres); 3589 mapping = inode->i_mapping; 3590 3591 if (S_ISDIR(inode->i_mode)) { 3592 oi = OCFS2_I(inode); 3593 oi->ip_dir_lock_gen++; 3594 mlog(0, "generation: %u\n", oi->ip_dir_lock_gen); 3595 goto out; 3596 } 3597 3598 if (!S_ISREG(inode->i_mode)) 3599 goto out; 3600 3601 /* 3602 * We need this before the filemap_fdatawrite() so that it can 3603 * transfer the dirty bit from the PTE to the 3604 * page. Unfortunately this means that even for EX->PR 3605 * downconverts, we'll lose our mappings and have to build 3606 * them up again. 3607 */ 3608 unmap_mapping_range(mapping, 0, 0, 0); 3609 3610 if (filemap_fdatawrite(mapping)) { 3611 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!", 3612 (unsigned long long)OCFS2_I(inode)->ip_blkno); 3613 } 3614 sync_mapping_buffers(mapping); 3615 if (blocking == DLM_LOCK_EX) { 3616 truncate_inode_pages(mapping, 0); 3617 } else { 3618 /* We only need to wait on the I/O if we're not also 3619 * truncating pages because truncate_inode_pages waits 3620 * for us above. We don't truncate pages if we're 3621 * blocking anything < EXMODE because we want to keep 3622 * them around in that case. */ 3623 filemap_fdatawait(mapping); 3624 } 3625 3626 out: 3627 return UNBLOCK_CONTINUE; 3628 } 3629 3630 static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci, 3631 struct ocfs2_lock_res *lockres, 3632 int new_level) 3633 { 3634 int checkpointed = ocfs2_ci_fully_checkpointed(ci); 3635 3636 BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR); 3637 BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed); 3638 3639 if (checkpointed) 3640 return 1; 3641 3642 ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci))); 3643 return 0; 3644 } 3645 3646 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 3647 int new_level) 3648 { 3649 struct inode *inode = ocfs2_lock_res_inode(lockres); 3650 3651 return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level); 3652 } 3653 3654 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres) 3655 { 3656 struct inode *inode = ocfs2_lock_res_inode(lockres); 3657 3658 __ocfs2_stuff_meta_lvb(inode); 3659 } 3660 3661 /* 3662 * Does the final reference drop on our dentry lock. Right now this 3663 * happens in the downconvert thread, but we could choose to simplify the 3664 * dlmglue API and push these off to the ocfs2_wq in the future. 3665 */ 3666 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 3667 struct ocfs2_lock_res *lockres) 3668 { 3669 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3670 ocfs2_dentry_lock_put(osb, dl); 3671 } 3672 3673 /* 3674 * d_delete() matching dentries before the lock downconvert. 3675 * 3676 * At this point, any process waiting to destroy the 3677 * dentry_lock due to last ref count is stopped by the 3678 * OCFS2_LOCK_QUEUED flag. 3679 * 3680 * We have two potential problems 3681 * 3682 * 1) If we do the last reference drop on our dentry_lock (via dput) 3683 * we'll wind up in ocfs2_release_dentry_lock(), waiting on 3684 * the downconvert to finish. Instead we take an elevated 3685 * reference and push the drop until after we've completed our 3686 * unblock processing. 3687 * 3688 * 2) There might be another process with a final reference, 3689 * waiting on us to finish processing. If this is the case, we 3690 * detect it and exit out - there's no more dentries anyway. 3691 */ 3692 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 3693 int blocking) 3694 { 3695 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3696 struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode); 3697 struct dentry *dentry; 3698 unsigned long flags; 3699 int extra_ref = 0; 3700 3701 /* 3702 * This node is blocking another node from getting a read 3703 * lock. This happens when we've renamed within a 3704 * directory. We've forced the other nodes to d_delete(), but 3705 * we never actually dropped our lock because it's still 3706 * valid. The downconvert code will retain a PR for this node, 3707 * so there's no further work to do. 3708 */ 3709 if (blocking == DLM_LOCK_PR) 3710 return UNBLOCK_CONTINUE; 3711 3712 /* 3713 * Mark this inode as potentially orphaned. The code in 3714 * ocfs2_delete_inode() will figure out whether it actually 3715 * needs to be freed or not. 3716 */ 3717 spin_lock(&oi->ip_lock); 3718 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; 3719 spin_unlock(&oi->ip_lock); 3720 3721 /* 3722 * Yuck. We need to make sure however that the check of 3723 * OCFS2_LOCK_FREEING and the extra reference are atomic with 3724 * respect to a reference decrement or the setting of that 3725 * flag. 3726 */ 3727 spin_lock_irqsave(&lockres->l_lock, flags); 3728 spin_lock(&dentry_attach_lock); 3729 if (!(lockres->l_flags & OCFS2_LOCK_FREEING) 3730 && dl->dl_count) { 3731 dl->dl_count++; 3732 extra_ref = 1; 3733 } 3734 spin_unlock(&dentry_attach_lock); 3735 spin_unlock_irqrestore(&lockres->l_lock, flags); 3736 3737 mlog(0, "extra_ref = %d\n", extra_ref); 3738 3739 /* 3740 * We have a process waiting on us in ocfs2_dentry_iput(), 3741 * which means we can't have any more outstanding 3742 * aliases. There's no need to do any more work. 3743 */ 3744 if (!extra_ref) 3745 return UNBLOCK_CONTINUE; 3746 3747 spin_lock(&dentry_attach_lock); 3748 while (1) { 3749 dentry = ocfs2_find_local_alias(dl->dl_inode, 3750 dl->dl_parent_blkno, 1); 3751 if (!dentry) 3752 break; 3753 spin_unlock(&dentry_attach_lock); 3754 3755 if (S_ISDIR(dl->dl_inode->i_mode)) 3756 shrink_dcache_parent(dentry); 3757 3758 mlog(0, "d_delete(%pd);\n", dentry); 3759 3760 /* 3761 * The following dcache calls may do an 3762 * iput(). Normally we don't want that from the 3763 * downconverting thread, but in this case it's ok 3764 * because the requesting node already has an 3765 * exclusive lock on the inode, so it can't be queued 3766 * for a downconvert. 3767 */ 3768 d_delete(dentry); 3769 dput(dentry); 3770 3771 spin_lock(&dentry_attach_lock); 3772 } 3773 spin_unlock(&dentry_attach_lock); 3774 3775 /* 3776 * If we are the last holder of this dentry lock, there is no 3777 * reason to downconvert so skip straight to the unlock. 3778 */ 3779 if (dl->dl_count == 1) 3780 return UNBLOCK_STOP_POST; 3781 3782 return UNBLOCK_CONTINUE_POST; 3783 } 3784 3785 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres, 3786 int new_level) 3787 { 3788 struct ocfs2_refcount_tree *tree = 3789 ocfs2_lock_res_refcount_tree(lockres); 3790 3791 return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level); 3792 } 3793 3794 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres, 3795 int blocking) 3796 { 3797 struct ocfs2_refcount_tree *tree = 3798 ocfs2_lock_res_refcount_tree(lockres); 3799 3800 ocfs2_metadata_cache_purge(&tree->rf_ci); 3801 3802 return UNBLOCK_CONTINUE; 3803 } 3804 3805 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres) 3806 { 3807 struct ocfs2_qinfo_lvb *lvb; 3808 struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres); 3809 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 3810 oinfo->dqi_gi.dqi_type); 3811 3812 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 3813 lvb->lvb_version = OCFS2_QINFO_LVB_VERSION; 3814 lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace); 3815 lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace); 3816 lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms); 3817 lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks); 3818 lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk); 3819 lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry); 3820 } 3821 3822 void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex) 3823 { 3824 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 3825 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 3826 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 3827 3828 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 3829 ocfs2_cluster_unlock(osb, lockres, level); 3830 } 3831 3832 static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo) 3833 { 3834 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 3835 oinfo->dqi_gi.dqi_type); 3836 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 3837 struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 3838 struct buffer_head *bh = NULL; 3839 struct ocfs2_global_disk_dqinfo *gdinfo; 3840 int status = 0; 3841 3842 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 3843 lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) { 3844 info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace); 3845 info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace); 3846 oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms); 3847 oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks); 3848 oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk); 3849 oinfo->dqi_gi.dqi_free_entry = 3850 be32_to_cpu(lvb->lvb_free_entry); 3851 } else { 3852 status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode, 3853 oinfo->dqi_giblk, &bh); 3854 if (status) { 3855 mlog_errno(status); 3856 goto bail; 3857 } 3858 gdinfo = (struct ocfs2_global_disk_dqinfo *) 3859 (bh->b_data + OCFS2_GLOBAL_INFO_OFF); 3860 info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace); 3861 info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace); 3862 oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms); 3863 oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks); 3864 oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk); 3865 oinfo->dqi_gi.dqi_free_entry = 3866 le32_to_cpu(gdinfo->dqi_free_entry); 3867 brelse(bh); 3868 ocfs2_track_lock_refresh(lockres); 3869 } 3870 3871 bail: 3872 return status; 3873 } 3874 3875 /* Lock quota info, this function expects at least shared lock on the quota file 3876 * so that we can safely refresh quota info from disk. */ 3877 int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex) 3878 { 3879 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 3880 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 3881 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 3882 int status = 0; 3883 3884 /* On RO devices, locking really isn't needed... */ 3885 if (ocfs2_is_hard_readonly(osb)) { 3886 if (ex) 3887 status = -EROFS; 3888 goto bail; 3889 } 3890 if (ocfs2_mount_local(osb)) 3891 goto bail; 3892 3893 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 3894 if (status < 0) { 3895 mlog_errno(status); 3896 goto bail; 3897 } 3898 if (!ocfs2_should_refresh_lock_res(lockres)) 3899 goto bail; 3900 /* OK, we have the lock but we need to refresh the quota info */ 3901 status = ocfs2_refresh_qinfo(oinfo); 3902 if (status) 3903 ocfs2_qinfo_unlock(oinfo, ex); 3904 ocfs2_complete_lock_res_refresh(lockres, status); 3905 bail: 3906 return status; 3907 } 3908 3909 int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex) 3910 { 3911 int status; 3912 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 3913 struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres; 3914 struct ocfs2_super *osb = lockres->l_priv; 3915 3916 3917 if (ocfs2_is_hard_readonly(osb)) 3918 return -EROFS; 3919 3920 if (ocfs2_mount_local(osb)) 3921 return 0; 3922 3923 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 3924 if (status < 0) 3925 mlog_errno(status); 3926 3927 return status; 3928 } 3929 3930 void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex) 3931 { 3932 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 3933 struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres; 3934 struct ocfs2_super *osb = lockres->l_priv; 3935 3936 if (!ocfs2_mount_local(osb)) 3937 ocfs2_cluster_unlock(osb, lockres, level); 3938 } 3939 3940 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3941 struct ocfs2_lock_res *lockres) 3942 { 3943 int status; 3944 struct ocfs2_unblock_ctl ctl = {0, 0,}; 3945 unsigned long flags; 3946 3947 /* Our reference to the lockres in this function can be 3948 * considered valid until we remove the OCFS2_LOCK_QUEUED 3949 * flag. */ 3950 3951 BUG_ON(!lockres); 3952 BUG_ON(!lockres->l_ops); 3953 3954 mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name); 3955 3956 /* Detect whether a lock has been marked as going away while 3957 * the downconvert thread was processing other things. A lock can 3958 * still be marked with OCFS2_LOCK_FREEING after this check, 3959 * but short circuiting here will still save us some 3960 * performance. */ 3961 spin_lock_irqsave(&lockres->l_lock, flags); 3962 if (lockres->l_flags & OCFS2_LOCK_FREEING) 3963 goto unqueue; 3964 spin_unlock_irqrestore(&lockres->l_lock, flags); 3965 3966 status = ocfs2_unblock_lock(osb, lockres, &ctl); 3967 if (status < 0) 3968 mlog_errno(status); 3969 3970 spin_lock_irqsave(&lockres->l_lock, flags); 3971 unqueue: 3972 if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) { 3973 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED); 3974 } else 3975 ocfs2_schedule_blocked_lock(osb, lockres); 3976 3977 mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name, 3978 ctl.requeue ? "yes" : "no"); 3979 spin_unlock_irqrestore(&lockres->l_lock, flags); 3980 3981 if (ctl.unblock_action != UNBLOCK_CONTINUE 3982 && lockres->l_ops->post_unlock) 3983 lockres->l_ops->post_unlock(osb, lockres); 3984 } 3985 3986 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 3987 struct ocfs2_lock_res *lockres) 3988 { 3989 unsigned long flags; 3990 3991 assert_spin_locked(&lockres->l_lock); 3992 3993 if (lockres->l_flags & OCFS2_LOCK_FREEING) { 3994 /* Do not schedule a lock for downconvert when it's on 3995 * the way to destruction - any nodes wanting access 3996 * to the resource will get it soon. */ 3997 mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n", 3998 lockres->l_name, lockres->l_flags); 3999 return; 4000 } 4001 4002 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); 4003 4004 spin_lock_irqsave(&osb->dc_task_lock, flags); 4005 if (list_empty(&lockres->l_blocked_list)) { 4006 list_add_tail(&lockres->l_blocked_list, 4007 &osb->blocked_lock_list); 4008 osb->blocked_lock_count++; 4009 } 4010 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4011 } 4012 4013 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb) 4014 { 4015 unsigned long processed; 4016 unsigned long flags; 4017 struct ocfs2_lock_res *lockres; 4018 4019 spin_lock_irqsave(&osb->dc_task_lock, flags); 4020 /* grab this early so we know to try again if a state change and 4021 * wake happens part-way through our work */ 4022 osb->dc_work_sequence = osb->dc_wake_sequence; 4023 4024 processed = osb->blocked_lock_count; 4025 /* 4026 * blocked lock processing in this loop might call iput which can 4027 * remove items off osb->blocked_lock_list. Downconvert up to 4028 * 'processed' number of locks, but stop short if we had some 4029 * removed in ocfs2_mark_lockres_freeing when downconverting. 4030 */ 4031 while (processed && !list_empty(&osb->blocked_lock_list)) { 4032 lockres = list_entry(osb->blocked_lock_list.next, 4033 struct ocfs2_lock_res, l_blocked_list); 4034 list_del_init(&lockres->l_blocked_list); 4035 osb->blocked_lock_count--; 4036 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4037 4038 BUG_ON(!processed); 4039 processed--; 4040 4041 ocfs2_process_blocked_lock(osb, lockres); 4042 4043 spin_lock_irqsave(&osb->dc_task_lock, flags); 4044 } 4045 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4046 } 4047 4048 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb) 4049 { 4050 int empty = 0; 4051 unsigned long flags; 4052 4053 spin_lock_irqsave(&osb->dc_task_lock, flags); 4054 if (list_empty(&osb->blocked_lock_list)) 4055 empty = 1; 4056 4057 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4058 return empty; 4059 } 4060 4061 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb) 4062 { 4063 int should_wake = 0; 4064 unsigned long flags; 4065 4066 spin_lock_irqsave(&osb->dc_task_lock, flags); 4067 if (osb->dc_work_sequence != osb->dc_wake_sequence) 4068 should_wake = 1; 4069 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4070 4071 return should_wake; 4072 } 4073 4074 static int ocfs2_downconvert_thread(void *arg) 4075 { 4076 int status = 0; 4077 struct ocfs2_super *osb = arg; 4078 4079 /* only quit once we've been asked to stop and there is no more 4080 * work available */ 4081 while (!(kthread_should_stop() && 4082 ocfs2_downconvert_thread_lists_empty(osb))) { 4083 4084 wait_event_interruptible(osb->dc_event, 4085 ocfs2_downconvert_thread_should_wake(osb) || 4086 kthread_should_stop()); 4087 4088 mlog(0, "downconvert_thread: awoken\n"); 4089 4090 ocfs2_downconvert_thread_do_work(osb); 4091 } 4092 4093 osb->dc_task = NULL; 4094 return status; 4095 } 4096 4097 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb) 4098 { 4099 unsigned long flags; 4100 4101 spin_lock_irqsave(&osb->dc_task_lock, flags); 4102 /* make sure the voting thread gets a swipe at whatever changes 4103 * the caller may have made to the voting state */ 4104 osb->dc_wake_sequence++; 4105 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4106 wake_up(&osb->dc_event); 4107 } 4108