1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmglue.c 5 * 6 * Code which implements an OCFS2 specific interface to our DLM. 7 * 8 * Copyright (C) 2003, 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 */ 25 26 #include <linux/types.h> 27 #include <linux/slab.h> 28 #include <linux/highmem.h> 29 #include <linux/mm.h> 30 #include <linux/kthread.h> 31 #include <linux/pagemap.h> 32 #include <linux/debugfs.h> 33 #include <linux/seq_file.h> 34 #include <linux/time.h> 35 #include <linux/quotaops.h> 36 37 #define MLOG_MASK_PREFIX ML_DLM_GLUE 38 #include <cluster/masklog.h> 39 40 #include "ocfs2.h" 41 #include "ocfs2_lockingver.h" 42 43 #include "alloc.h" 44 #include "dcache.h" 45 #include "dlmglue.h" 46 #include "extent_map.h" 47 #include "file.h" 48 #include "heartbeat.h" 49 #include "inode.h" 50 #include "journal.h" 51 #include "stackglue.h" 52 #include "slot_map.h" 53 #include "super.h" 54 #include "uptodate.h" 55 #include "quota.h" 56 #include "refcounttree.h" 57 58 #include "buffer_head_io.h" 59 60 struct ocfs2_mask_waiter { 61 struct list_head mw_item; 62 int mw_status; 63 struct completion mw_complete; 64 unsigned long mw_mask; 65 unsigned long mw_goal; 66 #ifdef CONFIG_OCFS2_FS_STATS 67 ktime_t 
mw_lock_start; 68 #endif 69 }; 70 71 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres); 72 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres); 73 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres); 74 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres); 75 76 /* 77 * Return value from ->downconvert_worker functions. 78 * 79 * These control the precise actions of ocfs2_unblock_lock() 80 * and ocfs2_process_blocked_lock() 81 * 82 */ 83 enum ocfs2_unblock_action { 84 UNBLOCK_CONTINUE = 0, /* Continue downconvert */ 85 UNBLOCK_CONTINUE_POST = 1, /* Continue downconvert, fire 86 * ->post_unlock callback */ 87 UNBLOCK_STOP_POST = 2, /* Do not downconvert, fire 88 * ->post_unlock() callback. */ 89 }; 90 91 struct ocfs2_unblock_ctl { 92 int requeue; 93 enum ocfs2_unblock_action unblock_action; 94 }; 95 96 /* Lockdep class keys */ 97 struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES]; 98 99 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 100 int new_level); 101 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres); 102 103 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 104 int blocking); 105 106 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 107 int blocking); 108 109 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 110 struct ocfs2_lock_res *lockres); 111 112 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres); 113 114 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres, 115 int new_level); 116 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres, 117 int blocking); 118 119 #define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres) 120 121 /* This aids in debugging situations where a bad LVB might be involved. 
*/ 122 static void ocfs2_dump_meta_lvb_info(u64 level, 123 const char *function, 124 unsigned int line, 125 struct ocfs2_lock_res *lockres) 126 { 127 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 128 129 mlog(level, "LVB information for %s (called from %s:%u):\n", 130 lockres->l_name, function, line); 131 mlog(level, "version: %u, clusters: %u, generation: 0x%x\n", 132 lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters), 133 be32_to_cpu(lvb->lvb_igeneration)); 134 mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n", 135 (unsigned long long)be64_to_cpu(lvb->lvb_isize), 136 be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid), 137 be16_to_cpu(lvb->lvb_imode)); 138 mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, " 139 "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink), 140 (long long)be64_to_cpu(lvb->lvb_iatime_packed), 141 (long long)be64_to_cpu(lvb->lvb_ictime_packed), 142 (long long)be64_to_cpu(lvb->lvb_imtime_packed), 143 be32_to_cpu(lvb->lvb_iattr)); 144 } 145 146 147 /* 148 * OCFS2 Lock Resource Operations 149 * 150 * These fine tune the behavior of the generic dlmglue locking infrastructure. 151 * 152 * The most basic of lock types can point ->l_priv to their respective 153 * struct ocfs2_super and allow the default actions to manage things. 154 * 155 * Right now, each lock type also needs to implement an init function, 156 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres() 157 * should be called when the lock is no longer needed (i.e., object 158 * destruction time). 159 */ 160 struct ocfs2_lock_res_ops { 161 /* 162 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define 163 * this callback if ->l_priv is not an ocfs2_super pointer 164 */ 165 struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *); 166 167 /* 168 * Optionally called in the downconvert thread after a 169 * successful downconvert. 
The lockres will not be referenced 170 * after this callback is called, so it is safe to free 171 * memory, etc. 172 * 173 * The exact semantics of when this is called are controlled 174 * by ->downconvert_worker() 175 */ 176 void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *); 177 178 /* 179 * Allow a lock type to add checks to determine whether it is 180 * safe to downconvert a lock. Return 0 to re-queue the 181 * downconvert at a later time, nonzero to continue. 182 * 183 * For most locks, the default checks that there are no 184 * incompatible holders are sufficient. 185 * 186 * Called with the lockres spinlock held. 187 */ 188 int (*check_downconvert)(struct ocfs2_lock_res *, int); 189 190 /* 191 * Allows a lock type to populate the lock value block. This 192 * is called on downconvert, and when we drop a lock. 193 * 194 * Locks that want to use this should set LOCK_TYPE_USES_LVB 195 * in the flags field. 196 * 197 * Called with the lockres spinlock held. 198 */ 199 void (*set_lvb)(struct ocfs2_lock_res *); 200 201 /* 202 * Called from the downconvert thread when it is determined 203 * that a lock will be downconverted. This is called without 204 * any locks held so the function can do work that might 205 * schedule (syncing out data, etc). 206 * 207 * This should return any one of the ocfs2_unblock_action 208 * values, depending on what it wants the thread to do. 209 */ 210 int (*downconvert_worker)(struct ocfs2_lock_res *, int); 211 212 /* 213 * LOCK_TYPE_* flags which describe the specific requirements 214 * of a lock type. Descriptions of each individual flag follow. 215 */ 216 int flags; 217 }; 218 219 /* 220 * Some locks want to "refresh" potentially stale data when a 221 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this 222 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the 223 * individual lockres l_flags member from the ast function. 
It is 224 * expected that the locking wrapper will clear the 225 * OCFS2_LOCK_NEEDS_REFRESH flag when done. 226 */ 227 #define LOCK_TYPE_REQUIRES_REFRESH 0x1 228 229 /* 230 * Indicate that a lock type makes use of the lock value block. The 231 * ->set_lvb lock type callback must be defined. 232 */ 233 #define LOCK_TYPE_USES_LVB 0x2 234 235 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { 236 .get_osb = ocfs2_get_inode_osb, 237 .flags = 0, 238 }; 239 240 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = { 241 .get_osb = ocfs2_get_inode_osb, 242 .check_downconvert = ocfs2_check_meta_downconvert, 243 .set_lvb = ocfs2_set_meta_lvb, 244 .downconvert_worker = ocfs2_data_convert_worker, 245 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 246 }; 247 248 static struct ocfs2_lock_res_ops ocfs2_super_lops = { 249 .flags = LOCK_TYPE_REQUIRES_REFRESH, 250 }; 251 252 static struct ocfs2_lock_res_ops ocfs2_rename_lops = { 253 .flags = 0, 254 }; 255 256 static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = { 257 .flags = 0, 258 }; 259 260 static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = { 261 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 262 }; 263 264 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { 265 .get_osb = ocfs2_get_dentry_osb, 266 .post_unlock = ocfs2_dentry_post_unlock, 267 .downconvert_worker = ocfs2_dentry_convert_worker, 268 .flags = 0, 269 }; 270 271 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = { 272 .get_osb = ocfs2_get_inode_osb, 273 .flags = 0, 274 }; 275 276 static struct ocfs2_lock_res_ops ocfs2_flock_lops = { 277 .get_osb = ocfs2_get_file_osb, 278 .flags = 0, 279 }; 280 281 static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = { 282 .set_lvb = ocfs2_set_qinfo_lvb, 283 .get_osb = ocfs2_get_qinfo_osb, 284 .flags = LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB, 285 }; 286 287 static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = { 288 .check_downconvert = ocfs2_check_refcount_downconvert, 289 
.downconvert_worker = ocfs2_refcount_convert_worker, 290 .flags = 0, 291 }; 292 293 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 294 { 295 return lockres->l_type == OCFS2_LOCK_TYPE_META || 296 lockres->l_type == OCFS2_LOCK_TYPE_RW || 297 lockres->l_type == OCFS2_LOCK_TYPE_OPEN; 298 } 299 300 static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb) 301 { 302 return container_of(lksb, struct ocfs2_lock_res, l_lksb); 303 } 304 305 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) 306 { 307 BUG_ON(!ocfs2_is_inode_lock(lockres)); 308 309 return (struct inode *) lockres->l_priv; 310 } 311 312 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres) 313 { 314 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY); 315 316 return (struct ocfs2_dentry_lock *)lockres->l_priv; 317 } 318 319 static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres) 320 { 321 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO); 322 323 return (struct ocfs2_mem_dqinfo *)lockres->l_priv; 324 } 325 326 static inline struct ocfs2_refcount_tree * 327 ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res) 328 { 329 return container_of(res, struct ocfs2_refcount_tree, rf_lockres); 330 } 331 332 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres) 333 { 334 if (lockres->l_ops->get_osb) 335 return lockres->l_ops->get_osb(lockres); 336 337 return (struct ocfs2_super *)lockres->l_priv; 338 } 339 340 static int ocfs2_lock_create(struct ocfs2_super *osb, 341 struct ocfs2_lock_res *lockres, 342 int level, 343 u32 dlm_flags); 344 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 345 int wanted); 346 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb, 347 struct ocfs2_lock_res *lockres, 348 int level, unsigned long caller_ip); 349 static inline void ocfs2_cluster_unlock(struct 
ocfs2_super *osb, 350 struct ocfs2_lock_res *lockres, 351 int level) 352 { 353 __ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_); 354 } 355 356 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres); 357 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres); 358 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres); 359 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level); 360 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 361 struct ocfs2_lock_res *lockres); 362 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 363 int convert); 364 #define ocfs2_log_dlm_error(_func, _err, _lockres) do { \ 365 if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY) \ 366 mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \ 367 _err, _func, _lockres->l_name); \ 368 else \ 369 mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n", \ 370 _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name, \ 371 (unsigned int)ocfs2_get_dentry_lock_ino(_lockres)); \ 372 } while (0) 373 static int ocfs2_downconvert_thread(void *arg); 374 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 375 struct ocfs2_lock_res *lockres); 376 static int ocfs2_inode_lock_update(struct inode *inode, 377 struct buffer_head **bh); 378 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 379 static inline int ocfs2_highest_compat_lock_level(int level); 380 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 381 int new_level); 382 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 383 struct ocfs2_lock_res *lockres, 384 int new_level, 385 int lvb, 386 unsigned int generation); 387 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 388 struct ocfs2_lock_res *lockres); 389 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 390 struct 
ocfs2_lock_res *lockres); 391 392 393 static void ocfs2_build_lock_name(enum ocfs2_lock_type type, 394 u64 blkno, 395 u32 generation, 396 char *name) 397 { 398 int len; 399 400 BUG_ON(type >= OCFS2_NUM_LOCK_TYPES); 401 402 len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", 403 ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD, 404 (long long)blkno, generation); 405 406 BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1)); 407 408 mlog(0, "built lock resource with name: %s\n", name); 409 } 410 411 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock); 412 413 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res, 414 struct ocfs2_dlm_debug *dlm_debug) 415 { 416 mlog(0, "Add tracking for lockres %s\n", res->l_name); 417 418 spin_lock(&ocfs2_dlm_tracking_lock); 419 list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking); 420 spin_unlock(&ocfs2_dlm_tracking_lock); 421 } 422 423 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) 424 { 425 spin_lock(&ocfs2_dlm_tracking_lock); 426 if (!list_empty(&res->l_debug_list)) 427 list_del_init(&res->l_debug_list); 428 spin_unlock(&ocfs2_dlm_tracking_lock); 429 } 430 431 #ifdef CONFIG_OCFS2_FS_STATS 432 static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 433 { 434 res->l_lock_refresh = 0; 435 memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats)); 436 memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats)); 437 } 438 439 static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level, 440 struct ocfs2_mask_waiter *mw, int ret) 441 { 442 u32 usec; 443 ktime_t kt; 444 struct ocfs2_lock_stats *stats; 445 446 if (level == LKM_PRMODE) 447 stats = &res->l_lock_prmode; 448 else if (level == LKM_EXMODE) 449 stats = &res->l_lock_exmode; 450 else 451 return; 452 453 kt = ktime_sub(ktime_get(), mw->mw_lock_start); 454 usec = ktime_to_us(kt); 455 456 stats->ls_gets++; 457 stats->ls_total += ktime_to_ns(kt); 458 /* overflow */ 459 if (unlikely(stats->ls_gets == 0)) { 460 
stats->ls_gets++; 461 stats->ls_total = ktime_to_ns(kt); 462 } 463 464 if (stats->ls_max < usec) 465 stats->ls_max = usec; 466 467 if (ret) 468 stats->ls_fail++; 469 } 470 471 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 472 { 473 lockres->l_lock_refresh++; 474 } 475 476 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 477 { 478 mw->mw_lock_start = ktime_get(); 479 } 480 #else 481 static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 482 { 483 } 484 static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, 485 int level, struct ocfs2_mask_waiter *mw, int ret) 486 { 487 } 488 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 489 { 490 } 491 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 492 { 493 } 494 #endif 495 496 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, 497 struct ocfs2_lock_res *res, 498 enum ocfs2_lock_type type, 499 struct ocfs2_lock_res_ops *ops, 500 void *priv) 501 { 502 res->l_type = type; 503 res->l_ops = ops; 504 res->l_priv = priv; 505 506 res->l_level = DLM_LOCK_IV; 507 res->l_requested = DLM_LOCK_IV; 508 res->l_blocking = DLM_LOCK_IV; 509 res->l_action = OCFS2_AST_INVALID; 510 res->l_unlock_action = OCFS2_UNLOCK_INVALID; 511 512 res->l_flags = OCFS2_LOCK_INITIALIZED; 513 514 ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug); 515 516 ocfs2_init_lock_stats(res); 517 #ifdef CONFIG_DEBUG_LOCK_ALLOC 518 if (type != OCFS2_LOCK_TYPE_OPEN) 519 lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type], 520 &lockdep_keys[type], 0); 521 else 522 res->l_lockdep_map.key = NULL; 523 #endif 524 } 525 526 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res) 527 { 528 /* This also clears out the lock status block */ 529 memset(res, 0, sizeof(struct ocfs2_lock_res)); 530 spin_lock_init(&res->l_lock); 531 init_waitqueue_head(&res->l_event); 532 INIT_LIST_HEAD(&res->l_blocked_list); 533 
INIT_LIST_HEAD(&res->l_mask_waiters); 534 } 535 536 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, 537 enum ocfs2_lock_type type, 538 unsigned int generation, 539 struct inode *inode) 540 { 541 struct ocfs2_lock_res_ops *ops; 542 543 switch(type) { 544 case OCFS2_LOCK_TYPE_RW: 545 ops = &ocfs2_inode_rw_lops; 546 break; 547 case OCFS2_LOCK_TYPE_META: 548 ops = &ocfs2_inode_inode_lops; 549 break; 550 case OCFS2_LOCK_TYPE_OPEN: 551 ops = &ocfs2_inode_open_lops; 552 break; 553 default: 554 mlog_bug_on_msg(1, "type: %d\n", type); 555 ops = NULL; /* thanks, gcc */ 556 break; 557 }; 558 559 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, 560 generation, res->l_name); 561 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); 562 } 563 564 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres) 565 { 566 struct inode *inode = ocfs2_lock_res_inode(lockres); 567 568 return OCFS2_SB(inode->i_sb); 569 } 570 571 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres) 572 { 573 struct ocfs2_mem_dqinfo *info = lockres->l_priv; 574 575 return OCFS2_SB(info->dqi_gi.dqi_sb); 576 } 577 578 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres) 579 { 580 struct ocfs2_file_private *fp = lockres->l_priv; 581 582 return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb); 583 } 584 585 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) 586 { 587 __be64 inode_blkno_be; 588 589 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], 590 sizeof(__be64)); 591 592 return be64_to_cpu(inode_blkno_be); 593 } 594 595 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres) 596 { 597 struct ocfs2_dentry_lock *dl = lockres->l_priv; 598 599 return OCFS2_SB(dl->dl_inode->i_sb); 600 } 601 602 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, 603 u64 parent, struct inode *inode) 604 { 605 int len; 606 u64 inode_blkno = 
OCFS2_I(inode)->ip_blkno; 607 __be64 inode_blkno_be = cpu_to_be64(inode_blkno); 608 struct ocfs2_lock_res *lockres = &dl->dl_lockres; 609 610 ocfs2_lock_res_init_once(lockres); 611 612 /* 613 * Unfortunately, the standard lock naming scheme won't work 614 * here because we have two 16 byte values to use. Instead, 615 * we'll stuff the inode number as a binary value. We still 616 * want error prints to show something without garbling the 617 * display, so drop a null byte in there before the inode 618 * number. A future version of OCFS2 will likely use all 619 * binary lock names. The stringified names have been a 620 * tremendous aid in debugging, but now that the debugfs 621 * interface exists, we can mangle things there if need be. 622 * 623 * NOTE: We also drop the standard "pad" value (the total lock 624 * name size stays the same though - the last part is all 625 * zeros due to the memset in ocfs2_lock_res_init_once() 626 */ 627 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, 628 "%c%016llx", 629 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), 630 (long long)parent); 631 632 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); 633 634 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, 635 sizeof(__be64)); 636 637 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 638 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, 639 dl); 640 } 641 642 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 643 struct ocfs2_super *osb) 644 { 645 /* Superblock lockres doesn't come from a slab so we call init 646 * once on it manually. 
*/ 647 ocfs2_lock_res_init_once(res); 648 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, 649 0, res->l_name); 650 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 651 &ocfs2_super_lops, osb); 652 } 653 654 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, 655 struct ocfs2_super *osb) 656 { 657 /* Rename lockres doesn't come from a slab so we call init 658 * once on it manually. */ 659 ocfs2_lock_res_init_once(res); 660 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); 661 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 662 &ocfs2_rename_lops, osb); 663 } 664 665 static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res, 666 struct ocfs2_super *osb) 667 { 668 /* nfs_sync lockres doesn't come from a slab so we call init 669 * once on it manually. */ 670 ocfs2_lock_res_init_once(res); 671 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name); 672 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC, 673 &ocfs2_nfs_sync_lops, osb); 674 } 675 676 static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res, 677 struct ocfs2_super *osb) 678 { 679 ocfs2_lock_res_init_once(res); 680 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name); 681 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN, 682 &ocfs2_orphan_scan_lops, osb); 683 } 684 685 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, 686 struct ocfs2_file_private *fp) 687 { 688 struct inode *inode = fp->fp_file->f_mapping->host; 689 struct ocfs2_inode_info *oi = OCFS2_I(inode); 690 691 ocfs2_lock_res_init_once(lockres); 692 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno, 693 inode->i_generation, lockres->l_name); 694 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 695 OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops, 696 fp); 697 lockres->l_flags |= OCFS2_LOCK_NOCACHE; 698 } 699 700 void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res 
*lockres, 701 struct ocfs2_mem_dqinfo *info) 702 { 703 ocfs2_lock_res_init_once(lockres); 704 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type, 705 0, lockres->l_name); 706 ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres, 707 OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops, 708 info); 709 } 710 711 void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres, 712 struct ocfs2_super *osb, u64 ref_blkno, 713 unsigned int generation) 714 { 715 ocfs2_lock_res_init_once(lockres); 716 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno, 717 generation, lockres->l_name); 718 ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT, 719 &ocfs2_refcount_block_lops, osb); 720 } 721 722 void ocfs2_lock_res_free(struct ocfs2_lock_res *res) 723 { 724 if (!(res->l_flags & OCFS2_LOCK_INITIALIZED)) 725 return; 726 727 ocfs2_remove_lockres_tracking(res); 728 729 mlog_bug_on_msg(!list_empty(&res->l_blocked_list), 730 "Lockres %s is on the blocked list\n", 731 res->l_name); 732 mlog_bug_on_msg(!list_empty(&res->l_mask_waiters), 733 "Lockres %s has mask waiters pending\n", 734 res->l_name); 735 mlog_bug_on_msg(spin_is_locked(&res->l_lock), 736 "Lockres %s is locked\n", 737 res->l_name); 738 mlog_bug_on_msg(res->l_ro_holders, 739 "Lockres %s has %u ro holders\n", 740 res->l_name, res->l_ro_holders); 741 mlog_bug_on_msg(res->l_ex_holders, 742 "Lockres %s has %u ex holders\n", 743 res->l_name, res->l_ex_holders); 744 745 /* Need to clear out the lock status block for the dlm */ 746 memset(&res->l_lksb, 0, sizeof(res->l_lksb)); 747 748 res->l_flags = 0UL; 749 } 750 751 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, 752 int level) 753 { 754 BUG_ON(!lockres); 755 756 switch(level) { 757 case DLM_LOCK_EX: 758 lockres->l_ex_holders++; 759 break; 760 case DLM_LOCK_PR: 761 lockres->l_ro_holders++; 762 break; 763 default: 764 BUG(); 765 } 766 } 767 768 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, 769 
int level) 770 { 771 BUG_ON(!lockres); 772 773 switch(level) { 774 case DLM_LOCK_EX: 775 BUG_ON(!lockres->l_ex_holders); 776 lockres->l_ex_holders--; 777 break; 778 case DLM_LOCK_PR: 779 BUG_ON(!lockres->l_ro_holders); 780 lockres->l_ro_holders--; 781 break; 782 default: 783 BUG(); 784 } 785 } 786 787 /* WARNING: This function lives in a world where the only three lock 788 * levels are EX, PR, and NL. It *will* have to be adjusted when more 789 * lock types are added. */ 790 static inline int ocfs2_highest_compat_lock_level(int level) 791 { 792 int new_level = DLM_LOCK_EX; 793 794 if (level == DLM_LOCK_EX) 795 new_level = DLM_LOCK_NL; 796 else if (level == DLM_LOCK_PR) 797 new_level = DLM_LOCK_PR; 798 return new_level; 799 } 800 801 static void lockres_set_flags(struct ocfs2_lock_res *lockres, 802 unsigned long newflags) 803 { 804 struct ocfs2_mask_waiter *mw, *tmp; 805 806 assert_spin_locked(&lockres->l_lock); 807 808 lockres->l_flags = newflags; 809 810 list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) { 811 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 812 continue; 813 814 list_del_init(&mw->mw_item); 815 mw->mw_status = 0; 816 complete(&mw->mw_complete); 817 } 818 } 819 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or) 820 { 821 lockres_set_flags(lockres, lockres->l_flags | or); 822 } 823 static void lockres_clear_flags(struct ocfs2_lock_res *lockres, 824 unsigned long clear) 825 { 826 lockres_set_flags(lockres, lockres->l_flags & ~clear); 827 } 828 829 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres) 830 { 831 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 832 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 833 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 834 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 835 836 lockres->l_level = lockres->l_requested; 837 if (lockres->l_level <= 838 ocfs2_highest_compat_lock_level(lockres->l_blocking)) { 839 
lockres->l_blocking = DLM_LOCK_NL; 840 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 841 } 842 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 843 } 844 845 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres) 846 { 847 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 848 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 849 850 /* Convert from RO to EX doesn't really need anything as our 851 * information is already up to data. Convert from NL to 852 * *anything* however should mark ourselves as needing an 853 * update */ 854 if (lockres->l_level == DLM_LOCK_NL && 855 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 856 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 857 858 lockres->l_level = lockres->l_requested; 859 860 /* 861 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing 862 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from 863 * downconverting the lock before the upconvert has fully completed. 864 */ 865 lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 866 867 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 868 } 869 870 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres) 871 { 872 BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY))); 873 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 874 875 if (lockres->l_requested > DLM_LOCK_NL && 876 !(lockres->l_flags & OCFS2_LOCK_LOCAL) && 877 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 878 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 879 880 lockres->l_level = lockres->l_requested; 881 lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED); 882 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 883 } 884 885 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, 886 int level) 887 { 888 int needs_downconvert = 0; 889 890 assert_spin_locked(&lockres->l_lock); 891 892 if (level > lockres->l_blocking) { 893 /* only schedule a downconvert if we haven't already scheduled 894 * one that goes low 
enough to satisfy the level we're 895 * blocking. this also catches the case where we get 896 * duplicate BASTs */ 897 if (ocfs2_highest_compat_lock_level(level) < 898 ocfs2_highest_compat_lock_level(lockres->l_blocking)) 899 needs_downconvert = 1; 900 901 lockres->l_blocking = level; 902 } 903 904 mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n", 905 lockres->l_name, level, lockres->l_level, lockres->l_blocking, 906 needs_downconvert); 907 908 if (needs_downconvert) 909 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 910 mlog(0, "needs_downconvert = %d\n", needs_downconvert); 911 return needs_downconvert; 912 } 913 914 /* 915 * OCFS2_LOCK_PENDING and l_pending_gen. 916 * 917 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting 918 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock() 919 * for more details on the race. 920 * 921 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces 922 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock() 923 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear 924 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns, 925 * the caller is going to try to clear PENDING again. If nothing else is 926 * happening, __lockres_clear_pending() sees PENDING is unset and does 927 * nothing. 928 * 929 * But what if another path (eg downconvert thread) has just started a 930 * new locking action? The other path has re-set PENDING. Our path 931 * cannot clear PENDING, because that will re-open the original race 932 * window. 
 *
 * [Example]
 *
 * ocfs2_meta_lock()
 *  ocfs2_cluster_lock()
 *   set BUSY
 *   set PENDING
 *   drop l_lock
 *   ocfs2_dlm_lock()
 *    ocfs2_locking_ast()           ocfs2_downconvert_thread()
 *     clear PENDING                 ocfs2_unblock_lock()
 *                                    take_l_lock
 *                                    !BUSY
 *                                    ocfs2_prepare_downconvert()
 *                                     set BUSY
 *                                     set PENDING
 *                                    drop l_lock
 *   take l_lock
 *   clear PENDING
 *   drop l_lock
 *                      <window>
 *                                    ocfs2_dlm_lock()
 *
 * So as you can see, we now have a window where l_lock is not held,
 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
 *
 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
 * set by ocfs2_prepare_downconvert().  That wasn't nice.
 *
 * To solve this we introduce l_pending_gen.  A call to
 * lockres_clear_pending() will only do so when it is passed a generation
 * number that matches the lockres.  lockres_set_pending() will return the
 * current generation number.  When ocfs2_cluster_lock() goes to clear
 * PENDING, it passes the generation it got from set_pending().  In our
 * example above, the generation numbers will *not* match.  Thus,
 * ocfs2_cluster_lock() will not clear the PENDING set by
 * ocfs2_prepare_downconvert().
 */

/*
 * Unlocked version for ocfs2_locking_ast().
 *
 * Clears OCFS2_LOCK_PENDING on @lockres, but only when the flag is
 * currently set and @generation equals lockres->l_pending_gen.  After a
 * successful clear l_pending_gen is incremented, so a stale generation
 * held by a racing path can no longer match.  If the lockres is also
 * OCFS2_LOCK_BLOCKED, the downconvert thread of @osb is woken.
 *
 * The caller must hold lockres->l_lock (asserted).
 */
static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
				    unsigned int generation,
				    struct ocfs2_super *osb)
{
	assert_spin_locked(&lockres->l_lock);

	/*
	 * The ast and locking functions can race us here.  The winner
	 * will clear pending, the loser will not.
	 */
	if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
	    (lockres->l_pending_gen != generation))
		return;

	lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
	/* Invalidate any generation handed out before this clear. */
	lockres->l_pending_gen++;

	/*
	 * The downconvert thread may have skipped us because we
	 * were PENDING.  Wake it up.
	 */
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
		ocfs2_wake_downconvert_thread(osb);
}

/*
 * Locked version for callers of ocfs2_dlm_lock().
 *
 * Takes lockres->l_lock (irqsave) around __lockres_clear_pending().
 * @generation is the value previously returned by lockres_set_pending().
 */
static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
				  unsigned int generation,
				  struct ocfs2_super *osb)
{
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);
	__lockres_clear_pending(lockres, generation, osb);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/*
 * Set OCFS2_LOCK_PENDING on @lockres and return the current
 * l_pending_gen, to be passed to lockres_clear_pending() later.
 *
 * The caller must hold lockres->l_lock (asserted) and must already have
 * set OCFS2_LOCK_BUSY; anything else is a BUG.
 */
static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));

	lockres_or_flags(lockres, OCFS2_LOCK_PENDING);

	return lockres->l_pending_gen;
}

/*
 * Blocking AST callback (installed as lproto.lp_blocking_ast).
 *
 * @lksb:  lock status block embedded in the affected lock resource
 * @level: lock level being blocked; must be above DLM_LOCK_NL
 *
 * Lock resources flagged OCFS2_LOCK_NOCACHE are ignored.  Otherwise the
 * BAST is recorded under l_lock via ocfs2_generic_handle_bast() and, if
 * that reports a downconvert is needed, the lockres is scheduled as a
 * blocked lock.  Waiters on l_event and the downconvert thread are woken
 * unconditionally afterwards.
 */
static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	int needs_downconvert;
	unsigned long flags;

	BUG_ON(level <= DLM_LOCK_NL);

	mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, "
	     "type %s\n", lockres->l_name, level, lockres->l_level,
	     ocfs2_lock_type_string(lockres->l_type));

	/*
	 * We can skip the bast for locks which don't enable caching -
	 * they'll be dropped at the earliest possible time anyway.
	 */
	if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
		return;

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_wake_downconvert_thread(osb);
}

/*
 * Lock AST callback (installed as lproto.lp_lock_ast).
 *
 * Runs entirely under lockres->l_lock.  Based on the lksb status:
 *  - -EAGAIN: OCFS2_LOCK_BUSY is cleared and the action handlers are
 *    skipped, but the common cleanup at "out" still runs;
 *  - any other non-zero status: logged, and the function returns without
 *    modifying the lock resource or waking anyone;
 *  - zero: the generic handler matching lockres->l_action is invoked
 *    (attach also clears OCFS2_LOCK_LOCAL).  An unknown l_action is a BUG.
 *
 * The common cleanup resets l_action, clears a pending cancel-convert
 * unlock action, clears PENDING for the current generation and wakes
 * waiters on l_event.
 */
static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	unsigned long flags;
	int status;

	spin_lock_irqsave(&lockres->l_lock, flags);

	status = ocfs2_dlm_lock_status(&lockres->l_lksb);

	if (status == -EAGAIN) {
		lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
		goto out;
	}

	if (status) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
		     lockres->l_name, status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, "
	     "level %d => %d\n", lockres->l_name, lockres->l_action,
	     lockres->l_unlock_action, lockres->l_level, lockres->l_requested);

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, "
		     "flags 0x%lx, unlock: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}
out:
	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	/* Did we try to cancel this lock?  Clear that state */
	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;

	/*
	 * We may have beaten the locking functions here.  We certainly
	 * know that dlm_lock() has been called :-)
	 * Because we can't have two lock calls in flight at once, we
	 * can use lockres->l_pending_gen.
	 */
	__lockres_clear_pending(lockres, lockres->l_pending_gen, osb);

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/*
 * Unlock AST callback (installed as lproto.lp_unlock_ast).
 *
 * @lksb:  lock status block embedded in the affected lock resource
 * @error: non-zero if the DLM reported a failure for the unlock
 *
 * On @error the failure is logged and nothing else is changed.
 * Otherwise, depending on lockres->l_unlock_action:
 *  - OCFS2_UNLOCK_CANCEL_CONVERT: l_action is invalidated and the
 *    downconvert thread is woken if the lockres is BLOCKED;
 *  - OCFS2_UNLOCK_DROP_LOCK: l_level is reset to DLM_LOCK_IV;
 *  - anything else is a BUG.
 * Finally BUSY is cleared, l_unlock_action is invalidated and waiters on
 * l_event are woken.
 */
static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	unsigned long flags;

	mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n",
	     lockres->l_name, lockres->l_unlock_action);

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (error) {
		mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
		     "unlock_action %d\n", error, lockres->l_name,
		     lockres->l_unlock_action);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_unlock_action) {
	case OCFS2_UNLOCK_CANCEL_CONVERT:
		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
		lockres->l_action = OCFS2_AST_INVALID;
		/* Downconvert thread may have requeued this lock, we
		 * need to wake it. */
		if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
			ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
		break;
	case OCFS2_UNLOCK_DROP_LOCK:
		lockres->l_level = DLM_LOCK_IV;
		break;
	default:
		BUG();
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/*
 * This is the filesystem locking protocol.  It provides the lock handling
 * hooks for the underlying DLM.  It has a maximum version number.
 * The version number allows interoperability with systems running at
 * the same major number and an equal or smaller minor number.
 *
 * Whenever the filesystem does new things with locks (adds or removes a
 * lock, orders them differently, does different things underneath a lock),
 * the version must be changed.  The protocol is negotiated when joining
 * the dlm domain.  A node may join the domain if its major version is
 * identical to all other nodes and its minor version is greater than
 * or equal to all other nodes.  When its minor version is greater than
 * the other nodes, it will run at the minor version specified by the
 * other nodes.
 *
 * If a locking change is made that will not be compatible with older
 * versions, the major number must be increased and the minor version set
 * to zero.  If a change merely adds a behavior that can be disabled when
 * speaking to older versions, the minor version must be increased.  If a
 * change adds a fully backwards compatible change (eg, LVB changes that
 * are just ignored by older versions), the version does not need to be
 * updated.
1178 */ 1179 static struct ocfs2_locking_protocol lproto = { 1180 .lp_max_version = { 1181 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, 1182 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, 1183 }, 1184 .lp_lock_ast = ocfs2_locking_ast, 1185 .lp_blocking_ast = ocfs2_blocking_ast, 1186 .lp_unlock_ast = ocfs2_unlock_ast, 1187 }; 1188 1189 void ocfs2_set_locking_protocol(void) 1190 { 1191 ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version); 1192 } 1193 1194 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 1195 int convert) 1196 { 1197 unsigned long flags; 1198 1199 spin_lock_irqsave(&lockres->l_lock, flags); 1200 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1201 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 1202 if (convert) 1203 lockres->l_action = OCFS2_AST_INVALID; 1204 else 1205 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1206 spin_unlock_irqrestore(&lockres->l_lock, flags); 1207 1208 wake_up(&lockres->l_event); 1209 } 1210 1211 /* Note: If we detect another process working on the lock (i.e., 1212 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller 1213 * to do the right thing in that case. 
1214 */ 1215 static int ocfs2_lock_create(struct ocfs2_super *osb, 1216 struct ocfs2_lock_res *lockres, 1217 int level, 1218 u32 dlm_flags) 1219 { 1220 int ret = 0; 1221 unsigned long flags; 1222 unsigned int gen; 1223 1224 mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level, 1225 dlm_flags); 1226 1227 spin_lock_irqsave(&lockres->l_lock, flags); 1228 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 1229 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 1230 spin_unlock_irqrestore(&lockres->l_lock, flags); 1231 goto bail; 1232 } 1233 1234 lockres->l_action = OCFS2_AST_ATTACH; 1235 lockres->l_requested = level; 1236 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1237 gen = lockres_set_pending(lockres); 1238 spin_unlock_irqrestore(&lockres->l_lock, flags); 1239 1240 ret = ocfs2_dlm_lock(osb->cconn, 1241 level, 1242 &lockres->l_lksb, 1243 dlm_flags, 1244 lockres->l_name, 1245 OCFS2_LOCK_ID_MAX_LEN - 1); 1246 lockres_clear_pending(lockres, gen, osb); 1247 if (ret) { 1248 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1249 ocfs2_recover_from_dlm_error(lockres, 1); 1250 } 1251 1252 mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name); 1253 1254 bail: 1255 return ret; 1256 } 1257 1258 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, 1259 int flag) 1260 { 1261 unsigned long flags; 1262 int ret; 1263 1264 spin_lock_irqsave(&lockres->l_lock, flags); 1265 ret = lockres->l_flags & flag; 1266 spin_unlock_irqrestore(&lockres->l_lock, flags); 1267 1268 return ret; 1269 } 1270 1271 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 1272 1273 { 1274 wait_event(lockres->l_event, 1275 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 1276 } 1277 1278 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 1279 1280 { 1281 wait_event(lockres->l_event, 1282 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 1283 } 1284 1285 /* predict what lock level we'll be dropping down to on behalf 
1286 * of another node, and return true if the currently wanted 1287 * level will be compatible with it. */ 1288 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 1289 int wanted) 1290 { 1291 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 1292 1293 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking); 1294 } 1295 1296 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw) 1297 { 1298 INIT_LIST_HEAD(&mw->mw_item); 1299 init_completion(&mw->mw_complete); 1300 ocfs2_init_start_time(mw); 1301 } 1302 1303 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) 1304 { 1305 wait_for_completion(&mw->mw_complete); 1306 /* Re-arm the completion in case we want to wait on it again */ 1307 reinit_completion(&mw->mw_complete); 1308 return mw->mw_status; 1309 } 1310 1311 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres, 1312 struct ocfs2_mask_waiter *mw, 1313 unsigned long mask, 1314 unsigned long goal) 1315 { 1316 BUG_ON(!list_empty(&mw->mw_item)); 1317 1318 assert_spin_locked(&lockres->l_lock); 1319 1320 list_add_tail(&mw->mw_item, &lockres->l_mask_waiters); 1321 mw->mw_mask = mask; 1322 mw->mw_goal = goal; 1323 } 1324 1325 /* returns 0 if the mw that was removed was already satisfied, -EBUSY 1326 * if the mask still hadn't reached its goal */ 1327 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1328 struct ocfs2_mask_waiter *mw) 1329 { 1330 unsigned long flags; 1331 int ret = 0; 1332 1333 spin_lock_irqsave(&lockres->l_lock, flags); 1334 if (!list_empty(&mw->mw_item)) { 1335 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 1336 ret = -EBUSY; 1337 1338 list_del_init(&mw->mw_item); 1339 init_completion(&mw->mw_complete); 1340 } 1341 spin_unlock_irqrestore(&lockres->l_lock, flags); 1342 1343 return ret; 1344 1345 } 1346 1347 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw, 1348 struct ocfs2_lock_res *lockres) 1349 { 1350 int ret; 1351 1352 
ret = wait_for_completion_interruptible(&mw->mw_complete); 1353 if (ret) 1354 lockres_remove_mask_waiter(lockres, mw); 1355 else 1356 ret = mw->mw_status; 1357 /* Re-arm the completion in case we want to wait on it again */ 1358 reinit_completion(&mw->mw_complete); 1359 return ret; 1360 } 1361 1362 static int __ocfs2_cluster_lock(struct ocfs2_super *osb, 1363 struct ocfs2_lock_res *lockres, 1364 int level, 1365 u32 lkm_flags, 1366 int arg_flags, 1367 int l_subclass, 1368 unsigned long caller_ip) 1369 { 1370 struct ocfs2_mask_waiter mw; 1371 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); 1372 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 1373 unsigned long flags; 1374 unsigned int gen; 1375 int noqueue_attempted = 0; 1376 1377 ocfs2_init_mask_waiter(&mw); 1378 1379 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 1380 lkm_flags |= DLM_LKF_VALBLK; 1381 1382 again: 1383 wait = 0; 1384 1385 spin_lock_irqsave(&lockres->l_lock, flags); 1386 1387 if (catch_signals && signal_pending(current)) { 1388 ret = -ERESTARTSYS; 1389 goto unlock; 1390 } 1391 1392 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, 1393 "Cluster lock called on freeing lockres %s! flags " 1394 "0x%lx\n", lockres->l_name, lockres->l_flags); 1395 1396 /* We only compare against the currently granted level 1397 * here. If the lock is blocked waiting on a downconvert, 1398 * we'll get caught below. */ 1399 if (lockres->l_flags & OCFS2_LOCK_BUSY && 1400 level > lockres->l_level) { 1401 /* is someone sitting in dlm_lock? If so, wait on 1402 * them. */ 1403 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1404 wait = 1; 1405 goto unlock; 1406 } 1407 1408 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) { 1409 /* 1410 * We've upconverted. If the lock now has a level we can 1411 * work with, we take it. If, however, the lock is not at the 1412 * required level, we go thru the full cycle. 
One way this could 1413 * happen is if a process requesting an upconvert to PR is 1414 * closely followed by another requesting upconvert to an EX. 1415 * If the process requesting EX lands here, we want it to 1416 * continue attempting to upconvert and let the process 1417 * requesting PR take the lock. 1418 * If multiple processes request upconvert to PR, the first one 1419 * here will take the lock. The others will have to go thru the 1420 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending 1421 * downconvert request. 1422 */ 1423 if (level <= lockres->l_level) 1424 goto update_holders; 1425 } 1426 1427 if (lockres->l_flags & OCFS2_LOCK_BLOCKED && 1428 !ocfs2_may_continue_on_blocked_lock(lockres, level)) { 1429 /* is the lock is currently blocked on behalf of 1430 * another node */ 1431 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0); 1432 wait = 1; 1433 goto unlock; 1434 } 1435 1436 if (level > lockres->l_level) { 1437 if (noqueue_attempted > 0) { 1438 ret = -EAGAIN; 1439 goto unlock; 1440 } 1441 if (lkm_flags & DLM_LKF_NOQUEUE) 1442 noqueue_attempted = 1; 1443 1444 if (lockres->l_action != OCFS2_AST_INVALID) 1445 mlog(ML_ERROR, "lockres %s has action %u pending\n", 1446 lockres->l_name, lockres->l_action); 1447 1448 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1449 lockres->l_action = OCFS2_AST_ATTACH; 1450 lkm_flags &= ~DLM_LKF_CONVERT; 1451 } else { 1452 lockres->l_action = OCFS2_AST_CONVERT; 1453 lkm_flags |= DLM_LKF_CONVERT; 1454 } 1455 1456 lockres->l_requested = level; 1457 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1458 gen = lockres_set_pending(lockres); 1459 spin_unlock_irqrestore(&lockres->l_lock, flags); 1460 1461 BUG_ON(level == DLM_LOCK_IV); 1462 BUG_ON(level == DLM_LOCK_NL); 1463 1464 mlog(ML_BASTS, "lockres %s, convert from %d to %d\n", 1465 lockres->l_name, lockres->l_level, level); 1466 1467 /* call dlm_lock to upgrade lock now */ 1468 ret = ocfs2_dlm_lock(osb->cconn, 1469 level, 1470 &lockres->l_lksb, 1471 
lkm_flags, 1472 lockres->l_name, 1473 OCFS2_LOCK_ID_MAX_LEN - 1); 1474 lockres_clear_pending(lockres, gen, osb); 1475 if (ret) { 1476 if (!(lkm_flags & DLM_LKF_NOQUEUE) || 1477 (ret != -EAGAIN)) { 1478 ocfs2_log_dlm_error("ocfs2_dlm_lock", 1479 ret, lockres); 1480 } 1481 ocfs2_recover_from_dlm_error(lockres, 1); 1482 goto out; 1483 } 1484 1485 mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n", 1486 lockres->l_name); 1487 1488 /* At this point we've gone inside the dlm and need to 1489 * complete our work regardless. */ 1490 catch_signals = 0; 1491 1492 /* wait for busy to clear and carry on */ 1493 goto again; 1494 } 1495 1496 update_holders: 1497 /* Ok, if we get here then we're good to go. */ 1498 ocfs2_inc_holders(lockres, level); 1499 1500 ret = 0; 1501 unlock: 1502 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 1503 1504 spin_unlock_irqrestore(&lockres->l_lock, flags); 1505 out: 1506 /* 1507 * This is helping work around a lock inversion between the page lock 1508 * and dlm locks. One path holds the page lock while calling aops 1509 * which block acquiring dlm locks. The voting thread holds dlm 1510 * locks while acquiring page locks while down converting data locks. 1511 * This block is helping an aop path notice the inversion and back 1512 * off to unlock its page lock before trying the dlm lock again. 
1513 */ 1514 if (wait && arg_flags & OCFS2_LOCK_NONBLOCK && 1515 mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) { 1516 wait = 0; 1517 if (lockres_remove_mask_waiter(lockres, &mw)) 1518 ret = -EAGAIN; 1519 else 1520 goto again; 1521 } 1522 if (wait) { 1523 ret = ocfs2_wait_for_mask(&mw); 1524 if (ret == 0) 1525 goto again; 1526 mlog_errno(ret); 1527 } 1528 ocfs2_update_lock_stats(lockres, level, &mw, ret); 1529 1530 #ifdef CONFIG_DEBUG_LOCK_ALLOC 1531 if (!ret && lockres->l_lockdep_map.key != NULL) { 1532 if (level == DLM_LOCK_PR) 1533 rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass, 1534 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE), 1535 caller_ip); 1536 else 1537 rwsem_acquire(&lockres->l_lockdep_map, l_subclass, 1538 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE), 1539 caller_ip); 1540 } 1541 #endif 1542 return ret; 1543 } 1544 1545 static inline int ocfs2_cluster_lock(struct ocfs2_super *osb, 1546 struct ocfs2_lock_res *lockres, 1547 int level, 1548 u32 lkm_flags, 1549 int arg_flags) 1550 { 1551 return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags, 1552 0, _RET_IP_); 1553 } 1554 1555 1556 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb, 1557 struct ocfs2_lock_res *lockres, 1558 int level, 1559 unsigned long caller_ip) 1560 { 1561 unsigned long flags; 1562 1563 spin_lock_irqsave(&lockres->l_lock, flags); 1564 ocfs2_dec_holders(lockres, level); 1565 ocfs2_downconvert_on_unlock(osb, lockres); 1566 spin_unlock_irqrestore(&lockres->l_lock, flags); 1567 #ifdef CONFIG_DEBUG_LOCK_ALLOC 1568 if (lockres->l_lockdep_map.key != NULL) 1569 rwsem_release(&lockres->l_lockdep_map, 1, caller_ip); 1570 #endif 1571 } 1572 1573 static int ocfs2_create_new_lock(struct ocfs2_super *osb, 1574 struct ocfs2_lock_res *lockres, 1575 int ex, 1576 int local) 1577 { 1578 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1579 unsigned long flags; 1580 u32 lkm_flags = local ? 
DLM_LKF_LOCAL : 0; 1581 1582 spin_lock_irqsave(&lockres->l_lock, flags); 1583 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 1584 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL); 1585 spin_unlock_irqrestore(&lockres->l_lock, flags); 1586 1587 return ocfs2_lock_create(osb, lockres, level, lkm_flags); 1588 } 1589 1590 /* Grants us an EX lock on the data and metadata resources, skipping 1591 * the normal cluster directory lookup. Use this ONLY on newly created 1592 * inodes which other nodes can't possibly see, and which haven't been 1593 * hashed in the inode hash yet. This can give us a good performance 1594 * increase as it'll skip the network broadcast normally associated 1595 * with creating a new lock resource. */ 1596 int ocfs2_create_new_inode_locks(struct inode *inode) 1597 { 1598 int ret; 1599 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1600 1601 BUG_ON(!inode); 1602 BUG_ON(!ocfs2_inode_is_new(inode)); 1603 1604 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); 1605 1606 /* NOTE: That we don't increment any of the holder counts, nor 1607 * do we add anything to a journal handle. Since this is 1608 * supposed to be a new inode which the cluster doesn't know 1609 * about yet, there is no need to. As far as the LVB handling 1610 * is concerned, this is basically like acquiring an EX lock 1611 * on a resource which has an invalid one -- we'll set it 1612 * valid when we release the EX. */ 1613 1614 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1); 1615 if (ret) { 1616 mlog_errno(ret); 1617 goto bail; 1618 } 1619 1620 /* 1621 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they 1622 * don't use a generation in their lock names. 
1623 */ 1624 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); 1625 if (ret) { 1626 mlog_errno(ret); 1627 goto bail; 1628 } 1629 1630 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0); 1631 if (ret) { 1632 mlog_errno(ret); 1633 goto bail; 1634 } 1635 1636 bail: 1637 return ret; 1638 } 1639 1640 int ocfs2_rw_lock(struct inode *inode, int write) 1641 { 1642 int status, level; 1643 struct ocfs2_lock_res *lockres; 1644 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1645 1646 BUG_ON(!inode); 1647 1648 mlog(0, "inode %llu take %s RW lock\n", 1649 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1650 write ? "EXMODE" : "PRMODE"); 1651 1652 if (ocfs2_mount_local(osb)) 1653 return 0; 1654 1655 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1656 1657 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1658 1659 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0, 1660 0); 1661 if (status < 0) 1662 mlog_errno(status); 1663 1664 return status; 1665 } 1666 1667 void ocfs2_rw_unlock(struct inode *inode, int write) 1668 { 1669 int level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1670 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1671 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1672 1673 mlog(0, "inode %llu drop %s RW lock\n", 1674 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1675 write ? "EXMODE" : "PRMODE"); 1676 1677 if (!ocfs2_mount_local(osb)) 1678 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 1679 } 1680 1681 /* 1682 * ocfs2_open_lock always get PR mode lock. 
1683 */ 1684 int ocfs2_open_lock(struct inode *inode) 1685 { 1686 int status = 0; 1687 struct ocfs2_lock_res *lockres; 1688 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1689 1690 BUG_ON(!inode); 1691 1692 mlog(0, "inode %llu take PRMODE open lock\n", 1693 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1694 1695 if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb)) 1696 goto out; 1697 1698 lockres = &OCFS2_I(inode)->ip_open_lockres; 1699 1700 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1701 DLM_LOCK_PR, 0, 0); 1702 if (status < 0) 1703 mlog_errno(status); 1704 1705 out: 1706 return status; 1707 } 1708 1709 int ocfs2_try_open_lock(struct inode *inode, int write) 1710 { 1711 int status = 0, level; 1712 struct ocfs2_lock_res *lockres; 1713 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1714 1715 BUG_ON(!inode); 1716 1717 mlog(0, "inode %llu try to take %s open lock\n", 1718 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1719 write ? "EXMODE" : "PRMODE"); 1720 1721 if (ocfs2_is_hard_readonly(osb)) { 1722 if (write) 1723 status = -EROFS; 1724 goto out; 1725 } 1726 1727 if (ocfs2_mount_local(osb)) 1728 goto out; 1729 1730 lockres = &OCFS2_I(inode)->ip_open_lockres; 1731 1732 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1733 1734 /* 1735 * The file system may already holding a PRMODE/EXMODE open lock. 1736 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on 1737 * other nodes and the -EAGAIN will indicate to the caller that 1738 * this inode is still in use. 1739 */ 1740 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1741 level, DLM_LKF_NOQUEUE, 0); 1742 1743 out: 1744 return status; 1745 } 1746 1747 /* 1748 * ocfs2_open_unlock unlock PR and EX mode open locks. 
1749 */ 1750 void ocfs2_open_unlock(struct inode *inode) 1751 { 1752 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres; 1753 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1754 1755 mlog(0, "inode %llu drop open lock\n", 1756 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1757 1758 if (ocfs2_mount_local(osb)) 1759 goto out; 1760 1761 if(lockres->l_ro_holders) 1762 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1763 DLM_LOCK_PR); 1764 if(lockres->l_ex_holders) 1765 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1766 DLM_LOCK_EX); 1767 1768 out: 1769 return; 1770 } 1771 1772 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres, 1773 int level) 1774 { 1775 int ret; 1776 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1777 unsigned long flags; 1778 struct ocfs2_mask_waiter mw; 1779 1780 ocfs2_init_mask_waiter(&mw); 1781 1782 retry_cancel: 1783 spin_lock_irqsave(&lockres->l_lock, flags); 1784 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 1785 ret = ocfs2_prepare_cancel_convert(osb, lockres); 1786 if (ret) { 1787 spin_unlock_irqrestore(&lockres->l_lock, flags); 1788 ret = ocfs2_cancel_convert(osb, lockres); 1789 if (ret < 0) { 1790 mlog_errno(ret); 1791 goto out; 1792 } 1793 goto retry_cancel; 1794 } 1795 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1796 spin_unlock_irqrestore(&lockres->l_lock, flags); 1797 1798 ocfs2_wait_for_mask(&mw); 1799 goto retry_cancel; 1800 } 1801 1802 ret = -ERESTARTSYS; 1803 /* 1804 * We may still have gotten the lock, in which case there's no 1805 * point to restarting the syscall. 1806 */ 1807 if (lockres->l_level == level) 1808 ret = 0; 1809 1810 mlog(0, "Cancel returning %d. 
flags: 0x%lx, level: %d, act: %d\n", ret, 1811 lockres->l_flags, lockres->l_level, lockres->l_action); 1812 1813 spin_unlock_irqrestore(&lockres->l_lock, flags); 1814 1815 out: 1816 return ret; 1817 } 1818 1819 /* 1820 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of 1821 * flock() calls. The locking approach this requires is sufficiently 1822 * different from all other cluster lock types that we implement a 1823 * separate path to the "low-level" dlm calls. In particular: 1824 * 1825 * - No optimization of lock levels is done - we take at exactly 1826 * what's been requested. 1827 * 1828 * - No lock caching is employed. We immediately downconvert to 1829 * no-lock at unlock time. This also means flock locks never go on 1830 * the blocking list). 1831 * 1832 * - Since userspace can trivially deadlock itself with flock, we make 1833 * sure to allow cancellation of a misbehaving applications flock() 1834 * request. 1835 * 1836 * - Access to any flock lockres doesn't require concurrency, so we 1837 * can simplify the code by requiring the caller to guarantee 1838 * serialization of dlmglue flock calls. 1839 */ 1840 int ocfs2_file_lock(struct file *file, int ex, int trylock) 1841 { 1842 int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1843 unsigned int lkm_flags = trylock ? 
DLM_LKF_NOQUEUE : 0; 1844 unsigned long flags; 1845 struct ocfs2_file_private *fp = file->private_data; 1846 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1847 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1848 struct ocfs2_mask_waiter mw; 1849 1850 ocfs2_init_mask_waiter(&mw); 1851 1852 if ((lockres->l_flags & OCFS2_LOCK_BUSY) || 1853 (lockres->l_level > DLM_LOCK_NL)) { 1854 mlog(ML_ERROR, 1855 "File lock \"%s\" has busy or locked state: flags: 0x%lx, " 1856 "level: %u\n", lockres->l_name, lockres->l_flags, 1857 lockres->l_level); 1858 return -EINVAL; 1859 } 1860 1861 spin_lock_irqsave(&lockres->l_lock, flags); 1862 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1863 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1864 spin_unlock_irqrestore(&lockres->l_lock, flags); 1865 1866 /* 1867 * Get the lock at NLMODE to start - that way we 1868 * can cancel the upconvert request if need be. 1869 */ 1870 ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0); 1871 if (ret < 0) { 1872 mlog_errno(ret); 1873 goto out; 1874 } 1875 1876 ret = ocfs2_wait_for_mask(&mw); 1877 if (ret) { 1878 mlog_errno(ret); 1879 goto out; 1880 } 1881 spin_lock_irqsave(&lockres->l_lock, flags); 1882 } 1883 1884 lockres->l_action = OCFS2_AST_CONVERT; 1885 lkm_flags |= DLM_LKF_CONVERT; 1886 lockres->l_requested = level; 1887 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1888 1889 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1890 spin_unlock_irqrestore(&lockres->l_lock, flags); 1891 1892 ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags, 1893 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1); 1894 if (ret) { 1895 if (!trylock || (ret != -EAGAIN)) { 1896 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1897 ret = -EINVAL; 1898 } 1899 1900 ocfs2_recover_from_dlm_error(lockres, 1); 1901 lockres_remove_mask_waiter(lockres, &mw); 1902 goto out; 1903 } 1904 1905 ret = ocfs2_wait_for_mask_interruptible(&mw, lockres); 1906 if (ret == -ERESTARTSYS) 
{ 1907 /* 1908 * Userspace can cause deadlock itself with 1909 * flock(). Current behavior locally is to allow the 1910 * deadlock, but abort the system call if a signal is 1911 * received. We follow this example, otherwise a 1912 * poorly written program could sit in kernel until 1913 * reboot. 1914 * 1915 * Handling this is a bit more complicated for Ocfs2 1916 * though. We can't exit this function with an 1917 * outstanding lock request, so a cancel convert is 1918 * required. We intentionally overwrite 'ret' - if the 1919 * cancel fails and the lock was granted, it's easier 1920 * to just bubble success back up to the user. 1921 */ 1922 ret = ocfs2_flock_handle_signal(lockres, level); 1923 } else if (!ret && (level > lockres->l_level)) { 1924 /* Trylock failed asynchronously */ 1925 BUG_ON(!trylock); 1926 ret = -EAGAIN; 1927 } 1928 1929 out: 1930 1931 mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n", 1932 lockres->l_name, ex, trylock, ret); 1933 return ret; 1934 } 1935 1936 void ocfs2_file_unlock(struct file *file) 1937 { 1938 int ret; 1939 unsigned int gen; 1940 unsigned long flags; 1941 struct ocfs2_file_private *fp = file->private_data; 1942 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1943 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1944 struct ocfs2_mask_waiter mw; 1945 1946 ocfs2_init_mask_waiter(&mw); 1947 1948 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) 1949 return; 1950 1951 if (lockres->l_level == DLM_LOCK_NL) 1952 return; 1953 1954 mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n", 1955 lockres->l_name, lockres->l_flags, lockres->l_level, 1956 lockres->l_action); 1957 1958 spin_lock_irqsave(&lockres->l_lock, flags); 1959 /* 1960 * Fake a blocking ast for the downconvert code. 
1961 */ 1962 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 1963 lockres->l_blocking = DLM_LOCK_EX; 1964 1965 gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL); 1966 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1967 spin_unlock_irqrestore(&lockres->l_lock, flags); 1968 1969 ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen); 1970 if (ret) { 1971 mlog_errno(ret); 1972 return; 1973 } 1974 1975 ret = ocfs2_wait_for_mask(&mw); 1976 if (ret) 1977 mlog_errno(ret); 1978 } 1979 1980 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 1981 struct ocfs2_lock_res *lockres) 1982 { 1983 int kick = 0; 1984 1985 /* If we know that another node is waiting on our lock, kick 1986 * the downconvert thread * pre-emptively when we reach a release 1987 * condition. */ 1988 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 1989 switch(lockres->l_blocking) { 1990 case DLM_LOCK_EX: 1991 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 1992 kick = 1; 1993 break; 1994 case DLM_LOCK_PR: 1995 if (!lockres->l_ex_holders) 1996 kick = 1; 1997 break; 1998 default: 1999 BUG(); 2000 } 2001 } 2002 2003 if (kick) 2004 ocfs2_wake_downconvert_thread(osb); 2005 } 2006 2007 #define OCFS2_SEC_BITS 34 2008 #define OCFS2_SEC_SHIFT (64 - 34) 2009 #define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1) 2010 2011 /* LVB only has room for 64 bits of time here so we pack it for 2012 * now. */ 2013 static u64 ocfs2_pack_timespec(struct timespec *spec) 2014 { 2015 u64 res; 2016 u64 sec = spec->tv_sec; 2017 u32 nsec = spec->tv_nsec; 2018 2019 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK); 2020 2021 return res; 2022 } 2023 2024 /* Call this with the lockres locked. I am reasonably sure we don't 2025 * need ip_lock in this function as anyone who would be changing those 2026 * values is supposed to be blocked in ocfs2_inode_lock right now. 
*/ 2027 static void __ocfs2_stuff_meta_lvb(struct inode *inode) 2028 { 2029 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2030 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2031 struct ocfs2_meta_lvb *lvb; 2032 2033 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2034 2035 /* 2036 * Invalidate the LVB of a deleted inode - this way other 2037 * nodes are forced to go to disk and discover the new inode 2038 * status. 2039 */ 2040 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2041 lvb->lvb_version = 0; 2042 goto out; 2043 } 2044 2045 lvb->lvb_version = OCFS2_LVB_VERSION; 2046 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 2047 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 2048 lvb->lvb_iuid = cpu_to_be32(i_uid_read(inode)); 2049 lvb->lvb_igid = cpu_to_be32(i_gid_read(inode)); 2050 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 2051 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 2052 lvb->lvb_iatime_packed = 2053 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime)); 2054 lvb->lvb_ictime_packed = 2055 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime)); 2056 lvb->lvb_imtime_packed = 2057 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime)); 2058 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 2059 lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features); 2060 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 2061 2062 out: 2063 mlog_meta_lvb(0, lockres); 2064 } 2065 2066 static void ocfs2_unpack_timespec(struct timespec *spec, 2067 u64 packed_time) 2068 { 2069 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 2070 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 2071 } 2072 2073 static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 2074 { 2075 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2076 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2077 struct ocfs2_meta_lvb *lvb; 2078 2079 mlog_meta_lvb(0, lockres); 2080 2081 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2082 2083 /* We're safe here without the lockres lock... 
*/ 2084 spin_lock(&oi->ip_lock); 2085 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 2086 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 2087 2088 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 2089 oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures); 2090 ocfs2_set_inode_flags(inode); 2091 2092 /* fast-symlinks are a special case */ 2093 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 2094 inode->i_blocks = 0; 2095 else 2096 inode->i_blocks = ocfs2_inode_sector_count(inode); 2097 2098 i_uid_write(inode, be32_to_cpu(lvb->lvb_iuid)); 2099 i_gid_write(inode, be32_to_cpu(lvb->lvb_igid)); 2100 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 2101 set_nlink(inode, be16_to_cpu(lvb->lvb_inlink)); 2102 ocfs2_unpack_timespec(&inode->i_atime, 2103 be64_to_cpu(lvb->lvb_iatime_packed)); 2104 ocfs2_unpack_timespec(&inode->i_mtime, 2105 be64_to_cpu(lvb->lvb_imtime_packed)); 2106 ocfs2_unpack_timespec(&inode->i_ctime, 2107 be64_to_cpu(lvb->lvb_ictime_packed)); 2108 spin_unlock(&oi->ip_lock); 2109 } 2110 2111 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 2112 struct ocfs2_lock_res *lockres) 2113 { 2114 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2115 2116 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) 2117 && lvb->lvb_version == OCFS2_LVB_VERSION 2118 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 2119 return 1; 2120 return 0; 2121 } 2122 2123 /* Determine whether a lock resource needs to be refreshed, and 2124 * arbitrate who gets to refresh it. 2125 * 2126 * 0 means no refresh needed. 2127 * 2128 * > 0 means you need to refresh this and you MUST call 2129 * ocfs2_complete_lock_res_refresh afterwards. 
*/ 2130 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres) 2131 { 2132 unsigned long flags; 2133 int status = 0; 2134 2135 refresh_check: 2136 spin_lock_irqsave(&lockres->l_lock, flags); 2137 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 2138 spin_unlock_irqrestore(&lockres->l_lock, flags); 2139 goto bail; 2140 } 2141 2142 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 2143 spin_unlock_irqrestore(&lockres->l_lock, flags); 2144 2145 ocfs2_wait_on_refreshing_lock(lockres); 2146 goto refresh_check; 2147 } 2148 2149 /* Ok, I'll be the one to refresh this lock. */ 2150 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING); 2151 spin_unlock_irqrestore(&lockres->l_lock, flags); 2152 2153 status = 1; 2154 bail: 2155 mlog(0, "status %d\n", status); 2156 return status; 2157 } 2158 2159 /* If status is non zero, I'll mark it as not being in refresh 2160 * anymroe, but i won't clear the needs refresh flag. */ 2161 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres, 2162 int status) 2163 { 2164 unsigned long flags; 2165 2166 spin_lock_irqsave(&lockres->l_lock, flags); 2167 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING); 2168 if (!status) 2169 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 2170 spin_unlock_irqrestore(&lockres->l_lock, flags); 2171 2172 wake_up(&lockres->l_event); 2173 } 2174 2175 /* may or may not return a bh if it went to disk. */ 2176 static int ocfs2_inode_lock_update(struct inode *inode, 2177 struct buffer_head **bh) 2178 { 2179 int status = 0; 2180 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2181 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2182 struct ocfs2_dinode *fe; 2183 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2184 2185 if (ocfs2_mount_local(osb)) 2186 goto bail; 2187 2188 spin_lock(&oi->ip_lock); 2189 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2190 mlog(0, "Orphaned inode %llu was deleted while we " 2191 "were waiting on a lock. 
ip_flags = 0x%x\n", 2192 (unsigned long long)oi->ip_blkno, oi->ip_flags); 2193 spin_unlock(&oi->ip_lock); 2194 status = -ENOENT; 2195 goto bail; 2196 } 2197 spin_unlock(&oi->ip_lock); 2198 2199 if (!ocfs2_should_refresh_lock_res(lockres)) 2200 goto bail; 2201 2202 /* This will discard any caching information we might have had 2203 * for the inode metadata. */ 2204 ocfs2_metadata_cache_purge(INODE_CACHE(inode)); 2205 2206 ocfs2_extent_map_trunc(inode, 0); 2207 2208 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) { 2209 mlog(0, "Trusting LVB on inode %llu\n", 2210 (unsigned long long)oi->ip_blkno); 2211 ocfs2_refresh_inode_from_lvb(inode); 2212 } else { 2213 /* Boo, we have to go to disk. */ 2214 /* read bh, cast, ocfs2_refresh_inode */ 2215 status = ocfs2_read_inode_block(inode, bh); 2216 if (status < 0) { 2217 mlog_errno(status); 2218 goto bail_refresh; 2219 } 2220 fe = (struct ocfs2_dinode *) (*bh)->b_data; 2221 2222 /* This is a good chance to make sure we're not 2223 * locking an invalid object. ocfs2_read_inode_block() 2224 * already checked that the inode block is sane. 2225 * 2226 * We bug on a stale inode here because we checked 2227 * above whether it was wiped from disk. The wiping 2228 * node provides a guarantee that we receive that 2229 * message and can mark the inode before dropping any 2230 * locks associated with it. 
*/ 2231 mlog_bug_on_msg(inode->i_generation != 2232 le32_to_cpu(fe->i_generation), 2233 "Invalid dinode %llu disk generation: %u " 2234 "inode->i_generation: %u\n", 2235 (unsigned long long)oi->ip_blkno, 2236 le32_to_cpu(fe->i_generation), 2237 inode->i_generation); 2238 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 2239 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 2240 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 2241 (unsigned long long)oi->ip_blkno, 2242 (unsigned long long)le64_to_cpu(fe->i_dtime), 2243 le32_to_cpu(fe->i_flags)); 2244 2245 ocfs2_refresh_inode(inode, fe); 2246 ocfs2_track_lock_refresh(lockres); 2247 } 2248 2249 status = 0; 2250 bail_refresh: 2251 ocfs2_complete_lock_res_refresh(lockres, status); 2252 bail: 2253 return status; 2254 } 2255 2256 static int ocfs2_assign_bh(struct inode *inode, 2257 struct buffer_head **ret_bh, 2258 struct buffer_head *passed_bh) 2259 { 2260 int status; 2261 2262 if (passed_bh) { 2263 /* Ok, the update went to disk for us, use the 2264 * returned bh. */ 2265 *ret_bh = passed_bh; 2266 get_bh(*ret_bh); 2267 2268 return 0; 2269 } 2270 2271 status = ocfs2_read_inode_block(inode, ret_bh); 2272 if (status < 0) 2273 mlog_errno(status); 2274 2275 return status; 2276 } 2277 2278 /* 2279 * returns < 0 error if the callback will never be called, otherwise 2280 * the result of the lock will be communicated via the callback. 2281 */ 2282 int ocfs2_inode_lock_full_nested(struct inode *inode, 2283 struct buffer_head **ret_bh, 2284 int ex, 2285 int arg_flags, 2286 int subclass) 2287 { 2288 int status, level, acquired; 2289 u32 dlm_flags; 2290 struct ocfs2_lock_res *lockres = NULL; 2291 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2292 struct buffer_head *local_bh = NULL; 2293 2294 BUG_ON(!inode); 2295 2296 mlog(0, "inode %llu, take %s META lock\n", 2297 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2298 ex ? 
"EXMODE" : "PRMODE"); 2299 2300 status = 0; 2301 acquired = 0; 2302 /* We'll allow faking a readonly metadata lock for 2303 * rodevices. */ 2304 if (ocfs2_is_hard_readonly(osb)) { 2305 if (ex) 2306 status = -EROFS; 2307 goto getbh; 2308 } 2309 2310 if (ocfs2_mount_local(osb)) 2311 goto local; 2312 2313 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2314 ocfs2_wait_for_recovery(osb); 2315 2316 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2317 level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2318 dlm_flags = 0; 2319 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 2320 dlm_flags |= DLM_LKF_NOQUEUE; 2321 2322 status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags, 2323 arg_flags, subclass, _RET_IP_); 2324 if (status < 0) { 2325 if (status != -EAGAIN) 2326 mlog_errno(status); 2327 goto bail; 2328 } 2329 2330 /* Notify the error cleanup path to drop the cluster lock. */ 2331 acquired = 1; 2332 2333 /* We wait twice because a node may have died while we were in 2334 * the lower dlm layers. The second time though, we've 2335 * committed to owning this lock so we don't allow signals to 2336 * abort the operation. */ 2337 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2338 ocfs2_wait_for_recovery(osb); 2339 2340 local: 2341 /* 2342 * We only see this flag if we're being called from 2343 * ocfs2_read_locked_inode(). It means we're locking an inode 2344 * which hasn't been populated yet, so clear the refresh flag 2345 * and let the caller handle it. 2346 */ 2347 if (inode->i_state & I_NEW) { 2348 status = 0; 2349 if (lockres) 2350 ocfs2_complete_lock_res_refresh(lockres, 0); 2351 goto bail; 2352 } 2353 2354 /* This is fun. The caller may want a bh back, or it may 2355 * not. ocfs2_inode_lock_update definitely wants one in, but 2356 * may or may not read one, depending on what's in the 2357 * LVB. The result of all of this is that we've *only* gone to 2358 * disk if we have to, so the complexity is worthwhile. 
*/ 2359 status = ocfs2_inode_lock_update(inode, &local_bh); 2360 if (status < 0) { 2361 if (status != -ENOENT) 2362 mlog_errno(status); 2363 goto bail; 2364 } 2365 getbh: 2366 if (ret_bh) { 2367 status = ocfs2_assign_bh(inode, ret_bh, local_bh); 2368 if (status < 0) { 2369 mlog_errno(status); 2370 goto bail; 2371 } 2372 } 2373 2374 bail: 2375 if (status < 0) { 2376 if (ret_bh && (*ret_bh)) { 2377 brelse(*ret_bh); 2378 *ret_bh = NULL; 2379 } 2380 if (acquired) 2381 ocfs2_inode_unlock(inode, ex); 2382 } 2383 2384 if (local_bh) 2385 brelse(local_bh); 2386 2387 return status; 2388 } 2389 2390 /* 2391 * This is working around a lock inversion between tasks acquiring DLM 2392 * locks while holding a page lock and the downconvert thread which 2393 * blocks dlm lock acquiry while acquiring page locks. 2394 * 2395 * ** These _with_page variantes are only intended to be called from aop 2396 * methods that hold page locks and return a very specific *positive* error 2397 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 2398 * 2399 * The DLM is called such that it returns -EAGAIN if it would have 2400 * blocked waiting for the downconvert thread. In that case we unlock 2401 * our page so the downconvert thread can make progress. Once we've 2402 * done this we have to return AOP_TRUNCATED_PAGE so the aop method 2403 * that called us can bubble that back up into the VFS who will then 2404 * immediately retry the aop call. 2405 * 2406 * We do a blocking lock and immediate unlock before returning, though, so that 2407 * the lock has a great chance of being cached on this node by the time the VFS 2408 * calls back to retry the aop. This has a potential to livelock as nodes 2409 * ping locks back and forth, but that's a risk we're willing to take to avoid 2410 * the lock inversion simply. 
2411 */ 2412 int ocfs2_inode_lock_with_page(struct inode *inode, 2413 struct buffer_head **ret_bh, 2414 int ex, 2415 struct page *page) 2416 { 2417 int ret; 2418 2419 ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK); 2420 if (ret == -EAGAIN) { 2421 unlock_page(page); 2422 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0) 2423 ocfs2_inode_unlock(inode, ex); 2424 ret = AOP_TRUNCATED_PAGE; 2425 } 2426 2427 return ret; 2428 } 2429 2430 int ocfs2_inode_lock_atime(struct inode *inode, 2431 struct vfsmount *vfsmnt, 2432 int *level) 2433 { 2434 int ret; 2435 2436 ret = ocfs2_inode_lock(inode, NULL, 0); 2437 if (ret < 0) { 2438 mlog_errno(ret); 2439 return ret; 2440 } 2441 2442 /* 2443 * If we should update atime, we will get EX lock, 2444 * otherwise we just get PR lock. 2445 */ 2446 if (ocfs2_should_update_atime(inode, vfsmnt)) { 2447 struct buffer_head *bh = NULL; 2448 2449 ocfs2_inode_unlock(inode, 0); 2450 ret = ocfs2_inode_lock(inode, &bh, 1); 2451 if (ret < 0) { 2452 mlog_errno(ret); 2453 return ret; 2454 } 2455 *level = 1; 2456 if (ocfs2_should_update_atime(inode, vfsmnt)) 2457 ocfs2_update_inode_atime(inode, bh); 2458 if (bh) 2459 brelse(bh); 2460 } else 2461 *level = 0; 2462 2463 return ret; 2464 } 2465 2466 void ocfs2_inode_unlock(struct inode *inode, 2467 int ex) 2468 { 2469 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2470 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2471 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2472 2473 mlog(0, "inode %llu drop %s META lock\n", 2474 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2475 ex ? 
"EXMODE" : "PRMODE"); 2476 2477 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) && 2478 !ocfs2_mount_local(osb)) 2479 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 2480 } 2481 2482 int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno) 2483 { 2484 struct ocfs2_lock_res *lockres; 2485 struct ocfs2_orphan_scan_lvb *lvb; 2486 int status = 0; 2487 2488 if (ocfs2_is_hard_readonly(osb)) 2489 return -EROFS; 2490 2491 if (ocfs2_mount_local(osb)) 2492 return 0; 2493 2494 lockres = &osb->osb_orphan_scan.os_lockres; 2495 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2496 if (status < 0) 2497 return status; 2498 2499 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2500 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2501 lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION) 2502 *seqno = be32_to_cpu(lvb->lvb_os_seqno); 2503 else 2504 *seqno = osb->osb_orphan_scan.os_seqno + 1; 2505 2506 return status; 2507 } 2508 2509 void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno) 2510 { 2511 struct ocfs2_lock_res *lockres; 2512 struct ocfs2_orphan_scan_lvb *lvb; 2513 2514 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) { 2515 lockres = &osb->osb_orphan_scan.os_lockres; 2516 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2517 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION; 2518 lvb->lvb_os_seqno = cpu_to_be32(seqno); 2519 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2520 } 2521 } 2522 2523 int ocfs2_super_lock(struct ocfs2_super *osb, 2524 int ex) 2525 { 2526 int status = 0; 2527 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2528 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2529 2530 if (ocfs2_is_hard_readonly(osb)) 2531 return -EROFS; 2532 2533 if (ocfs2_mount_local(osb)) 2534 goto bail; 2535 2536 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 2537 if (status < 0) { 2538 mlog_errno(status); 2539 goto bail; 2540 } 2541 2542 /* The super block lock path is really in the best position to 2543 * know when resources covered by the lock need to be 2544 * refreshed, so we do it here. Of course, making sense of 2545 * everything is up to the caller :) */ 2546 status = ocfs2_should_refresh_lock_res(lockres); 2547 if (status) { 2548 status = ocfs2_refresh_slot_info(osb); 2549 2550 ocfs2_complete_lock_res_refresh(lockres, status); 2551 2552 if (status < 0) { 2553 ocfs2_cluster_unlock(osb, lockres, level); 2554 mlog_errno(status); 2555 } 2556 ocfs2_track_lock_refresh(lockres); 2557 } 2558 bail: 2559 return status; 2560 } 2561 2562 void ocfs2_super_unlock(struct ocfs2_super *osb, 2563 int ex) 2564 { 2565 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2566 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2567 2568 if (!ocfs2_mount_local(osb)) 2569 ocfs2_cluster_unlock(osb, lockres, level); 2570 } 2571 2572 int ocfs2_rename_lock(struct ocfs2_super *osb) 2573 { 2574 int status; 2575 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2576 2577 if (ocfs2_is_hard_readonly(osb)) 2578 return -EROFS; 2579 2580 if (ocfs2_mount_local(osb)) 2581 return 0; 2582 2583 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2584 if (status < 0) 2585 mlog_errno(status); 2586 2587 return status; 2588 } 2589 2590 void ocfs2_rename_unlock(struct ocfs2_super *osb) 2591 { 2592 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2593 2594 if (!ocfs2_mount_local(osb)) 2595 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2596 } 2597 2598 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex) 2599 { 2600 int status; 2601 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2602 2603 if (ocfs2_is_hard_readonly(osb)) 2604 return -EROFS; 2605 2606 if (ocfs2_mount_local(osb)) 2607 return 0; 2608 2609 status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE, 2610 0, 0); 2611 if (status < 0) 2612 mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status); 2613 2614 return status; 2615 } 2616 2617 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex) 2618 { 2619 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2620 2621 if (!ocfs2_mount_local(osb)) 2622 ocfs2_cluster_unlock(osb, lockres, 2623 ex ? LKM_EXMODE : LKM_PRMODE); 2624 } 2625 2626 int ocfs2_dentry_lock(struct dentry *dentry, int ex) 2627 { 2628 int ret; 2629 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2630 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2631 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2632 2633 BUG_ON(!dl); 2634 2635 if (ocfs2_is_hard_readonly(osb)) { 2636 if (ex) 2637 return -EROFS; 2638 return 0; 2639 } 2640 2641 if (ocfs2_mount_local(osb)) 2642 return 0; 2643 2644 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 2645 if (ret < 0) 2646 mlog_errno(ret); 2647 2648 return ret; 2649 } 2650 2651 void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 2652 { 2653 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2654 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2655 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2656 2657 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 2658 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 2659 } 2660 2661 /* Reference counting of the dlm debug structure. We want this because 2662 * open references on the debug inodes can live on after a mount, so 2663 * we can't rely on the ocfs2_super to always exist. 
*/ 2664 static void ocfs2_dlm_debug_free(struct kref *kref) 2665 { 2666 struct ocfs2_dlm_debug *dlm_debug; 2667 2668 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2669 2670 kfree(dlm_debug); 2671 } 2672 2673 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2674 { 2675 if (dlm_debug) 2676 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2677 } 2678 2679 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2680 { 2681 kref_get(&debug->d_refcnt); 2682 } 2683 2684 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2685 { 2686 struct ocfs2_dlm_debug *dlm_debug; 2687 2688 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 2689 if (!dlm_debug) { 2690 mlog_errno(-ENOMEM); 2691 goto out; 2692 } 2693 2694 kref_init(&dlm_debug->d_refcnt); 2695 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 2696 dlm_debug->d_locking_state = NULL; 2697 out: 2698 return dlm_debug; 2699 } 2700 2701 /* Access to this is arbitrated for us via seq_file->sem. */ 2702 struct ocfs2_dlm_seq_priv { 2703 struct ocfs2_dlm_debug *p_dlm_debug; 2704 struct ocfs2_lock_res p_iter_res; 2705 struct ocfs2_lock_res p_tmp_res; 2706 }; 2707 2708 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 2709 struct ocfs2_dlm_seq_priv *priv) 2710 { 2711 struct ocfs2_lock_res *iter, *ret = NULL; 2712 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 2713 2714 assert_spin_locked(&ocfs2_dlm_tracking_lock); 2715 2716 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 2717 /* discover the head of the list */ 2718 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 2719 mlog(0, "End of list found, %p\n", ret); 2720 break; 2721 } 2722 2723 /* We track our "dummy" iteration lockres' by a NULL 2724 * l_ops field. 
*/ 2725 if (iter->l_ops != NULL) { 2726 ret = iter; 2727 break; 2728 } 2729 } 2730 2731 return ret; 2732 } 2733 2734 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 2735 { 2736 struct ocfs2_dlm_seq_priv *priv = m->private; 2737 struct ocfs2_lock_res *iter; 2738 2739 spin_lock(&ocfs2_dlm_tracking_lock); 2740 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 2741 if (iter) { 2742 /* Since lockres' have the lifetime of their container 2743 * (which can be inodes, ocfs2_supers, etc) we want to 2744 * copy this out to a temporary lockres while still 2745 * under the spinlock. Obviously after this we can't 2746 * trust any pointers on the copy returned, but that's 2747 * ok as the information we want isn't typically held 2748 * in them. */ 2749 priv->p_tmp_res = *iter; 2750 iter = &priv->p_tmp_res; 2751 } 2752 spin_unlock(&ocfs2_dlm_tracking_lock); 2753 2754 return iter; 2755 } 2756 2757 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 2758 { 2759 } 2760 2761 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 2762 { 2763 struct ocfs2_dlm_seq_priv *priv = m->private; 2764 struct ocfs2_lock_res *iter = v; 2765 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 2766 2767 spin_lock(&ocfs2_dlm_tracking_lock); 2768 iter = ocfs2_dlm_next_res(iter, priv); 2769 list_del_init(&dummy->l_debug_list); 2770 if (iter) { 2771 list_add(&dummy->l_debug_list, &iter->l_debug_list); 2772 priv->p_tmp_res = *iter; 2773 iter = &priv->p_tmp_res; 2774 } 2775 spin_unlock(&ocfs2_dlm_tracking_lock); 2776 2777 return iter; 2778 } 2779 2780 /* 2781 * Version is used by debugfs.ocfs2 to determine the format being used 2782 * 2783 * New in version 2 2784 * - Lock stats printed 2785 * New in version 3 2786 * - Max time in lock stats is in usecs (instead of nsecs) 2787 */ 2788 #define OCFS2_DLM_DEBUG_STR_VERSION 3 2789 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 2790 { 2791 int i; 2792 char *lvb; 2793 struct ocfs2_lock_res *lockres = v; 
2794 2795 if (!lockres) 2796 return -EINVAL; 2797 2798 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 2799 2800 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 2801 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 2802 lockres->l_name, 2803 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 2804 else 2805 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 2806 2807 seq_printf(m, "%d\t" 2808 "0x%lx\t" 2809 "0x%x\t" 2810 "0x%x\t" 2811 "%u\t" 2812 "%u\t" 2813 "%d\t" 2814 "%d\t", 2815 lockres->l_level, 2816 lockres->l_flags, 2817 lockres->l_action, 2818 lockres->l_unlock_action, 2819 lockres->l_ro_holders, 2820 lockres->l_ex_holders, 2821 lockres->l_requested, 2822 lockres->l_blocking); 2823 2824 /* Dump the raw LVB */ 2825 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2826 for(i = 0; i < DLM_LVB_LEN; i++) 2827 seq_printf(m, "0x%x\t", lvb[i]); 2828 2829 #ifdef CONFIG_OCFS2_FS_STATS 2830 # define lock_num_prmode(_l) ((_l)->l_lock_prmode.ls_gets) 2831 # define lock_num_exmode(_l) ((_l)->l_lock_exmode.ls_gets) 2832 # define lock_num_prmode_failed(_l) ((_l)->l_lock_prmode.ls_fail) 2833 # define lock_num_exmode_failed(_l) ((_l)->l_lock_exmode.ls_fail) 2834 # define lock_total_prmode(_l) ((_l)->l_lock_prmode.ls_total) 2835 # define lock_total_exmode(_l) ((_l)->l_lock_exmode.ls_total) 2836 # define lock_max_prmode(_l) ((_l)->l_lock_prmode.ls_max) 2837 # define lock_max_exmode(_l) ((_l)->l_lock_exmode.ls_max) 2838 # define lock_refresh(_l) ((_l)->l_lock_refresh) 2839 #else 2840 # define lock_num_prmode(_l) (0) 2841 # define lock_num_exmode(_l) (0) 2842 # define lock_num_prmode_failed(_l) (0) 2843 # define lock_num_exmode_failed(_l) (0) 2844 # define lock_total_prmode(_l) (0ULL) 2845 # define lock_total_exmode(_l) (0ULL) 2846 # define lock_max_prmode(_l) (0) 2847 # define lock_max_exmode(_l) (0) 2848 # define lock_refresh(_l) (0) 2849 #endif 2850 /* The following seq_print was added in version 2 of this output */ 2851 seq_printf(m, "%u\t" 2852 "%u\t" 
2853 "%u\t" 2854 "%u\t" 2855 "%llu\t" 2856 "%llu\t" 2857 "%u\t" 2858 "%u\t" 2859 "%u\t", 2860 lock_num_prmode(lockres), 2861 lock_num_exmode(lockres), 2862 lock_num_prmode_failed(lockres), 2863 lock_num_exmode_failed(lockres), 2864 lock_total_prmode(lockres), 2865 lock_total_exmode(lockres), 2866 lock_max_prmode(lockres), 2867 lock_max_exmode(lockres), 2868 lock_refresh(lockres)); 2869 2870 /* End the line */ 2871 seq_printf(m, "\n"); 2872 return 0; 2873 } 2874 2875 static const struct seq_operations ocfs2_dlm_seq_ops = { 2876 .start = ocfs2_dlm_seq_start, 2877 .stop = ocfs2_dlm_seq_stop, 2878 .next = ocfs2_dlm_seq_next, 2879 .show = ocfs2_dlm_seq_show, 2880 }; 2881 2882 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 2883 { 2884 struct seq_file *seq = file->private_data; 2885 struct ocfs2_dlm_seq_priv *priv = seq->private; 2886 struct ocfs2_lock_res *res = &priv->p_iter_res; 2887 2888 ocfs2_remove_lockres_tracking(res); 2889 ocfs2_put_dlm_debug(priv->p_dlm_debug); 2890 return seq_release_private(inode, file); 2891 } 2892 2893 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 2894 { 2895 int ret; 2896 struct ocfs2_dlm_seq_priv *priv; 2897 struct seq_file *seq; 2898 struct ocfs2_super *osb; 2899 2900 priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL); 2901 if (!priv) { 2902 ret = -ENOMEM; 2903 mlog_errno(ret); 2904 goto out; 2905 } 2906 osb = inode->i_private; 2907 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 2908 priv->p_dlm_debug = osb->osb_dlm_debug; 2909 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 2910 2911 ret = seq_open(file, &ocfs2_dlm_seq_ops); 2912 if (ret) { 2913 kfree(priv); 2914 mlog_errno(ret); 2915 goto out; 2916 } 2917 2918 seq = file->private_data; 2919 seq->private = priv; 2920 2921 ocfs2_add_lockres_tracking(&priv->p_iter_res, 2922 priv->p_dlm_debug); 2923 2924 out: 2925 return ret; 2926 } 2927 2928 static const struct file_operations ocfs2_dlm_debug_fops = { 2929 .open = 
ocfs2_dlm_debug_open, 2930 .release = ocfs2_dlm_debug_release, 2931 .read = seq_read, 2932 .llseek = seq_lseek, 2933 }; 2934 2935 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 2936 { 2937 int ret = 0; 2938 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2939 2940 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 2941 S_IFREG|S_IRUSR, 2942 osb->osb_debug_root, 2943 osb, 2944 &ocfs2_dlm_debug_fops); 2945 if (!dlm_debug->d_locking_state) { 2946 ret = -EINVAL; 2947 mlog(ML_ERROR, 2948 "Unable to create locking state debugfs file.\n"); 2949 goto out; 2950 } 2951 2952 ocfs2_get_dlm_debug(dlm_debug); 2953 out: 2954 return ret; 2955 } 2956 2957 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 2958 { 2959 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2960 2961 if (dlm_debug) { 2962 debugfs_remove(dlm_debug->d_locking_state); 2963 ocfs2_put_dlm_debug(dlm_debug); 2964 } 2965 } 2966 2967 int ocfs2_dlm_init(struct ocfs2_super *osb) 2968 { 2969 int status = 0; 2970 struct ocfs2_cluster_connection *conn = NULL; 2971 2972 if (ocfs2_mount_local(osb)) { 2973 osb->node_num = 0; 2974 goto local; 2975 } 2976 2977 status = ocfs2_dlm_init_debug(osb); 2978 if (status < 0) { 2979 mlog_errno(status); 2980 goto bail; 2981 } 2982 2983 /* launch downconvert thread */ 2984 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc"); 2985 if (IS_ERR(osb->dc_task)) { 2986 status = PTR_ERR(osb->dc_task); 2987 osb->dc_task = NULL; 2988 mlog_errno(status); 2989 goto bail; 2990 } 2991 2992 /* for now, uuid == domain */ 2993 status = ocfs2_cluster_connect(osb->osb_cluster_stack, 2994 osb->osb_cluster_name, 2995 strlen(osb->osb_cluster_name), 2996 osb->uuid_str, 2997 strlen(osb->uuid_str), 2998 &lproto, ocfs2_do_node_down, osb, 2999 &conn); 3000 if (status) { 3001 mlog_errno(status); 3002 goto bail; 3003 } 3004 3005 status = ocfs2_cluster_this_node(conn, &osb->node_num); 3006 if (status < 0) { 3007 mlog_errno(status); 3008 
mlog(ML_ERROR, 3009 "could not find this host's node number\n"); 3010 ocfs2_cluster_disconnect(conn, 0); 3011 goto bail; 3012 } 3013 3014 local: 3015 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 3016 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 3017 ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb); 3018 ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb); 3019 3020 osb->cconn = conn; 3021 3022 status = 0; 3023 bail: 3024 if (status < 0) { 3025 ocfs2_dlm_shutdown_debug(osb); 3026 if (osb->dc_task) 3027 kthread_stop(osb->dc_task); 3028 } 3029 3030 return status; 3031 } 3032 3033 void ocfs2_dlm_shutdown(struct ocfs2_super *osb, 3034 int hangup_pending) 3035 { 3036 ocfs2_drop_osb_locks(osb); 3037 3038 /* 3039 * Now that we have dropped all locks and ocfs2_dismount_volume() 3040 * has disabled recovery, the DLM won't be talking to us. It's 3041 * safe to tear things down before disconnecting the cluster. 3042 */ 3043 3044 if (osb->dc_task) { 3045 kthread_stop(osb->dc_task); 3046 osb->dc_task = NULL; 3047 } 3048 3049 ocfs2_lock_res_free(&osb->osb_super_lockres); 3050 ocfs2_lock_res_free(&osb->osb_rename_lockres); 3051 ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres); 3052 ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres); 3053 3054 ocfs2_cluster_disconnect(osb->cconn, hangup_pending); 3055 osb->cconn = NULL; 3056 3057 ocfs2_dlm_shutdown_debug(osb); 3058 } 3059 3060 static int ocfs2_drop_lock(struct ocfs2_super *osb, 3061 struct ocfs2_lock_res *lockres) 3062 { 3063 int ret; 3064 unsigned long flags; 3065 u32 lkm_flags = 0; 3066 3067 /* We didn't get anywhere near actually using this lockres. 
*/ 3068 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 3069 goto out; 3070 3071 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 3072 lkm_flags |= DLM_LKF_VALBLK; 3073 3074 spin_lock_irqsave(&lockres->l_lock, flags); 3075 3076 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 3077 "lockres %s, flags 0x%lx\n", 3078 lockres->l_name, lockres->l_flags); 3079 3080 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 3081 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 3082 "%u, unlock_action = %u\n", 3083 lockres->l_name, lockres->l_flags, lockres->l_action, 3084 lockres->l_unlock_action); 3085 3086 spin_unlock_irqrestore(&lockres->l_lock, flags); 3087 3088 /* XXX: Today we just wait on any busy 3089 * locks... Perhaps we need to cancel converts in the 3090 * future? */ 3091 ocfs2_wait_on_busy_lock(lockres); 3092 3093 spin_lock_irqsave(&lockres->l_lock, flags); 3094 } 3095 3096 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3097 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 3098 lockres->l_level == DLM_LOCK_EX && 3099 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3100 lockres->l_ops->set_lvb(lockres); 3101 } 3102 3103 if (lockres->l_flags & OCFS2_LOCK_BUSY) 3104 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 3105 lockres->l_name); 3106 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 3107 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 3108 3109 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 3110 spin_unlock_irqrestore(&lockres->l_lock, flags); 3111 goto out; 3112 } 3113 3114 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 3115 3116 /* make sure we never get here while waiting for an ast to 3117 * fire. */ 3118 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 3119 3120 /* is this necessary? 
*/ 3121 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3122 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 3123 spin_unlock_irqrestore(&lockres->l_lock, flags); 3124 3125 mlog(0, "lock %s\n", lockres->l_name); 3126 3127 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags); 3128 if (ret) { 3129 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3130 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 3131 ocfs2_dlm_dump_lksb(&lockres->l_lksb); 3132 BUG(); 3133 } 3134 mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n", 3135 lockres->l_name); 3136 3137 ocfs2_wait_on_busy_lock(lockres); 3138 out: 3139 return 0; 3140 } 3141 3142 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3143 struct ocfs2_lock_res *lockres); 3144 3145 /* Mark the lockres as being dropped. It will no longer be 3146 * queued if blocking, but we still may have to wait on it 3147 * being dequeued from the downconvert thread before we can consider 3148 * it safe to drop. 3149 * 3150 * You can *not* attempt to call cluster_lock on this lockres anymore. */ 3151 void ocfs2_mark_lockres_freeing(struct ocfs2_super *osb, 3152 struct ocfs2_lock_res *lockres) 3153 { 3154 int status; 3155 struct ocfs2_mask_waiter mw; 3156 unsigned long flags, flags2; 3157 3158 ocfs2_init_mask_waiter(&mw); 3159 3160 spin_lock_irqsave(&lockres->l_lock, flags); 3161 lockres->l_flags |= OCFS2_LOCK_FREEING; 3162 if (lockres->l_flags & OCFS2_LOCK_QUEUED && current == osb->dc_task) { 3163 /* 3164 * We know the downconvert is queued but not in progress 3165 * because we are the downconvert thread and processing 3166 * different lock. So we can just remove the lock from the 3167 * queue. This is not only an optimization but also a way 3168 * to avoid the following deadlock: 3169 * ocfs2_dentry_post_unlock() 3170 * ocfs2_dentry_lock_put() 3171 * ocfs2_drop_dentry_lock() 3172 * iput() 3173 * ocfs2_evict_inode() 3174 * ocfs2_clear_inode() 3175 * ocfs2_mark_lockres_freeing() 3176 * ... 
blocks waiting for OCFS2_LOCK_QUEUED 3177 * since we are the downconvert thread which 3178 * should clear the flag. 3179 */ 3180 spin_unlock_irqrestore(&lockres->l_lock, flags); 3181 spin_lock_irqsave(&osb->dc_task_lock, flags2); 3182 list_del_init(&lockres->l_blocked_list); 3183 osb->blocked_lock_count--; 3184 spin_unlock_irqrestore(&osb->dc_task_lock, flags2); 3185 /* 3186 * Warn if we recurse into another post_unlock call. Strictly 3187 * speaking it isn't a problem but we need to be careful if 3188 * that happens (stack overflow, deadlocks, ...) so warn if 3189 * ocfs2 grows a path for which this can happen. 3190 */ 3191 WARN_ON_ONCE(lockres->l_ops->post_unlock); 3192 /* Since the lock is freeing we don't do much in the fn below */ 3193 ocfs2_process_blocked_lock(osb, lockres); 3194 return; 3195 } 3196 while (lockres->l_flags & OCFS2_LOCK_QUEUED) { 3197 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); 3198 spin_unlock_irqrestore(&lockres->l_lock, flags); 3199 3200 mlog(0, "Waiting on lockres %s\n", lockres->l_name); 3201 3202 status = ocfs2_wait_for_mask(&mw); 3203 if (status) 3204 mlog_errno(status); 3205 3206 spin_lock_irqsave(&lockres->l_lock, flags); 3207 } 3208 spin_unlock_irqrestore(&lockres->l_lock, flags); 3209 } 3210 3211 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, 3212 struct ocfs2_lock_res *lockres) 3213 { 3214 int ret; 3215 3216 ocfs2_mark_lockres_freeing(osb, lockres); 3217 ret = ocfs2_drop_lock(osb, lockres); 3218 if (ret) 3219 mlog_errno(ret); 3220 } 3221 3222 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 3223 { 3224 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 3225 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 3226 ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres); 3227 ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres); 3228 } 3229 3230 int ocfs2_drop_inode_locks(struct inode *inode) 3231 { 3232 int status, err; 3233 3234 /* No need to call 
ocfs2_mark_lockres_freeing here - 3235 * ocfs2_clear_inode has done it for us. */ 3236 3237 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3238 &OCFS2_I(inode)->ip_open_lockres); 3239 if (err < 0) 3240 mlog_errno(err); 3241 3242 status = err; 3243 3244 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3245 &OCFS2_I(inode)->ip_inode_lockres); 3246 if (err < 0) 3247 mlog_errno(err); 3248 if (err < 0 && !status) 3249 status = err; 3250 3251 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3252 &OCFS2_I(inode)->ip_rw_lockres); 3253 if (err < 0) 3254 mlog_errno(err); 3255 if (err < 0 && !status) 3256 status = err; 3257 3258 return status; 3259 } 3260 3261 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 3262 int new_level) 3263 { 3264 assert_spin_locked(&lockres->l_lock); 3265 3266 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 3267 3268 if (lockres->l_level <= new_level) { 3269 mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, " 3270 "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, " 3271 "block %d, pgen %d\n", lockres->l_name, lockres->l_level, 3272 new_level, list_empty(&lockres->l_blocked_list), 3273 list_empty(&lockres->l_mask_waiters), lockres->l_type, 3274 lockres->l_flags, lockres->l_ro_holders, 3275 lockres->l_ex_holders, lockres->l_action, 3276 lockres->l_unlock_action, lockres->l_requested, 3277 lockres->l_blocking, lockres->l_pending_gen); 3278 BUG(); 3279 } 3280 3281 mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n", 3282 lockres->l_name, lockres->l_level, new_level, lockres->l_blocking); 3283 3284 lockres->l_action = OCFS2_AST_DOWNCONVERT; 3285 lockres->l_requested = new_level; 3286 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3287 return lockres_set_pending(lockres); 3288 } 3289 3290 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 3291 struct ocfs2_lock_res *lockres, 3292 int new_level, 3293 int lvb, 3294 unsigned int generation) 3295 { 3296 int ret; 3297 u32 dlm_flags = DLM_LKF_CONVERT; 3298 3299 
mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name, 3300 lockres->l_level, new_level); 3301 3302 if (lvb) 3303 dlm_flags |= DLM_LKF_VALBLK; 3304 3305 ret = ocfs2_dlm_lock(osb->cconn, 3306 new_level, 3307 &lockres->l_lksb, 3308 dlm_flags, 3309 lockres->l_name, 3310 OCFS2_LOCK_ID_MAX_LEN - 1); 3311 lockres_clear_pending(lockres, generation, osb); 3312 if (ret) { 3313 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 3314 ocfs2_recover_from_dlm_error(lockres, 1); 3315 goto bail; 3316 } 3317 3318 ret = 0; 3319 bail: 3320 return ret; 3321 } 3322 3323 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */ 3324 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 3325 struct ocfs2_lock_res *lockres) 3326 { 3327 assert_spin_locked(&lockres->l_lock); 3328 3329 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 3330 /* If we're already trying to cancel a lock conversion 3331 * then just drop the spinlock and allow the caller to 3332 * requeue this lock. */ 3333 mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name); 3334 return 0; 3335 } 3336 3337 /* were we in a convert when we got the bast fire? */ 3338 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT && 3339 lockres->l_action != OCFS2_AST_DOWNCONVERT); 3340 /* set things up for the unlockast to know to just 3341 * clear out the ast_action and unset busy, etc. 
*/ 3342 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT; 3343 3344 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY), 3345 "lock %s, invalid flags: 0x%lx\n", 3346 lockres->l_name, lockres->l_flags); 3347 3348 mlog(ML_BASTS, "lockres %s\n", lockres->l_name); 3349 3350 return 1; 3351 } 3352 3353 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 3354 struct ocfs2_lock_res *lockres) 3355 { 3356 int ret; 3357 3358 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, 3359 DLM_LKF_CANCEL); 3360 if (ret) { 3361 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3362 ocfs2_recover_from_dlm_error(lockres, 0); 3363 } 3364 3365 mlog(ML_BASTS, "lockres %s\n", lockres->l_name); 3366 3367 return ret; 3368 } 3369 3370 static int ocfs2_unblock_lock(struct ocfs2_super *osb, 3371 struct ocfs2_lock_res *lockres, 3372 struct ocfs2_unblock_ctl *ctl) 3373 { 3374 unsigned long flags; 3375 int blocking; 3376 int new_level; 3377 int level; 3378 int ret = 0; 3379 int set_lvb = 0; 3380 unsigned int gen; 3381 3382 spin_lock_irqsave(&lockres->l_lock, flags); 3383 3384 recheck: 3385 /* 3386 * Is it still blocking? If not, we have no more work to do. 3387 */ 3388 if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) { 3389 BUG_ON(lockres->l_blocking != DLM_LOCK_NL); 3390 spin_unlock_irqrestore(&lockres->l_lock, flags); 3391 ret = 0; 3392 goto leave; 3393 } 3394 3395 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 3396 /* XXX 3397 * This is a *big* race. The OCFS2_LOCK_PENDING flag 3398 * exists entirely for one reason - another thread has set 3399 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock(). 3400 * 3401 * If we do ocfs2_cancel_convert() before the other thread 3402 * calls dlm_lock(), our cancel will do nothing. We will 3403 * get no ast, and we will have no way of knowing the 3404 * cancel failed. Meanwhile, the other thread will call 3405 * into dlm_lock() and wait...forever. 3406 * 3407 * Why forever? 
Because another node has asked for the 3408 * lock first; that's why we're here in unblock_lock(). 3409 * 3410 * The solution is OCFS2_LOCK_PENDING. When PENDING is 3411 * set, we just requeue the unblock. Only when the other 3412 * thread has called dlm_lock() and cleared PENDING will 3413 * we then cancel their request. 3414 * 3415 * All callers of dlm_lock() must set OCFS2_DLM_PENDING 3416 * at the same time they set OCFS2_DLM_BUSY. They must 3417 * clear OCFS2_DLM_PENDING after dlm_lock() returns. 3418 */ 3419 if (lockres->l_flags & OCFS2_LOCK_PENDING) { 3420 mlog(ML_BASTS, "lockres %s, ReQ: Pending\n", 3421 lockres->l_name); 3422 goto leave_requeue; 3423 } 3424 3425 ctl->requeue = 1; 3426 ret = ocfs2_prepare_cancel_convert(osb, lockres); 3427 spin_unlock_irqrestore(&lockres->l_lock, flags); 3428 if (ret) { 3429 ret = ocfs2_cancel_convert(osb, lockres); 3430 if (ret < 0) 3431 mlog_errno(ret); 3432 } 3433 goto leave; 3434 } 3435 3436 /* 3437 * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is 3438 * set when the ast is received for an upconvert just before the 3439 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast 3440 * on the heels of the ast, we want to delay the downconvert just 3441 * enough to allow the up requestor to do its task. Because this 3442 * lock is in the blocked queue, the lock will be downconverted 3443 * as soon as the requestor is done with the lock. 3444 */ 3445 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) 3446 goto leave_requeue; 3447 3448 /* 3449 * How can we block and yet be at NL? We were trying to upconvert 3450 * from NL and got canceled. The code comes back here, and now 3451 * we notice and clear BLOCKING. 
3452 */ 3453 if (lockres->l_level == DLM_LOCK_NL) { 3454 BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders); 3455 mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name); 3456 lockres->l_blocking = DLM_LOCK_NL; 3457 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 3458 spin_unlock_irqrestore(&lockres->l_lock, flags); 3459 goto leave; 3460 } 3461 3462 /* if we're blocking an exclusive and we have *any* holders, 3463 * then requeue. */ 3464 if ((lockres->l_blocking == DLM_LOCK_EX) 3465 && (lockres->l_ex_holders || lockres->l_ro_holders)) { 3466 mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n", 3467 lockres->l_name, lockres->l_ex_holders, 3468 lockres->l_ro_holders); 3469 goto leave_requeue; 3470 } 3471 3472 /* If it's a PR we're blocking, then only 3473 * requeue if we've got any EX holders */ 3474 if (lockres->l_blocking == DLM_LOCK_PR && 3475 lockres->l_ex_holders) { 3476 mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n", 3477 lockres->l_name, lockres->l_ex_holders); 3478 goto leave_requeue; 3479 } 3480 3481 /* 3482 * Can we get a lock in this state if the holder counts are 3483 * zero? The meta data unblock code used to check this. 3484 */ 3485 if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 3486 && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) { 3487 mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n", 3488 lockres->l_name); 3489 goto leave_requeue; 3490 } 3491 3492 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 3493 3494 if (lockres->l_ops->check_downconvert 3495 && !lockres->l_ops->check_downconvert(lockres, new_level)) { 3496 mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n", 3497 lockres->l_name); 3498 goto leave_requeue; 3499 } 3500 3501 /* If we get here, then we know that there are no more 3502 * incompatible holders (and anyone asking for an incompatible 3503 * lock is blocked). 
We can now downconvert the lock */ 3504 if (!lockres->l_ops->downconvert_worker) 3505 goto downconvert; 3506 3507 /* Some lockres types want to do a bit of work before 3508 * downconverting a lock. Allow that here. The worker function 3509 * may sleep, so we save off a copy of what we're blocking as 3510 * it may change while we're not holding the spin lock. */ 3511 blocking = lockres->l_blocking; 3512 level = lockres->l_level; 3513 spin_unlock_irqrestore(&lockres->l_lock, flags); 3514 3515 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); 3516 3517 if (ctl->unblock_action == UNBLOCK_STOP_POST) { 3518 mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n", 3519 lockres->l_name); 3520 goto leave; 3521 } 3522 3523 spin_lock_irqsave(&lockres->l_lock, flags); 3524 if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) { 3525 /* If this changed underneath us, then we can't drop 3526 * it just yet. */ 3527 mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, " 3528 "Recheck\n", lockres->l_name, blocking, 3529 lockres->l_blocking, level, lockres->l_level); 3530 goto recheck; 3531 } 3532 3533 downconvert: 3534 ctl->requeue = 0; 3535 3536 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3537 if (lockres->l_level == DLM_LOCK_EX) 3538 set_lvb = 1; 3539 3540 /* 3541 * We only set the lvb if the lock has been fully 3542 * refreshed - otherwise we risk setting stale 3543 * data. Otherwise, there's no need to actually clear 3544 * out the lvb here as it's value is still valid. 
3545 */ 3546 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3547 lockres->l_ops->set_lvb(lockres); 3548 } 3549 3550 gen = ocfs2_prepare_downconvert(lockres, new_level); 3551 spin_unlock_irqrestore(&lockres->l_lock, flags); 3552 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb, 3553 gen); 3554 3555 leave: 3556 if (ret) 3557 mlog_errno(ret); 3558 return ret; 3559 3560 leave_requeue: 3561 spin_unlock_irqrestore(&lockres->l_lock, flags); 3562 ctl->requeue = 1; 3563 3564 return 0; 3565 } 3566 3567 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 3568 int blocking) 3569 { 3570 struct inode *inode; 3571 struct address_space *mapping; 3572 struct ocfs2_inode_info *oi; 3573 3574 inode = ocfs2_lock_res_inode(lockres); 3575 mapping = inode->i_mapping; 3576 3577 if (S_ISDIR(inode->i_mode)) { 3578 oi = OCFS2_I(inode); 3579 oi->ip_dir_lock_gen++; 3580 mlog(0, "generation: %u\n", oi->ip_dir_lock_gen); 3581 goto out; 3582 } 3583 3584 if (!S_ISREG(inode->i_mode)) 3585 goto out; 3586 3587 /* 3588 * We need this before the filemap_fdatawrite() so that it can 3589 * transfer the dirty bit from the PTE to the 3590 * page. Unfortunately this means that even for EX->PR 3591 * downconverts, we'll lose our mappings and have to build 3592 * them up again. 3593 */ 3594 unmap_mapping_range(mapping, 0, 0, 0); 3595 3596 if (filemap_fdatawrite(mapping)) { 3597 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!", 3598 (unsigned long long)OCFS2_I(inode)->ip_blkno); 3599 } 3600 sync_mapping_buffers(mapping); 3601 if (blocking == DLM_LOCK_EX) { 3602 truncate_inode_pages(mapping, 0); 3603 } else { 3604 /* We only need to wait on the I/O if we're not also 3605 * truncating pages because truncate_inode_pages waits 3606 * for us above. We don't truncate pages if we're 3607 * blocking anything < EXMODE because we want to keep 3608 * them around in that case. 
*/ 3609 filemap_fdatawait(mapping); 3610 } 3611 3612 out: 3613 return UNBLOCK_CONTINUE; 3614 } 3615 3616 static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci, 3617 struct ocfs2_lock_res *lockres, 3618 int new_level) 3619 { 3620 int checkpointed = ocfs2_ci_fully_checkpointed(ci); 3621 3622 BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR); 3623 BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed); 3624 3625 if (checkpointed) 3626 return 1; 3627 3628 ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci))); 3629 return 0; 3630 } 3631 3632 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 3633 int new_level) 3634 { 3635 struct inode *inode = ocfs2_lock_res_inode(lockres); 3636 3637 return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level); 3638 } 3639 3640 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres) 3641 { 3642 struct inode *inode = ocfs2_lock_res_inode(lockres); 3643 3644 __ocfs2_stuff_meta_lvb(inode); 3645 } 3646 3647 /* 3648 * Does the final reference drop on our dentry lock. Right now this 3649 * happens in the downconvert thread, but we could choose to simplify the 3650 * dlmglue API and push these off to the ocfs2_wq in the future. 3651 */ 3652 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 3653 struct ocfs2_lock_res *lockres) 3654 { 3655 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3656 ocfs2_dentry_lock_put(osb, dl); 3657 } 3658 3659 /* 3660 * d_delete() matching dentries before the lock downconvert. 3661 * 3662 * At this point, any process waiting to destroy the 3663 * dentry_lock due to last ref count is stopped by the 3664 * OCFS2_LOCK_QUEUED flag. 3665 * 3666 * We have two potential problems 3667 * 3668 * 1) If we do the last reference drop on our dentry_lock (via dput) 3669 * we'll wind up in ocfs2_release_dentry_lock(), waiting on 3670 * the downconvert to finish. 
Instead we take an elevated 3671 * reference and push the drop until after we've completed our 3672 * unblock processing. 3673 * 3674 * 2) There might be another process with a final reference, 3675 * waiting on us to finish processing. If this is the case, we 3676 * detect it and exit out - there's no more dentries anyway. 3677 */ 3678 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 3679 int blocking) 3680 { 3681 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3682 struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode); 3683 struct dentry *dentry; 3684 unsigned long flags; 3685 int extra_ref = 0; 3686 3687 /* 3688 * This node is blocking another node from getting a read 3689 * lock. This happens when we've renamed within a 3690 * directory. We've forced the other nodes to d_delete(), but 3691 * we never actually dropped our lock because it's still 3692 * valid. The downconvert code will retain a PR for this node, 3693 * so there's no further work to do. 3694 */ 3695 if (blocking == DLM_LOCK_PR) 3696 return UNBLOCK_CONTINUE; 3697 3698 /* 3699 * Mark this inode as potentially orphaned. The code in 3700 * ocfs2_delete_inode() will figure out whether it actually 3701 * needs to be freed or not. 3702 */ 3703 spin_lock(&oi->ip_lock); 3704 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; 3705 spin_unlock(&oi->ip_lock); 3706 3707 /* 3708 * Yuck. We need to make sure however that the check of 3709 * OCFS2_LOCK_FREEING and the extra reference are atomic with 3710 * respect to a reference decrement or the setting of that 3711 * flag. 
3712 */ 3713 spin_lock_irqsave(&lockres->l_lock, flags); 3714 spin_lock(&dentry_attach_lock); 3715 if (!(lockres->l_flags & OCFS2_LOCK_FREEING) 3716 && dl->dl_count) { 3717 dl->dl_count++; 3718 extra_ref = 1; 3719 } 3720 spin_unlock(&dentry_attach_lock); 3721 spin_unlock_irqrestore(&lockres->l_lock, flags); 3722 3723 mlog(0, "extra_ref = %d\n", extra_ref); 3724 3725 /* 3726 * We have a process waiting on us in ocfs2_dentry_iput(), 3727 * which means we can't have any more outstanding 3728 * aliases. There's no need to do any more work. 3729 */ 3730 if (!extra_ref) 3731 return UNBLOCK_CONTINUE; 3732 3733 spin_lock(&dentry_attach_lock); 3734 while (1) { 3735 dentry = ocfs2_find_local_alias(dl->dl_inode, 3736 dl->dl_parent_blkno, 1); 3737 if (!dentry) 3738 break; 3739 spin_unlock(&dentry_attach_lock); 3740 3741 mlog(0, "d_delete(%.*s);\n", dentry->d_name.len, 3742 dentry->d_name.name); 3743 3744 /* 3745 * The following dcache calls may do an 3746 * iput(). Normally we don't want that from the 3747 * downconverting thread, but in this case it's ok 3748 * because the requesting node already has an 3749 * exclusive lock on the inode, so it can't be queued 3750 * for a downconvert. 3751 */ 3752 d_delete(dentry); 3753 dput(dentry); 3754 3755 spin_lock(&dentry_attach_lock); 3756 } 3757 spin_unlock(&dentry_attach_lock); 3758 3759 /* 3760 * If we are the last holder of this dentry lock, there is no 3761 * reason to downconvert so skip straight to the unlock. 
3762 */ 3763 if (dl->dl_count == 1) 3764 return UNBLOCK_STOP_POST; 3765 3766 return UNBLOCK_CONTINUE_POST; 3767 } 3768 3769 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres, 3770 int new_level) 3771 { 3772 struct ocfs2_refcount_tree *tree = 3773 ocfs2_lock_res_refcount_tree(lockres); 3774 3775 return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level); 3776 } 3777 3778 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres, 3779 int blocking) 3780 { 3781 struct ocfs2_refcount_tree *tree = 3782 ocfs2_lock_res_refcount_tree(lockres); 3783 3784 ocfs2_metadata_cache_purge(&tree->rf_ci); 3785 3786 return UNBLOCK_CONTINUE; 3787 } 3788 3789 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres) 3790 { 3791 struct ocfs2_qinfo_lvb *lvb; 3792 struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres); 3793 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 3794 oinfo->dqi_gi.dqi_type); 3795 3796 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 3797 lvb->lvb_version = OCFS2_QINFO_LVB_VERSION; 3798 lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace); 3799 lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace); 3800 lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms); 3801 lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks); 3802 lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk); 3803 lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry); 3804 } 3805 3806 void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex) 3807 { 3808 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 3809 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 3810 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 3811 3812 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 3813 ocfs2_cluster_unlock(osb, lockres, level); 3814 } 3815 3816 static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo) 3817 { 3818 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 3819 oinfo->dqi_gi.dqi_type); 3820 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 3821 struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 3822 struct buffer_head *bh = NULL; 3823 struct ocfs2_global_disk_dqinfo *gdinfo; 3824 int status = 0; 3825 3826 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 3827 lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) { 3828 info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace); 3829 info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace); 3830 oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms); 3831 oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks); 3832 oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk); 3833 oinfo->dqi_gi.dqi_free_entry = 3834 be32_to_cpu(lvb->lvb_free_entry); 3835 } else { 3836 status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode, 3837 oinfo->dqi_giblk, &bh); 3838 if (status) { 3839 mlog_errno(status); 3840 goto bail; 3841 } 3842 gdinfo = (struct ocfs2_global_disk_dqinfo *) 3843 (bh->b_data + OCFS2_GLOBAL_INFO_OFF); 3844 info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace); 3845 info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace); 3846 oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms); 3847 oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks); 3848 oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk); 3849 oinfo->dqi_gi.dqi_free_entry = 3850 le32_to_cpu(gdinfo->dqi_free_entry); 3851 brelse(bh); 3852 ocfs2_track_lock_refresh(lockres); 3853 } 3854 3855 bail: 3856 return status; 3857 } 3858 3859 /* Lock quota info, this function expects at least shared lock on the quota file 3860 * so that we can safely refresh quota info from disk. 
*/ 3861 int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex) 3862 { 3863 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 3864 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 3865 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 3866 int status = 0; 3867 3868 /* On RO devices, locking really isn't needed... */ 3869 if (ocfs2_is_hard_readonly(osb)) { 3870 if (ex) 3871 status = -EROFS; 3872 goto bail; 3873 } 3874 if (ocfs2_mount_local(osb)) 3875 goto bail; 3876 3877 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 3878 if (status < 0) { 3879 mlog_errno(status); 3880 goto bail; 3881 } 3882 if (!ocfs2_should_refresh_lock_res(lockres)) 3883 goto bail; 3884 /* OK, we have the lock but we need to refresh the quota info */ 3885 status = ocfs2_refresh_qinfo(oinfo); 3886 if (status) 3887 ocfs2_qinfo_unlock(oinfo, ex); 3888 ocfs2_complete_lock_res_refresh(lockres, status); 3889 bail: 3890 return status; 3891 } 3892 3893 int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex) 3894 { 3895 int status; 3896 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 3897 struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres; 3898 struct ocfs2_super *osb = lockres->l_priv; 3899 3900 3901 if (ocfs2_is_hard_readonly(osb)) 3902 return -EROFS; 3903 3904 if (ocfs2_mount_local(osb)) 3905 return 0; 3906 3907 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 3908 if (status < 0) 3909 mlog_errno(status); 3910 3911 return status; 3912 } 3913 3914 void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex) 3915 { 3916 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 3917 struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres; 3918 struct ocfs2_super *osb = lockres->l_priv; 3919 3920 if (!ocfs2_mount_local(osb)) 3921 ocfs2_cluster_unlock(osb, lockres, level); 3922 } 3923 3924 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3925 struct ocfs2_lock_res *lockres) 3926 { 3927 int status; 3928 struct ocfs2_unblock_ctl ctl = {0, 0,}; 3929 unsigned long flags; 3930 3931 /* Our reference to the lockres in this function can be 3932 * considered valid until we remove the OCFS2_LOCK_QUEUED 3933 * flag. */ 3934 3935 BUG_ON(!lockres); 3936 BUG_ON(!lockres->l_ops); 3937 3938 mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name); 3939 3940 /* Detect whether a lock has been marked as going away while 3941 * the downconvert thread was processing other things. A lock can 3942 * still be marked with OCFS2_LOCK_FREEING after this check, 3943 * but short circuiting here will still save us some 3944 * performance. */ 3945 spin_lock_irqsave(&lockres->l_lock, flags); 3946 if (lockres->l_flags & OCFS2_LOCK_FREEING) 3947 goto unqueue; 3948 spin_unlock_irqrestore(&lockres->l_lock, flags); 3949 3950 status = ocfs2_unblock_lock(osb, lockres, &ctl); 3951 if (status < 0) 3952 mlog_errno(status); 3953 3954 spin_lock_irqsave(&lockres->l_lock, flags); 3955 unqueue: 3956 if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) { 3957 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED); 3958 } else 3959 ocfs2_schedule_blocked_lock(osb, lockres); 3960 3961 mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name, 3962 ctl.requeue ? 
"yes" : "no"); 3963 spin_unlock_irqrestore(&lockres->l_lock, flags); 3964 3965 if (ctl.unblock_action != UNBLOCK_CONTINUE 3966 && lockres->l_ops->post_unlock) 3967 lockres->l_ops->post_unlock(osb, lockres); 3968 } 3969 3970 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 3971 struct ocfs2_lock_res *lockres) 3972 { 3973 unsigned long flags; 3974 3975 assert_spin_locked(&lockres->l_lock); 3976 3977 if (lockres->l_flags & OCFS2_LOCK_FREEING) { 3978 /* Do not schedule a lock for downconvert when it's on 3979 * the way to destruction - any nodes wanting access 3980 * to the resource will get it soon. */ 3981 mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n", 3982 lockres->l_name, lockres->l_flags); 3983 return; 3984 } 3985 3986 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); 3987 3988 spin_lock_irqsave(&osb->dc_task_lock, flags); 3989 if (list_empty(&lockres->l_blocked_list)) { 3990 list_add_tail(&lockres->l_blocked_list, 3991 &osb->blocked_lock_list); 3992 osb->blocked_lock_count++; 3993 } 3994 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 3995 } 3996 3997 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb) 3998 { 3999 unsigned long processed; 4000 unsigned long flags; 4001 struct ocfs2_lock_res *lockres; 4002 4003 spin_lock_irqsave(&osb->dc_task_lock, flags); 4004 /* grab this early so we know to try again if a state change and 4005 * wake happens part-way through our work */ 4006 osb->dc_work_sequence = osb->dc_wake_sequence; 4007 4008 processed = osb->blocked_lock_count; 4009 while (processed) { 4010 BUG_ON(list_empty(&osb->blocked_lock_list)); 4011 4012 lockres = list_entry(osb->blocked_lock_list.next, 4013 struct ocfs2_lock_res, l_blocked_list); 4014 list_del_init(&lockres->l_blocked_list); 4015 osb->blocked_lock_count--; 4016 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4017 4018 BUG_ON(!processed); 4019 processed--; 4020 4021 ocfs2_process_blocked_lock(osb, lockres); 4022 4023 
spin_lock_irqsave(&osb->dc_task_lock, flags); 4024 } 4025 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4026 } 4027 4028 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb) 4029 { 4030 int empty = 0; 4031 unsigned long flags; 4032 4033 spin_lock_irqsave(&osb->dc_task_lock, flags); 4034 if (list_empty(&osb->blocked_lock_list)) 4035 empty = 1; 4036 4037 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4038 return empty; 4039 } 4040 4041 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb) 4042 { 4043 int should_wake = 0; 4044 unsigned long flags; 4045 4046 spin_lock_irqsave(&osb->dc_task_lock, flags); 4047 if (osb->dc_work_sequence != osb->dc_wake_sequence) 4048 should_wake = 1; 4049 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4050 4051 return should_wake; 4052 } 4053 4054 static int ocfs2_downconvert_thread(void *arg) 4055 { 4056 int status = 0; 4057 struct ocfs2_super *osb = arg; 4058 4059 /* only quit once we've been asked to stop and there is no more 4060 * work available */ 4061 while (!(kthread_should_stop() && 4062 ocfs2_downconvert_thread_lists_empty(osb))) { 4063 4064 wait_event_interruptible(osb->dc_event, 4065 ocfs2_downconvert_thread_should_wake(osb) || 4066 kthread_should_stop()); 4067 4068 mlog(0, "downconvert_thread: awoken\n"); 4069 4070 ocfs2_downconvert_thread_do_work(osb); 4071 } 4072 4073 osb->dc_task = NULL; 4074 return status; 4075 } 4076 4077 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb) 4078 { 4079 unsigned long flags; 4080 4081 spin_lock_irqsave(&osb->dc_task_lock, flags); 4082 /* make sure the voting thread gets a swipe at whatever changes 4083 * the caller may have made to the voting state */ 4084 osb->dc_wake_sequence++; 4085 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4086 wake_up(&osb->dc_event); 4087 } 4088