1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmglue.c 5 * 6 * Code which implements an OCFS2 specific interface to our DLM. 7 * 8 * Copyright (C) 2003, 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 */ 25 26 #include <linux/types.h> 27 #include <linux/slab.h> 28 #include <linux/highmem.h> 29 #include <linux/mm.h> 30 #include <linux/kthread.h> 31 #include <linux/pagemap.h> 32 #include <linux/debugfs.h> 33 #include <linux/seq_file.h> 34 #include <linux/time.h> 35 #include <linux/quotaops.h> 36 #include <linux/sched/signal.h> 37 38 #define MLOG_MASK_PREFIX ML_DLM_GLUE 39 #include <cluster/masklog.h> 40 41 #include "ocfs2.h" 42 #include "ocfs2_lockingver.h" 43 44 #include "alloc.h" 45 #include "dcache.h" 46 #include "dlmglue.h" 47 #include "extent_map.h" 48 #include "file.h" 49 #include "heartbeat.h" 50 #include "inode.h" 51 #include "journal.h" 52 #include "stackglue.h" 53 #include "slot_map.h" 54 #include "super.h" 55 #include "uptodate.h" 56 #include "quota.h" 57 #include "refcounttree.h" 58 #include "acl.h" 59 60 #include "buffer_head_io.h" 61 62 struct ocfs2_mask_waiter { 63 struct list_head mw_item; 64 int mw_status; 65 struct completion mw_complete; 66 unsigned long mw_mask; 67 unsigned long 
mw_goal; 68 #ifdef CONFIG_OCFS2_FS_STATS 69 ktime_t mw_lock_start; 70 #endif 71 }; 72 73 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres); 74 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres); 75 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres); 76 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres); 77 78 /* 79 * Return value from ->downconvert_worker functions. 80 * 81 * These control the precise actions of ocfs2_unblock_lock() 82 * and ocfs2_process_blocked_lock() 83 * 84 */ 85 enum ocfs2_unblock_action { 86 UNBLOCK_CONTINUE = 0, /* Continue downconvert */ 87 UNBLOCK_CONTINUE_POST = 1, /* Continue downconvert, fire 88 * ->post_unlock callback */ 89 UNBLOCK_STOP_POST = 2, /* Do not downconvert, fire 90 * ->post_unlock() callback. */ 91 }; 92 93 struct ocfs2_unblock_ctl { 94 int requeue; 95 enum ocfs2_unblock_action unblock_action; 96 }; 97 98 /* Lockdep class keys */ 99 #ifdef CONFIG_DEBUG_LOCK_ALLOC 100 static struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES]; 101 #endif 102 103 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 104 int new_level); 105 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres); 106 107 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 108 int blocking); 109 110 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 111 int blocking); 112 113 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 114 struct ocfs2_lock_res *lockres); 115 116 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres); 117 118 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres, 119 int new_level); 120 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres, 121 int blocking); 122 123 #define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres) 124 125 /* This 
aids in debugging situations where a bad LVB might be involved. */ 126 static void ocfs2_dump_meta_lvb_info(u64 level, 127 const char *function, 128 unsigned int line, 129 struct ocfs2_lock_res *lockres) 130 { 131 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 132 133 mlog(level, "LVB information for %s (called from %s:%u):\n", 134 lockres->l_name, function, line); 135 mlog(level, "version: %u, clusters: %u, generation: 0x%x\n", 136 lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters), 137 be32_to_cpu(lvb->lvb_igeneration)); 138 mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n", 139 (unsigned long long)be64_to_cpu(lvb->lvb_isize), 140 be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid), 141 be16_to_cpu(lvb->lvb_imode)); 142 mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, " 143 "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink), 144 (long long)be64_to_cpu(lvb->lvb_iatime_packed), 145 (long long)be64_to_cpu(lvb->lvb_ictime_packed), 146 (long long)be64_to_cpu(lvb->lvb_imtime_packed), 147 be32_to_cpu(lvb->lvb_iattr)); 148 } 149 150 151 /* 152 * OCFS2 Lock Resource Operations 153 * 154 * These fine tune the behavior of the generic dlmglue locking infrastructure. 155 * 156 * The most basic of lock types can point ->l_priv to their respective 157 * struct ocfs2_super and allow the default actions to manage things. 158 * 159 * Right now, each lock type also needs to implement an init function, 160 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres() 161 * should be called when the lock is no longer needed (i.e., object 162 * destruction time). 163 */ 164 struct ocfs2_lock_res_ops { 165 /* 166 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define 167 * this callback if ->l_priv is not an ocfs2_super pointer 168 */ 169 struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *); 170 171 /* 172 * Optionally called in the downconvert thread after a 173 * successful downconvert. 
The lockres will not be referenced 174 * after this callback is called, so it is safe to free 175 * memory, etc. 176 * 177 * The exact semantics of when this is called are controlled 178 * by ->downconvert_worker() 179 */ 180 void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *); 181 182 /* 183 * Allow a lock type to add checks to determine whether it is 184 * safe to downconvert a lock. Return 0 to re-queue the 185 * downconvert at a later time, nonzero to continue. 186 * 187 * For most locks, the default checks that there are no 188 * incompatible holders are sufficient. 189 * 190 * Called with the lockres spinlock held. 191 */ 192 int (*check_downconvert)(struct ocfs2_lock_res *, int); 193 194 /* 195 * Allows a lock type to populate the lock value block. This 196 * is called on downconvert, and when we drop a lock. 197 * 198 * Locks that want to use this should set LOCK_TYPE_USES_LVB 199 * in the flags field. 200 * 201 * Called with the lockres spinlock held. 202 */ 203 void (*set_lvb)(struct ocfs2_lock_res *); 204 205 /* 206 * Called from the downconvert thread when it is determined 207 * that a lock will be downconverted. This is called without 208 * any locks held so the function can do work that might 209 * schedule (syncing out data, etc). 210 * 211 * This should return any one of the ocfs2_unblock_action 212 * values, depending on what it wants the thread to do. 213 */ 214 int (*downconvert_worker)(struct ocfs2_lock_res *, int); 215 216 /* 217 * LOCK_TYPE_* flags which describe the specific requirements 218 * of a lock type. Descriptions of each individual flag follow. 219 */ 220 int flags; 221 }; 222 223 /* 224 * Some locks want to "refresh" potentially stale data when a 225 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this 226 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the 227 * individual lockres l_flags member from the ast function. 
It is 228 * expected that the locking wrapper will clear the 229 * OCFS2_LOCK_NEEDS_REFRESH flag when done. 230 */ 231 #define LOCK_TYPE_REQUIRES_REFRESH 0x1 232 233 /* 234 * Indicate that a lock type makes use of the lock value block. The 235 * ->set_lvb lock type callback must be defined. 236 */ 237 #define LOCK_TYPE_USES_LVB 0x2 238 239 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { 240 .get_osb = ocfs2_get_inode_osb, 241 .flags = 0, 242 }; 243 244 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = { 245 .get_osb = ocfs2_get_inode_osb, 246 .check_downconvert = ocfs2_check_meta_downconvert, 247 .set_lvb = ocfs2_set_meta_lvb, 248 .downconvert_worker = ocfs2_data_convert_worker, 249 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 250 }; 251 252 static struct ocfs2_lock_res_ops ocfs2_super_lops = { 253 .flags = LOCK_TYPE_REQUIRES_REFRESH, 254 }; 255 256 static struct ocfs2_lock_res_ops ocfs2_rename_lops = { 257 .flags = 0, 258 }; 259 260 static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = { 261 .flags = 0, 262 }; 263 264 static struct ocfs2_lock_res_ops ocfs2_trim_fs_lops = { 265 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 266 }; 267 268 static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = { 269 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 270 }; 271 272 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { 273 .get_osb = ocfs2_get_dentry_osb, 274 .post_unlock = ocfs2_dentry_post_unlock, 275 .downconvert_worker = ocfs2_dentry_convert_worker, 276 .flags = 0, 277 }; 278 279 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = { 280 .get_osb = ocfs2_get_inode_osb, 281 .flags = 0, 282 }; 283 284 static struct ocfs2_lock_res_ops ocfs2_flock_lops = { 285 .get_osb = ocfs2_get_file_osb, 286 .flags = 0, 287 }; 288 289 static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = { 290 .set_lvb = ocfs2_set_qinfo_lvb, 291 .get_osb = ocfs2_get_qinfo_osb, 292 .flags = LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB, 293 }; 294 
295 static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = { 296 .check_downconvert = ocfs2_check_refcount_downconvert, 297 .downconvert_worker = ocfs2_refcount_convert_worker, 298 .flags = 0, 299 }; 300 301 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 302 { 303 return lockres->l_type == OCFS2_LOCK_TYPE_META || 304 lockres->l_type == OCFS2_LOCK_TYPE_RW || 305 lockres->l_type == OCFS2_LOCK_TYPE_OPEN; 306 } 307 308 static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb) 309 { 310 return container_of(lksb, struct ocfs2_lock_res, l_lksb); 311 } 312 313 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) 314 { 315 BUG_ON(!ocfs2_is_inode_lock(lockres)); 316 317 return (struct inode *) lockres->l_priv; 318 } 319 320 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres) 321 { 322 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY); 323 324 return (struct ocfs2_dentry_lock *)lockres->l_priv; 325 } 326 327 static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres) 328 { 329 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO); 330 331 return (struct ocfs2_mem_dqinfo *)lockres->l_priv; 332 } 333 334 static inline struct ocfs2_refcount_tree * 335 ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res) 336 { 337 return container_of(res, struct ocfs2_refcount_tree, rf_lockres); 338 } 339 340 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres) 341 { 342 if (lockres->l_ops->get_osb) 343 return lockres->l_ops->get_osb(lockres); 344 345 return (struct ocfs2_super *)lockres->l_priv; 346 } 347 348 static int ocfs2_lock_create(struct ocfs2_super *osb, 349 struct ocfs2_lock_res *lockres, 350 int level, 351 u32 dlm_flags); 352 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 353 int wanted); 354 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb, 355 
struct ocfs2_lock_res *lockres, 356 int level, unsigned long caller_ip); 357 static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb, 358 struct ocfs2_lock_res *lockres, 359 int level) 360 { 361 __ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_); 362 } 363 364 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres); 365 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres); 366 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres); 367 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level); 368 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 369 struct ocfs2_lock_res *lockres); 370 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 371 int convert); 372 #define ocfs2_log_dlm_error(_func, _err, _lockres) do { \ 373 if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY) \ 374 mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \ 375 _err, _func, _lockres->l_name); \ 376 else \ 377 mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n", \ 378 _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name, \ 379 (unsigned int)ocfs2_get_dentry_lock_ino(_lockres)); \ 380 } while (0) 381 static int ocfs2_downconvert_thread(void *arg); 382 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 383 struct ocfs2_lock_res *lockres); 384 static int ocfs2_inode_lock_update(struct inode *inode, 385 struct buffer_head **bh); 386 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 387 static inline int ocfs2_highest_compat_lock_level(int level); 388 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 389 int new_level); 390 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 391 struct ocfs2_lock_res *lockres, 392 int new_level, 393 int lvb, 394 unsigned int generation); 395 static int ocfs2_prepare_cancel_convert(struct 
ocfs2_super *osb, 396 struct ocfs2_lock_res *lockres); 397 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 398 struct ocfs2_lock_res *lockres); 399 400 401 static void ocfs2_build_lock_name(enum ocfs2_lock_type type, 402 u64 blkno, 403 u32 generation, 404 char *name) 405 { 406 int len; 407 408 BUG_ON(type >= OCFS2_NUM_LOCK_TYPES); 409 410 len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", 411 ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD, 412 (long long)blkno, generation); 413 414 BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1)); 415 416 mlog(0, "built lock resource with name: %s\n", name); 417 } 418 419 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock); 420 421 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res, 422 struct ocfs2_dlm_debug *dlm_debug) 423 { 424 mlog(0, "Add tracking for lockres %s\n", res->l_name); 425 426 spin_lock(&ocfs2_dlm_tracking_lock); 427 list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking); 428 spin_unlock(&ocfs2_dlm_tracking_lock); 429 } 430 431 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) 432 { 433 spin_lock(&ocfs2_dlm_tracking_lock); 434 if (!list_empty(&res->l_debug_list)) 435 list_del_init(&res->l_debug_list); 436 spin_unlock(&ocfs2_dlm_tracking_lock); 437 } 438 439 #ifdef CONFIG_OCFS2_FS_STATS 440 static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 441 { 442 res->l_lock_refresh = 0; 443 memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats)); 444 memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats)); 445 } 446 447 static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level, 448 struct ocfs2_mask_waiter *mw, int ret) 449 { 450 u32 usec; 451 ktime_t kt; 452 struct ocfs2_lock_stats *stats; 453 454 if (level == LKM_PRMODE) 455 stats = &res->l_lock_prmode; 456 else if (level == LKM_EXMODE) 457 stats = &res->l_lock_exmode; 458 else 459 return; 460 461 kt = ktime_sub(ktime_get(), mw->mw_lock_start); 462 usec = ktime_to_us(kt); 463 464 
stats->ls_gets++; 465 stats->ls_total += ktime_to_ns(kt); 466 /* overflow */ 467 if (unlikely(stats->ls_gets == 0)) { 468 stats->ls_gets++; 469 stats->ls_total = ktime_to_ns(kt); 470 } 471 472 if (stats->ls_max < usec) 473 stats->ls_max = usec; 474 475 if (ret) 476 stats->ls_fail++; 477 } 478 479 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 480 { 481 lockres->l_lock_refresh++; 482 } 483 484 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 485 { 486 mw->mw_lock_start = ktime_get(); 487 } 488 #else 489 static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 490 { 491 } 492 static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, 493 int level, struct ocfs2_mask_waiter *mw, int ret) 494 { 495 } 496 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 497 { 498 } 499 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 500 { 501 } 502 #endif 503 504 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, 505 struct ocfs2_lock_res *res, 506 enum ocfs2_lock_type type, 507 struct ocfs2_lock_res_ops *ops, 508 void *priv) 509 { 510 res->l_type = type; 511 res->l_ops = ops; 512 res->l_priv = priv; 513 514 res->l_level = DLM_LOCK_IV; 515 res->l_requested = DLM_LOCK_IV; 516 res->l_blocking = DLM_LOCK_IV; 517 res->l_action = OCFS2_AST_INVALID; 518 res->l_unlock_action = OCFS2_UNLOCK_INVALID; 519 520 res->l_flags = OCFS2_LOCK_INITIALIZED; 521 522 ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug); 523 524 ocfs2_init_lock_stats(res); 525 #ifdef CONFIG_DEBUG_LOCK_ALLOC 526 if (type != OCFS2_LOCK_TYPE_OPEN) 527 lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type], 528 &lockdep_keys[type], 0); 529 else 530 res->l_lockdep_map.key = NULL; 531 #endif 532 } 533 534 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res) 535 { 536 /* This also clears out the lock status block */ 537 memset(res, 0, sizeof(struct ocfs2_lock_res)); 538 
spin_lock_init(&res->l_lock); 539 init_waitqueue_head(&res->l_event); 540 INIT_LIST_HEAD(&res->l_blocked_list); 541 INIT_LIST_HEAD(&res->l_mask_waiters); 542 INIT_LIST_HEAD(&res->l_holders); 543 } 544 545 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, 546 enum ocfs2_lock_type type, 547 unsigned int generation, 548 struct inode *inode) 549 { 550 struct ocfs2_lock_res_ops *ops; 551 552 switch(type) { 553 case OCFS2_LOCK_TYPE_RW: 554 ops = &ocfs2_inode_rw_lops; 555 break; 556 case OCFS2_LOCK_TYPE_META: 557 ops = &ocfs2_inode_inode_lops; 558 break; 559 case OCFS2_LOCK_TYPE_OPEN: 560 ops = &ocfs2_inode_open_lops; 561 break; 562 default: 563 mlog_bug_on_msg(1, "type: %d\n", type); 564 ops = NULL; /* thanks, gcc */ 565 break; 566 }; 567 568 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, 569 generation, res->l_name); 570 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); 571 } 572 573 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres) 574 { 575 struct inode *inode = ocfs2_lock_res_inode(lockres); 576 577 return OCFS2_SB(inode->i_sb); 578 } 579 580 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres) 581 { 582 struct ocfs2_mem_dqinfo *info = lockres->l_priv; 583 584 return OCFS2_SB(info->dqi_gi.dqi_sb); 585 } 586 587 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres) 588 { 589 struct ocfs2_file_private *fp = lockres->l_priv; 590 591 return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb); 592 } 593 594 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) 595 { 596 __be64 inode_blkno_be; 597 598 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], 599 sizeof(__be64)); 600 601 return be64_to_cpu(inode_blkno_be); 602 } 603 604 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres) 605 { 606 struct ocfs2_dentry_lock *dl = lockres->l_priv; 607 608 return OCFS2_SB(dl->dl_inode->i_sb); 609 } 610 611 void 
ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, 612 u64 parent, struct inode *inode) 613 { 614 int len; 615 u64 inode_blkno = OCFS2_I(inode)->ip_blkno; 616 __be64 inode_blkno_be = cpu_to_be64(inode_blkno); 617 struct ocfs2_lock_res *lockres = &dl->dl_lockres; 618 619 ocfs2_lock_res_init_once(lockres); 620 621 /* 622 * Unfortunately, the standard lock naming scheme won't work 623 * here because we have two 16 byte values to use. Instead, 624 * we'll stuff the inode number as a binary value. We still 625 * want error prints to show something without garbling the 626 * display, so drop a null byte in there before the inode 627 * number. A future version of OCFS2 will likely use all 628 * binary lock names. The stringified names have been a 629 * tremendous aid in debugging, but now that the debugfs 630 * interface exists, we can mangle things there if need be. 631 * 632 * NOTE: We also drop the standard "pad" value (the total lock 633 * name size stays the same though - the last part is all 634 * zeros due to the memset in ocfs2_lock_res_init_once() 635 */ 636 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, 637 "%c%016llx", 638 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), 639 (long long)parent); 640 641 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); 642 643 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, 644 sizeof(__be64)); 645 646 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 647 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, 648 dl); 649 } 650 651 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 652 struct ocfs2_super *osb) 653 { 654 /* Superblock lockres doesn't come from a slab so we call init 655 * once on it manually. 
*/ 656 ocfs2_lock_res_init_once(res); 657 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, 658 0, res->l_name); 659 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 660 &ocfs2_super_lops, osb); 661 } 662 663 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, 664 struct ocfs2_super *osb) 665 { 666 /* Rename lockres doesn't come from a slab so we call init 667 * once on it manually. */ 668 ocfs2_lock_res_init_once(res); 669 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); 670 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 671 &ocfs2_rename_lops, osb); 672 } 673 674 static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res, 675 struct ocfs2_super *osb) 676 { 677 /* nfs_sync lockres doesn't come from a slab so we call init 678 * once on it manually. */ 679 ocfs2_lock_res_init_once(res); 680 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name); 681 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC, 682 &ocfs2_nfs_sync_lops, osb); 683 } 684 685 void ocfs2_trim_fs_lock_res_init(struct ocfs2_super *osb) 686 { 687 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 688 689 ocfs2_lock_res_init_once(lockres); 690 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_TRIM_FS, 0, 0, lockres->l_name); 691 ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_TRIM_FS, 692 &ocfs2_trim_fs_lops, osb); 693 } 694 695 void ocfs2_trim_fs_lock_res_uninit(struct ocfs2_super *osb) 696 { 697 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 698 699 ocfs2_simple_drop_lockres(osb, lockres); 700 ocfs2_lock_res_free(lockres); 701 } 702 703 static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res, 704 struct ocfs2_super *osb) 705 { 706 ocfs2_lock_res_init_once(res); 707 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name); 708 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN, 709 &ocfs2_orphan_scan_lops, osb); 710 } 711 712 void 
ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, 713 struct ocfs2_file_private *fp) 714 { 715 struct inode *inode = fp->fp_file->f_mapping->host; 716 struct ocfs2_inode_info *oi = OCFS2_I(inode); 717 718 ocfs2_lock_res_init_once(lockres); 719 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno, 720 inode->i_generation, lockres->l_name); 721 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 722 OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops, 723 fp); 724 lockres->l_flags |= OCFS2_LOCK_NOCACHE; 725 } 726 727 void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres, 728 struct ocfs2_mem_dqinfo *info) 729 { 730 ocfs2_lock_res_init_once(lockres); 731 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type, 732 0, lockres->l_name); 733 ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres, 734 OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops, 735 info); 736 } 737 738 void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres, 739 struct ocfs2_super *osb, u64 ref_blkno, 740 unsigned int generation) 741 { 742 ocfs2_lock_res_init_once(lockres); 743 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno, 744 generation, lockres->l_name); 745 ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT, 746 &ocfs2_refcount_block_lops, osb); 747 } 748 749 void ocfs2_lock_res_free(struct ocfs2_lock_res *res) 750 { 751 if (!(res->l_flags & OCFS2_LOCK_INITIALIZED)) 752 return; 753 754 ocfs2_remove_lockres_tracking(res); 755 756 mlog_bug_on_msg(!list_empty(&res->l_blocked_list), 757 "Lockres %s is on the blocked list\n", 758 res->l_name); 759 mlog_bug_on_msg(!list_empty(&res->l_mask_waiters), 760 "Lockres %s has mask waiters pending\n", 761 res->l_name); 762 mlog_bug_on_msg(spin_is_locked(&res->l_lock), 763 "Lockres %s is locked\n", 764 res->l_name); 765 mlog_bug_on_msg(res->l_ro_holders, 766 "Lockres %s has %u ro holders\n", 767 res->l_name, res->l_ro_holders); 768 mlog_bug_on_msg(res->l_ex_holders, 769 "Lockres %s has %u ex 
holders\n", 770 res->l_name, res->l_ex_holders); 771 772 /* Need to clear out the lock status block for the dlm */ 773 memset(&res->l_lksb, 0, sizeof(res->l_lksb)); 774 775 res->l_flags = 0UL; 776 } 777 778 /* 779 * Keep a list of processes who have interest in a lockres. 780 * Note: this is now only uesed for check recursive cluster locking. 781 */ 782 static inline void ocfs2_add_holder(struct ocfs2_lock_res *lockres, 783 struct ocfs2_lock_holder *oh) 784 { 785 INIT_LIST_HEAD(&oh->oh_list); 786 oh->oh_owner_pid = get_pid(task_pid(current)); 787 788 spin_lock(&lockres->l_lock); 789 list_add_tail(&oh->oh_list, &lockres->l_holders); 790 spin_unlock(&lockres->l_lock); 791 } 792 793 static struct ocfs2_lock_holder * 794 ocfs2_pid_holder(struct ocfs2_lock_res *lockres, 795 struct pid *pid) 796 { 797 struct ocfs2_lock_holder *oh; 798 799 spin_lock(&lockres->l_lock); 800 list_for_each_entry(oh, &lockres->l_holders, oh_list) { 801 if (oh->oh_owner_pid == pid) { 802 spin_unlock(&lockres->l_lock); 803 return oh; 804 } 805 } 806 spin_unlock(&lockres->l_lock); 807 return NULL; 808 } 809 810 static inline void ocfs2_remove_holder(struct ocfs2_lock_res *lockres, 811 struct ocfs2_lock_holder *oh) 812 { 813 spin_lock(&lockres->l_lock); 814 list_del(&oh->oh_list); 815 spin_unlock(&lockres->l_lock); 816 817 put_pid(oh->oh_owner_pid); 818 } 819 820 821 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, 822 int level) 823 { 824 BUG_ON(!lockres); 825 826 switch(level) { 827 case DLM_LOCK_EX: 828 lockres->l_ex_holders++; 829 break; 830 case DLM_LOCK_PR: 831 lockres->l_ro_holders++; 832 break; 833 default: 834 BUG(); 835 } 836 } 837 838 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, 839 int level) 840 { 841 BUG_ON(!lockres); 842 843 switch(level) { 844 case DLM_LOCK_EX: 845 BUG_ON(!lockres->l_ex_holders); 846 lockres->l_ex_holders--; 847 break; 848 case DLM_LOCK_PR: 849 BUG_ON(!lockres->l_ro_holders); 850 lockres->l_ro_holders--; 851 break; 852 
default: 853 BUG(); 854 } 855 } 856 857 /* WARNING: This function lives in a world where the only three lock 858 * levels are EX, PR, and NL. It *will* have to be adjusted when more 859 * lock types are added. */ 860 static inline int ocfs2_highest_compat_lock_level(int level) 861 { 862 int new_level = DLM_LOCK_EX; 863 864 if (level == DLM_LOCK_EX) 865 new_level = DLM_LOCK_NL; 866 else if (level == DLM_LOCK_PR) 867 new_level = DLM_LOCK_PR; 868 return new_level; 869 } 870 871 static void lockres_set_flags(struct ocfs2_lock_res *lockres, 872 unsigned long newflags) 873 { 874 struct ocfs2_mask_waiter *mw, *tmp; 875 876 assert_spin_locked(&lockres->l_lock); 877 878 lockres->l_flags = newflags; 879 880 list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) { 881 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 882 continue; 883 884 list_del_init(&mw->mw_item); 885 mw->mw_status = 0; 886 complete(&mw->mw_complete); 887 } 888 } 889 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or) 890 { 891 lockres_set_flags(lockres, lockres->l_flags | or); 892 } 893 static void lockres_clear_flags(struct ocfs2_lock_res *lockres, 894 unsigned long clear) 895 { 896 lockres_set_flags(lockres, lockres->l_flags & ~clear); 897 } 898 899 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres) 900 { 901 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 902 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 903 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 904 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 905 906 lockres->l_level = lockres->l_requested; 907 if (lockres->l_level <= 908 ocfs2_highest_compat_lock_level(lockres->l_blocking)) { 909 lockres->l_blocking = DLM_LOCK_NL; 910 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 911 } 912 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 913 } 914 915 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres) 916 { 917 
BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 918 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 919 920 /* Convert from RO to EX doesn't really need anything as our 921 * information is already up to data. Convert from NL to 922 * *anything* however should mark ourselves as needing an 923 * update */ 924 if (lockres->l_level == DLM_LOCK_NL && 925 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 926 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 927 928 lockres->l_level = lockres->l_requested; 929 930 /* 931 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing 932 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from 933 * downconverting the lock before the upconvert has fully completed. 934 * Do not prevent the dc thread from downconverting if NONBLOCK lock 935 * had already returned. 936 */ 937 if (!(lockres->l_flags & OCFS2_LOCK_NONBLOCK_FINISHED)) 938 lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 939 else 940 lockres_clear_flags(lockres, OCFS2_LOCK_NONBLOCK_FINISHED); 941 942 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 943 } 944 945 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres) 946 { 947 BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY))); 948 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 949 950 if (lockres->l_requested > DLM_LOCK_NL && 951 !(lockres->l_flags & OCFS2_LOCK_LOCAL) && 952 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 953 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 954 955 lockres->l_level = lockres->l_requested; 956 lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED); 957 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 958 } 959 960 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, 961 int level) 962 { 963 int needs_downconvert = 0; 964 965 assert_spin_locked(&lockres->l_lock); 966 967 if (level > lockres->l_blocking) { 968 /* only schedule a downconvert if we haven't already scheduled 969 * one that goes low enough to satisfy the level 
we're 970 * blocking. this also catches the case where we get 971 * duplicate BASTs */ 972 if (ocfs2_highest_compat_lock_level(level) < 973 ocfs2_highest_compat_lock_level(lockres->l_blocking)) 974 needs_downconvert = 1; 975 976 lockres->l_blocking = level; 977 } 978 979 mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n", 980 lockres->l_name, level, lockres->l_level, lockres->l_blocking, 981 needs_downconvert); 982 983 if (needs_downconvert) 984 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 985 mlog(0, "needs_downconvert = %d\n", needs_downconvert); 986 return needs_downconvert; 987 } 988 989 /* 990 * OCFS2_LOCK_PENDING and l_pending_gen. 991 * 992 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting 993 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock() 994 * for more details on the race. 995 * 996 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces 997 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock() 998 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear 999 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns, 1000 * the caller is going to try to clear PENDING again. If nothing else is 1001 * happening, __lockres_clear_pending() sees PENDING is unset and does 1002 * nothing. 1003 * 1004 * But what if another path (eg downconvert thread) has just started a 1005 * new locking action? The other path has re-set PENDING. Our path 1006 * cannot clear PENDING, because that will re-open the original race 1007 * window. 
1008 * 1009 * [Example] 1010 * 1011 * ocfs2_meta_lock() 1012 * ocfs2_cluster_lock() 1013 * set BUSY 1014 * set PENDING 1015 * drop l_lock 1016 * ocfs2_dlm_lock() 1017 * ocfs2_locking_ast() ocfs2_downconvert_thread() 1018 * clear PENDING ocfs2_unblock_lock() 1019 * take_l_lock 1020 * !BUSY 1021 * ocfs2_prepare_downconvert() 1022 * set BUSY 1023 * set PENDING 1024 * drop l_lock 1025 * take l_lock 1026 * clear PENDING 1027 * drop l_lock 1028 * <window> 1029 * ocfs2_dlm_lock() 1030 * 1031 * So as you can see, we now have a window where l_lock is not held, 1032 * PENDING is not set, and ocfs2_dlm_lock() has not been called. 1033 * 1034 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING 1035 * set by ocfs2_prepare_downconvert(). That wasn't nice. 1036 * 1037 * To solve this we introduce l_pending_gen. A call to 1038 * lockres_clear_pending() will only do so when it is passed a generation 1039 * number that matches the lockres. lockres_set_pending() will return the 1040 * current generation number. When ocfs2_cluster_lock() goes to clear 1041 * PENDING, it passes the generation it got from set_pending(). In our 1042 * example above, the generation numbers will *not* match. Thus, 1043 * ocfs2_cluster_lock() will not clear the PENDING set by 1044 * ocfs2_prepare_downconvert(). 1045 */ 1046 1047 /* Unlocked version for ocfs2_locking_ast() */ 1048 static void __lockres_clear_pending(struct ocfs2_lock_res *lockres, 1049 unsigned int generation, 1050 struct ocfs2_super *osb) 1051 { 1052 assert_spin_locked(&lockres->l_lock); 1053 1054 /* 1055 * The ast and locking functions can race us here. The winner 1056 * will clear pending, the loser will not. 1057 */ 1058 if (!(lockres->l_flags & OCFS2_LOCK_PENDING) || 1059 (lockres->l_pending_gen != generation)) 1060 return; 1061 1062 lockres_clear_flags(lockres, OCFS2_LOCK_PENDING); 1063 lockres->l_pending_gen++; 1064 1065 /* 1066 * The downconvert thread may have skipped us because we 1067 * were PENDING. 
Wake it up. 1068 */ 1069 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 1070 ocfs2_wake_downconvert_thread(osb); 1071 } 1072 1073 /* Locked version for callers of ocfs2_dlm_lock() */ 1074 static void lockres_clear_pending(struct ocfs2_lock_res *lockres, 1075 unsigned int generation, 1076 struct ocfs2_super *osb) 1077 { 1078 unsigned long flags; 1079 1080 spin_lock_irqsave(&lockres->l_lock, flags); 1081 __lockres_clear_pending(lockres, generation, osb); 1082 spin_unlock_irqrestore(&lockres->l_lock, flags); 1083 } 1084 1085 static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres) 1086 { 1087 assert_spin_locked(&lockres->l_lock); 1088 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 1089 1090 lockres_or_flags(lockres, OCFS2_LOCK_PENDING); 1091 1092 return lockres->l_pending_gen; 1093 } 1094 1095 static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level) 1096 { 1097 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1098 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1099 int needs_downconvert; 1100 unsigned long flags; 1101 1102 BUG_ON(level <= DLM_LOCK_NL); 1103 1104 mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, " 1105 "type %s\n", lockres->l_name, level, lockres->l_level, 1106 ocfs2_lock_type_string(lockres->l_type)); 1107 1108 /* 1109 * We can skip the bast for locks which don't enable caching - 1110 * they'll be dropped at the earliest possible time anyway. 
/*
 * Lock AST callback (registered as lproto.lp_lock_ast).
 *
 * Runs entirely under lockres->l_lock. Reads the lksb status and then:
 *   - on -EAGAIN: clears OCFS2_LOCK_BUSY only and falls through to the
 *     common cleanup (presumably a NOQUEUE request that was not granted
 *     -- confirm against the stack glue);
 *   - on any other non-zero status: logs the error and returns
 *     immediately, without touching BUSY, l_action or waking waiters;
 *   - on success: dispatches on lockres->l_action to the matching
 *     attach/convert/downconvert handler. Any other action is a BUG().
 *
 * The common cleanup invalidates l_action, resets a pending
 * cancel-convert unlock action, clears PENDING and wakes l_event waiters.
 */
static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	unsigned long flags;
	int status;

	spin_lock_irqsave(&lockres->l_lock, flags);

	status = ocfs2_dlm_lock_status(&lockres->l_lksb);

	/* Request not granted: drop BUSY but leave the lock level alone. */
	if (status == -EAGAIN) {
		lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
		goto out;
	}

	/* Unexpected failure: only log it; lockres state is left as-is. */
	if (status) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
		     lockres->l_name, status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, "
	     "level %d => %d\n", lockres->l_name, lockres->l_action,
	     lockres->l_unlock_action, lockres->l_level, lockres->l_requested);

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		/* The attach handler consults LOCAL, so clear it afterwards. */
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, "
		     "flags 0x%lx, unlock: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}
out:
	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	/* Did we try to cancel this lock?  Clear that state */
	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;

	/*
	 * We may have beaten the locking functions here.  We certainly
	 * know that dlm_lock() has been called :-)
	 * Because we can't have two lock calls in flight at once, we
	 * can use lockres->l_pending_gen.
	 */
	__lockres_clear_pending(lockres, lockres->l_pending_gen, osb);

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}
*/ 1215 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 1216 ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres)); 1217 break; 1218 case OCFS2_UNLOCK_DROP_LOCK: 1219 lockres->l_level = DLM_LOCK_IV; 1220 break; 1221 default: 1222 BUG(); 1223 } 1224 1225 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1226 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1227 wake_up(&lockres->l_event); 1228 spin_unlock_irqrestore(&lockres->l_lock, flags); 1229 } 1230 1231 /* 1232 * This is the filesystem locking protocol. It provides the lock handling 1233 * hooks for the underlying DLM. It has a maximum version number. 1234 * The version number allows interoperability with systems running at 1235 * the same major number and an equal or smaller minor number. 1236 * 1237 * Whenever the filesystem does new things with locks (adds or removes a 1238 * lock, orders them differently, does different things underneath a lock), 1239 * the version must be changed. The protocol is negotiated when joining 1240 * the dlm domain. A node may join the domain if its major version is 1241 * identical to all other nodes and its minor version is greater than 1242 * or equal to all other nodes. When its minor version is greater than 1243 * the other nodes, it will run at the minor version specified by the 1244 * other nodes. 1245 * 1246 * If a locking change is made that will not be compatible with older 1247 * versions, the major number must be increased and the minor version set 1248 * to zero. If a change merely adds a behavior that can be disabled when 1249 * speaking to older versions, the minor version must be increased. If a 1250 * change adds a fully backwards compatible change (eg, LVB changes that 1251 * are just ignored by older versions), the version does not need to be 1252 * updated. 
1253 */ 1254 static struct ocfs2_locking_protocol lproto = { 1255 .lp_max_version = { 1256 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, 1257 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, 1258 }, 1259 .lp_lock_ast = ocfs2_locking_ast, 1260 .lp_blocking_ast = ocfs2_blocking_ast, 1261 .lp_unlock_ast = ocfs2_unlock_ast, 1262 }; 1263 1264 void ocfs2_set_locking_protocol(void) 1265 { 1266 ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version); 1267 } 1268 1269 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 1270 int convert) 1271 { 1272 unsigned long flags; 1273 1274 spin_lock_irqsave(&lockres->l_lock, flags); 1275 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1276 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 1277 if (convert) 1278 lockres->l_action = OCFS2_AST_INVALID; 1279 else 1280 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1281 spin_unlock_irqrestore(&lockres->l_lock, flags); 1282 1283 wake_up(&lockres->l_event); 1284 } 1285 1286 /* Note: If we detect another process working on the lock (i.e., 1287 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller 1288 * to do the right thing in that case. 
1289 */ 1290 static int ocfs2_lock_create(struct ocfs2_super *osb, 1291 struct ocfs2_lock_res *lockres, 1292 int level, 1293 u32 dlm_flags) 1294 { 1295 int ret = 0; 1296 unsigned long flags; 1297 unsigned int gen; 1298 1299 mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level, 1300 dlm_flags); 1301 1302 spin_lock_irqsave(&lockres->l_lock, flags); 1303 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 1304 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 1305 spin_unlock_irqrestore(&lockres->l_lock, flags); 1306 goto bail; 1307 } 1308 1309 lockres->l_action = OCFS2_AST_ATTACH; 1310 lockres->l_requested = level; 1311 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1312 gen = lockres_set_pending(lockres); 1313 spin_unlock_irqrestore(&lockres->l_lock, flags); 1314 1315 ret = ocfs2_dlm_lock(osb->cconn, 1316 level, 1317 &lockres->l_lksb, 1318 dlm_flags, 1319 lockres->l_name, 1320 OCFS2_LOCK_ID_MAX_LEN - 1); 1321 lockres_clear_pending(lockres, gen, osb); 1322 if (ret) { 1323 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1324 ocfs2_recover_from_dlm_error(lockres, 1); 1325 } 1326 1327 mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name); 1328 1329 bail: 1330 return ret; 1331 } 1332 1333 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, 1334 int flag) 1335 { 1336 unsigned long flags; 1337 int ret; 1338 1339 spin_lock_irqsave(&lockres->l_lock, flags); 1340 ret = lockres->l_flags & flag; 1341 spin_unlock_irqrestore(&lockres->l_lock, flags); 1342 1343 return ret; 1344 } 1345 1346 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 1347 1348 { 1349 wait_event(lockres->l_event, 1350 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 1351 } 1352 1353 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 1354 1355 { 1356 wait_event(lockres->l_event, 1357 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 1358 } 1359 1360 /* predict what lock level we'll be dropping down to on behalf 
1361 * of another node, and return true if the currently wanted 1362 * level will be compatible with it. */ 1363 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 1364 int wanted) 1365 { 1366 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 1367 1368 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking); 1369 } 1370 1371 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw) 1372 { 1373 INIT_LIST_HEAD(&mw->mw_item); 1374 init_completion(&mw->mw_complete); 1375 ocfs2_init_start_time(mw); 1376 } 1377 1378 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) 1379 { 1380 wait_for_completion(&mw->mw_complete); 1381 /* Re-arm the completion in case we want to wait on it again */ 1382 reinit_completion(&mw->mw_complete); 1383 return mw->mw_status; 1384 } 1385 1386 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres, 1387 struct ocfs2_mask_waiter *mw, 1388 unsigned long mask, 1389 unsigned long goal) 1390 { 1391 BUG_ON(!list_empty(&mw->mw_item)); 1392 1393 assert_spin_locked(&lockres->l_lock); 1394 1395 list_add_tail(&mw->mw_item, &lockres->l_mask_waiters); 1396 mw->mw_mask = mask; 1397 mw->mw_goal = goal; 1398 } 1399 1400 /* returns 0 if the mw that was removed was already satisfied, -EBUSY 1401 * if the mask still hadn't reached its goal */ 1402 static int __lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1403 struct ocfs2_mask_waiter *mw) 1404 { 1405 int ret = 0; 1406 1407 assert_spin_locked(&lockres->l_lock); 1408 if (!list_empty(&mw->mw_item)) { 1409 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 1410 ret = -EBUSY; 1411 1412 list_del_init(&mw->mw_item); 1413 init_completion(&mw->mw_complete); 1414 } 1415 1416 return ret; 1417 } 1418 1419 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1420 struct ocfs2_mask_waiter *mw) 1421 { 1422 unsigned long flags; 1423 int ret = 0; 1424 1425 spin_lock_irqsave(&lockres->l_lock, flags); 1426 ret = 
/*
 * Acquire a cluster lock on @lockres at @level and take a holder
 * reference on it.
 *
 * @lkm_flags:  DLM flags for the request; DLM_LKF_VALBLK is added when the
 *              lock type uses an LVB, and DLM_LKF_CONVERT is set/cleared
 *              here depending on whether the lockres is already attached.
 * @arg_flags:  OCFS2_LOCK_NONBLOCK makes the function back off with
 *              -EAGAIN instead of sleeping on a BUSY/BLOCKED lockres.
 * @l_subclass, @caller_ip: only used for lockdep annotation.
 *
 * The function loops via the "again" label: each pass re-takes l_lock,
 * re-evaluates the lockres flags and either queues a mask waiter, issues
 * a dlm request, or takes the holder reference.
 *
 * Returns 0 with the holder count incremented, -EINVAL for an
 * uninitialised lockres, -ERESTARTSYS when a signal is pending (unless
 * mounted with OCFS2_MOUNT_NOINTR or a dlm request was already issued),
 * -EAGAIN for a failed NOQUEUE/NONBLOCK attempt, or the error returned by
 * ocfs2_dlm_lock() / the mask waiter.
 */
static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres,
				int level,
				u32 lkm_flags,
				int arg_flags,
				int l_subclass,
				unsigned long caller_ip)
{
	struct ocfs2_mask_waiter mw;
	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
	unsigned long flags;
	unsigned int gen;
	int noqueue_attempted = 0;	/* set once a NOQUEUE request was issued */
	int dlm_locked = 0;		/* set once ocfs2_dlm_lock() succeeded */
	int kick_dc = 0;

	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) {
		mlog_errno(-EINVAL);
		return -EINVAL;
	}

	ocfs2_init_mask_waiter(&mw);

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= DLM_LKF_VALBLK;

again:
	wait = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);

	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto unlock;
	}

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags "
			"0x%lx\n", lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		wait = 1;
		goto unlock;
	}

	if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) {
		/*
		 * We've upconverted. If the lock now has a level we can
		 * work with, we take it. If, however, the lock is not at the
		 * required level, we go thru the full cycle. One way this could
		 * happen is if a process requesting an upconvert to PR is
		 * closely followed by another requesting upconvert to an EX.
		 * If the process requesting EX lands here, we want it to
		 * continue attempting to upconvert and let the process
		 * requesting PR take the lock.
		 * If multiple processes request upconvert to PR, the first one
		 * here will take the lock. The others will have to go thru the
		 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending
		 * downconvert request.
		 */
		if (level <= lockres->l_level)
			goto update_holders;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* the lock is currently blocked on behalf of
		 * another node; wait for BLOCKED to clear */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		/* A NOQUEUE request is only ever issued once per call. */
		if (noqueue_attempted > 0) {
			ret = -EAGAIN;
			goto unlock;
		}
		if (lkm_flags & DLM_LKF_NOQUEUE)
			noqueue_attempted = 1;

		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		/* First request attaches the lock; later ones convert it. */
		if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
			lockres->l_action = OCFS2_AST_ATTACH;
			lkm_flags &= ~DLM_LKF_CONVERT;
		} else {
			lockres->l_action = OCFS2_AST_CONVERT;
			lkm_flags |= DLM_LKF_CONVERT;
		}

		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		gen = lockres_set_pending(lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == DLM_LOCK_IV);
		BUG_ON(level == DLM_LOCK_NL);

		mlog(ML_BASTS, "lockres %s, convert from %d to %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		ret = ocfs2_dlm_lock(osb->cconn,
				     level,
				     &lockres->l_lksb,
				     lkm_flags,
				     lockres->l_name,
				     OCFS2_LOCK_ID_MAX_LEN - 1);
		lockres_clear_pending(lockres, gen, osb);
		if (ret) {
			/* -EAGAIN on a NOQUEUE request is not logged */
			if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
			    (ret != -EAGAIN)) {
				ocfs2_log_dlm_error("ocfs2_dlm_lock",
						    ret, lockres);
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			goto out;
		}
		dlm_locked = 1;

		mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

update_holders:
	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);

	/* ocfs2_unblock_lock requeues on seeing OCFS2_LOCK_UPCONVERT_FINISHING */
	kick_dc = (lockres->l_flags & OCFS2_LOCK_BLOCKED);

	spin_unlock_irqrestore(&lockres->l_lock, flags);
	if (kick_dc)
		ocfs2_wake_downconvert_thread(osb);
out:
	/*
	 * This is helping work around a lock inversion between the page lock
	 * and dlm locks.  One path holds the page lock while calling aops
	 * which block acquiring dlm locks.  The voting thread holds dlm
	 * locks while acquiring page locks while down converting data locks.
	 * This block is helping an aop path notice the inversion and back
	 * off to unlock its page lock before trying the dlm lock again.
	 */
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		spin_lock_irqsave(&lockres->l_lock, flags);
		/*
		 * Non-zero means the waiter was still unsatisfied: give up
		 * with -EAGAIN. Zero means the flags already reached the
		 * goal, so simply retry.
		 */
		if (__lockres_remove_mask_waiter(lockres, &mw)) {
			if (dlm_locked)
				lockres_or_flags(lockres,
					OCFS2_LOCK_NONBLOCK_FINISHED);
			spin_unlock_irqrestore(&lockres->l_lock, flags);
			ret = -EAGAIN;
		} else {
			spin_unlock_irqrestore(&lockres->l_lock, flags);
			goto again;
		}
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}
	ocfs2_update_lock_stats(lockres, level, &mw, ret);

#ifdef CONFIG_DEBUG_LOCK_ALLOC
	/* Tell lockdep about the acquisition: PR as a read, otherwise write. */
	if (!ret && lockres->l_lockdep_map.key != NULL) {
		if (level == DLM_LOCK_PR)
			rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
				caller_ip);
		else
			rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
				caller_ip);
	}
#endif
	return ret;
}
ocfs2_lock_res *lockres, 1681 int ex, 1682 int local) 1683 { 1684 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1685 unsigned long flags; 1686 u32 lkm_flags = local ? DLM_LKF_LOCAL : 0; 1687 1688 spin_lock_irqsave(&lockres->l_lock, flags); 1689 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 1690 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL); 1691 spin_unlock_irqrestore(&lockres->l_lock, flags); 1692 1693 return ocfs2_lock_create(osb, lockres, level, lkm_flags); 1694 } 1695 1696 /* Grants us an EX lock on the data and metadata resources, skipping 1697 * the normal cluster directory lookup. Use this ONLY on newly created 1698 * inodes which other nodes can't possibly see, and which haven't been 1699 * hashed in the inode hash yet. This can give us a good performance 1700 * increase as it'll skip the network broadcast normally associated 1701 * with creating a new lock resource. */ 1702 int ocfs2_create_new_inode_locks(struct inode *inode) 1703 { 1704 int ret; 1705 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1706 1707 BUG_ON(!ocfs2_inode_is_new(inode)); 1708 1709 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); 1710 1711 /* NOTE: That we don't increment any of the holder counts, nor 1712 * do we add anything to a journal handle. Since this is 1713 * supposed to be a new inode which the cluster doesn't know 1714 * about yet, there is no need to. As far as the LVB handling 1715 * is concerned, this is basically like acquiring an EX lock 1716 * on a resource which has an invalid one -- we'll set it 1717 * valid when we release the EX. */ 1718 1719 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1); 1720 if (ret) { 1721 mlog_errno(ret); 1722 goto bail; 1723 } 1724 1725 /* 1726 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they 1727 * don't use a generation in their lock names. 
1728 */ 1729 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); 1730 if (ret) { 1731 mlog_errno(ret); 1732 goto bail; 1733 } 1734 1735 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0); 1736 if (ret) 1737 mlog_errno(ret); 1738 1739 bail: 1740 return ret; 1741 } 1742 1743 int ocfs2_rw_lock(struct inode *inode, int write) 1744 { 1745 int status, level; 1746 struct ocfs2_lock_res *lockres; 1747 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1748 1749 mlog(0, "inode %llu take %s RW lock\n", 1750 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1751 write ? "EXMODE" : "PRMODE"); 1752 1753 if (ocfs2_mount_local(osb)) 1754 return 0; 1755 1756 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1757 1758 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1759 1760 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 1761 if (status < 0) 1762 mlog_errno(status); 1763 1764 return status; 1765 } 1766 1767 int ocfs2_try_rw_lock(struct inode *inode, int write) 1768 { 1769 int status, level; 1770 struct ocfs2_lock_res *lockres; 1771 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1772 1773 mlog(0, "inode %llu try to take %s RW lock\n", 1774 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1775 write ? "EXMODE" : "PRMODE"); 1776 1777 if (ocfs2_mount_local(osb)) 1778 return 0; 1779 1780 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1781 1782 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1783 1784 status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0); 1785 return status; 1786 } 1787 1788 void ocfs2_rw_unlock(struct inode *inode, int write) 1789 { 1790 int level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1791 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1792 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1793 1794 mlog(0, "inode %llu drop %s RW lock\n", 1795 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1796 write ? 
"EXMODE" : "PRMODE"); 1797 1798 if (!ocfs2_mount_local(osb)) 1799 ocfs2_cluster_unlock(osb, lockres, level); 1800 } 1801 1802 /* 1803 * ocfs2_open_lock always get PR mode lock. 1804 */ 1805 int ocfs2_open_lock(struct inode *inode) 1806 { 1807 int status = 0; 1808 struct ocfs2_lock_res *lockres; 1809 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1810 1811 mlog(0, "inode %llu take PRMODE open lock\n", 1812 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1813 1814 if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb)) 1815 goto out; 1816 1817 lockres = &OCFS2_I(inode)->ip_open_lockres; 1818 1819 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_PR, 0, 0); 1820 if (status < 0) 1821 mlog_errno(status); 1822 1823 out: 1824 return status; 1825 } 1826 1827 int ocfs2_try_open_lock(struct inode *inode, int write) 1828 { 1829 int status = 0, level; 1830 struct ocfs2_lock_res *lockres; 1831 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1832 1833 mlog(0, "inode %llu try to take %s open lock\n", 1834 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1835 write ? "EXMODE" : "PRMODE"); 1836 1837 if (ocfs2_is_hard_readonly(osb)) { 1838 if (write) 1839 status = -EROFS; 1840 goto out; 1841 } 1842 1843 if (ocfs2_mount_local(osb)) 1844 goto out; 1845 1846 lockres = &OCFS2_I(inode)->ip_open_lockres; 1847 1848 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1849 1850 /* 1851 * The file system may already holding a PRMODE/EXMODE open lock. 1852 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on 1853 * other nodes and the -EAGAIN will indicate to the caller that 1854 * this inode is still in use. 1855 */ 1856 status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0); 1857 1858 out: 1859 return status; 1860 } 1861 1862 /* 1863 * ocfs2_open_unlock unlock PR and EX mode open locks. 
/*
 * Back out of an flock request after the waiter was interrupted
 * (called from ocfs2_file_lock() when its wait returns -ERESTARTSYS).
 *
 * While the lockres is BUSY, either issue a cancel-convert (when
 * ocfs2_prepare_cancel_convert() says one is needed) or sleep until BUSY
 * clears, then re-check. Once the lockres is idle:
 *
 * Returns 0 if the lock ended up granted at @level anyway, -ERESTARTSYS
 * if it did not, or the negative error from ocfs2_cancel_convert().
 */
static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
				     int level)
{
	int ret;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	unsigned long flags;
	struct ocfs2_mask_waiter mw;

	ocfs2_init_mask_waiter(&mw);

retry_cancel:
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		/* Non-zero: a cancel must be sent; done outside l_lock. */
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		if (ret) {
			spin_unlock_irqrestore(&lockres->l_lock, flags);
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0) {
				mlog_errno(ret);
				goto out;
			}
			goto retry_cancel;
		}
		/* No cancel to send right now: wait for BUSY to clear. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		ocfs2_wait_for_mask(&mw);
		goto retry_cancel;
	}

	ret = -ERESTARTSYS;
	/*
	 * We may still have gotten the lock, in which case there's no
	 * point to restarting the syscall.
	 */
	if (lockres->l_level == level)
		ret = 0;

	mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
	     lockres->l_flags, lockres->l_level, lockres->l_action);

	spin_unlock_irqrestore(&lockres->l_lock, flags);

out:
	return ret;
}
/*
 * Take the flock() cluster lock for @file.
 *
 * @ex:      non-zero requests DLM_LOCK_EX, zero requests DLM_LOCK_PR.
 * @trylock: non-zero adds DLM_LKF_NOQUEUE, so the request fails with
 *           -EAGAIN instead of waiting.
 *
 * If the lockres is not attached yet it is first created at NL, then
 * converted to the requested level. The wait for the conversion is
 * interruptible; on a signal the request is backed out via
 * ocfs2_flock_handle_signal().
 *
 * Returns 0 on success; -EINVAL if the lockres is busy or already held
 * above NL on entry, or if the dlm call fails with anything other than a
 * trylock -EAGAIN; -EAGAIN for a failed trylock; otherwise the error from
 * lock creation, the mask waiter or the signal handling path.
 */
int ocfs2_file_lock(struct file *file, int ex, int trylock)
{
	int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0;
	unsigned long flags;
	struct ocfs2_file_private *fp = file->private_data;
	struct ocfs2_lock_res *lockres = &fp->fp_flock;
	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
	struct ocfs2_mask_waiter mw;

	ocfs2_init_mask_waiter(&mw);

	/* NOTE(review): these fields are read without l_lock held. */
	if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
	    (lockres->l_level > DLM_LOCK_NL)) {
		mlog(ML_ERROR,
		     "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
		     "level: %u\n", lockres->l_name, lockres->l_flags,
		     lockres->l_level);
		return -EINVAL;
	}

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
		/* Queue the waiter first, then issue the create request. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		/*
		 * Get the lock at NLMODE to start - that way we
		 * can cancel the upconvert request if need be.
		 */
		ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
		if (ret < 0) {
			mlog_errno(ret);
			goto out;
		}

		ret = ocfs2_wait_for_mask(&mw);
		if (ret) {
			mlog_errno(ret);
			goto out;
		}
		spin_lock_irqsave(&lockres->l_lock, flags);
	}

	/* l_lock is held here on both paths: set up the convert request. */
	lockres->l_action = OCFS2_AST_CONVERT;
	lkm_flags |= DLM_LKF_CONVERT;
	lockres->l_requested = level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);

	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
			     lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
	if (ret) {
		/* A trylock -EAGAIN is passed through; all else is -EINVAL. */
		if (!trylock || (ret != -EAGAIN)) {
			ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
			ret = -EINVAL;
		}

		ocfs2_recover_from_dlm_error(lockres, 1);
		lockres_remove_mask_waiter(lockres, &mw);
		goto out;
	}

	ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
	if (ret == -ERESTARTSYS) {
		/*
		 * Userspace can cause deadlock itself with
		 * flock(). Current behavior locally is to allow the
		 * deadlock, but abort the system call if a signal is
		 * received. We follow this example, otherwise a
		 * poorly written program could sit in kernel until
		 * reboot.
		 *
		 * Handling this is a bit more complicated for Ocfs2
		 * though. We can't exit this function with an
		 * outstanding lock request, so a cancel convert is
		 * required. We intentionally overwrite 'ret' - if the
		 * cancel fails and the lock was granted, it's easier
		 * to just bubble success back up to the user.
		 */
		ret = ocfs2_flock_handle_signal(lockres, level);
	} else if (!ret && (level > lockres->l_level)) {
		/* Trylock failed asynchronously */
		BUG_ON(!trylock);
		ret = -EAGAIN;
	}

out:

	mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
	     lockres->l_name, ex, trylock, ret);
	return ret;
}
*/ 2101 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 2102 switch(lockres->l_blocking) { 2103 case DLM_LOCK_EX: 2104 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 2105 kick = 1; 2106 break; 2107 case DLM_LOCK_PR: 2108 if (!lockres->l_ex_holders) 2109 kick = 1; 2110 break; 2111 default: 2112 BUG(); 2113 } 2114 } 2115 2116 if (kick) 2117 ocfs2_wake_downconvert_thread(osb); 2118 } 2119 2120 #define OCFS2_SEC_BITS 34 2121 #define OCFS2_SEC_SHIFT (64 - 34) 2122 #define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1) 2123 2124 /* LVB only has room for 64 bits of time here so we pack it for 2125 * now. */ 2126 static u64 ocfs2_pack_timespec(struct timespec *spec) 2127 { 2128 u64 res; 2129 u64 sec = spec->tv_sec; 2130 u32 nsec = spec->tv_nsec; 2131 2132 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK); 2133 2134 return res; 2135 } 2136 2137 /* Call this with the lockres locked. I am reasonably sure we don't 2138 * need ip_lock in this function as anyone who would be changing those 2139 * values is supposed to be blocked in ocfs2_inode_lock right now. */ 2140 static void __ocfs2_stuff_meta_lvb(struct inode *inode) 2141 { 2142 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2143 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2144 struct ocfs2_meta_lvb *lvb; 2145 struct timespec ts; 2146 2147 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2148 2149 /* 2150 * Invalidate the LVB of a deleted inode - this way other 2151 * nodes are forced to go to disk and discover the new inode 2152 * status. 
2153 */ 2154 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2155 lvb->lvb_version = 0; 2156 goto out; 2157 } 2158 2159 lvb->lvb_version = OCFS2_LVB_VERSION; 2160 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 2161 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 2162 lvb->lvb_iuid = cpu_to_be32(i_uid_read(inode)); 2163 lvb->lvb_igid = cpu_to_be32(i_gid_read(inode)); 2164 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 2165 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 2166 ts = timespec64_to_timespec(inode->i_atime); 2167 lvb->lvb_iatime_packed = 2168 cpu_to_be64(ocfs2_pack_timespec(&ts)); 2169 ts = timespec64_to_timespec(inode->i_ctime); 2170 lvb->lvb_ictime_packed = 2171 cpu_to_be64(ocfs2_pack_timespec(&ts)); 2172 ts = timespec64_to_timespec(inode->i_mtime); 2173 lvb->lvb_imtime_packed = 2174 cpu_to_be64(ocfs2_pack_timespec(&ts)); 2175 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 2176 lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features); 2177 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 2178 2179 out: 2180 mlog_meta_lvb(0, lockres); 2181 } 2182 2183 static void ocfs2_unpack_timespec(struct timespec *spec, 2184 u64 packed_time) 2185 { 2186 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 2187 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 2188 } 2189 2190 static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 2191 { 2192 struct timespec ts; 2193 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2194 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2195 struct ocfs2_meta_lvb *lvb; 2196 2197 mlog_meta_lvb(0, lockres); 2198 2199 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2200 2201 /* We're safe here without the lockres lock... 
*/ 2202 spin_lock(&oi->ip_lock); 2203 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 2204 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 2205 2206 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 2207 oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures); 2208 ocfs2_set_inode_flags(inode); 2209 2210 /* fast-symlinks are a special case */ 2211 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 2212 inode->i_blocks = 0; 2213 else 2214 inode->i_blocks = ocfs2_inode_sector_count(inode); 2215 2216 i_uid_write(inode, be32_to_cpu(lvb->lvb_iuid)); 2217 i_gid_write(inode, be32_to_cpu(lvb->lvb_igid)); 2218 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 2219 set_nlink(inode, be16_to_cpu(lvb->lvb_inlink)); 2220 ocfs2_unpack_timespec(&ts, 2221 be64_to_cpu(lvb->lvb_iatime_packed)); 2222 inode->i_atime = timespec_to_timespec64(ts); 2223 ocfs2_unpack_timespec(&ts, 2224 be64_to_cpu(lvb->lvb_imtime_packed)); 2225 inode->i_mtime = timespec_to_timespec64(ts); 2226 ocfs2_unpack_timespec(&ts, 2227 be64_to_cpu(lvb->lvb_ictime_packed)); 2228 inode->i_ctime = timespec_to_timespec64(ts); 2229 spin_unlock(&oi->ip_lock); 2230 } 2231 2232 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 2233 struct ocfs2_lock_res *lockres) 2234 { 2235 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2236 2237 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) 2238 && lvb->lvb_version == OCFS2_LVB_VERSION 2239 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 2240 return 1; 2241 return 0; 2242 } 2243 2244 /* Determine whether a lock resource needs to be refreshed, and 2245 * arbitrate who gets to refresh it. 2246 * 2247 * 0 means no refresh needed. 2248 * 2249 * > 0 means you need to refresh this and you MUST call 2250 * ocfs2_complete_lock_res_refresh afterwards. 
*/ 2251 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres) 2252 { 2253 unsigned long flags; 2254 int status = 0; 2255 2256 refresh_check: 2257 spin_lock_irqsave(&lockres->l_lock, flags); 2258 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 2259 spin_unlock_irqrestore(&lockres->l_lock, flags); 2260 goto bail; 2261 } 2262 2263 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 2264 spin_unlock_irqrestore(&lockres->l_lock, flags); 2265 2266 ocfs2_wait_on_refreshing_lock(lockres); 2267 goto refresh_check; 2268 } 2269 2270 /* Ok, I'll be the one to refresh this lock. */ 2271 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING); 2272 spin_unlock_irqrestore(&lockres->l_lock, flags); 2273 2274 status = 1; 2275 bail: 2276 mlog(0, "status %d\n", status); 2277 return status; 2278 } 2279 2280 /* If status is non zero, I'll mark it as not being in refresh 2281 * anymroe, but i won't clear the needs refresh flag. */ 2282 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres, 2283 int status) 2284 { 2285 unsigned long flags; 2286 2287 spin_lock_irqsave(&lockres->l_lock, flags); 2288 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING); 2289 if (!status) 2290 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 2291 spin_unlock_irqrestore(&lockres->l_lock, flags); 2292 2293 wake_up(&lockres->l_event); 2294 } 2295 2296 /* may or may not return a bh if it went to disk. */ 2297 static int ocfs2_inode_lock_update(struct inode *inode, 2298 struct buffer_head **bh) 2299 { 2300 int status = 0; 2301 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2302 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2303 struct ocfs2_dinode *fe; 2304 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2305 2306 if (ocfs2_mount_local(osb)) 2307 goto bail; 2308 2309 spin_lock(&oi->ip_lock); 2310 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2311 mlog(0, "Orphaned inode %llu was deleted while we " 2312 "were waiting on a lock. 
ip_flags = 0x%x\n", 2313 (unsigned long long)oi->ip_blkno, oi->ip_flags); 2314 spin_unlock(&oi->ip_lock); 2315 status = -ENOENT; 2316 goto bail; 2317 } 2318 spin_unlock(&oi->ip_lock); 2319 2320 if (!ocfs2_should_refresh_lock_res(lockres)) 2321 goto bail; 2322 2323 /* This will discard any caching information we might have had 2324 * for the inode metadata. */ 2325 ocfs2_metadata_cache_purge(INODE_CACHE(inode)); 2326 2327 ocfs2_extent_map_trunc(inode, 0); 2328 2329 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) { 2330 mlog(0, "Trusting LVB on inode %llu\n", 2331 (unsigned long long)oi->ip_blkno); 2332 ocfs2_refresh_inode_from_lvb(inode); 2333 } else { 2334 /* Boo, we have to go to disk. */ 2335 /* read bh, cast, ocfs2_refresh_inode */ 2336 status = ocfs2_read_inode_block(inode, bh); 2337 if (status < 0) { 2338 mlog_errno(status); 2339 goto bail_refresh; 2340 } 2341 fe = (struct ocfs2_dinode *) (*bh)->b_data; 2342 2343 /* This is a good chance to make sure we're not 2344 * locking an invalid object. ocfs2_read_inode_block() 2345 * already checked that the inode block is sane. 2346 * 2347 * We bug on a stale inode here because we checked 2348 * above whether it was wiped from disk. The wiping 2349 * node provides a guarantee that we receive that 2350 * message and can mark the inode before dropping any 2351 * locks associated with it. 
*/ 2352 mlog_bug_on_msg(inode->i_generation != 2353 le32_to_cpu(fe->i_generation), 2354 "Invalid dinode %llu disk generation: %u " 2355 "inode->i_generation: %u\n", 2356 (unsigned long long)oi->ip_blkno, 2357 le32_to_cpu(fe->i_generation), 2358 inode->i_generation); 2359 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 2360 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 2361 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 2362 (unsigned long long)oi->ip_blkno, 2363 (unsigned long long)le64_to_cpu(fe->i_dtime), 2364 le32_to_cpu(fe->i_flags)); 2365 2366 ocfs2_refresh_inode(inode, fe); 2367 ocfs2_track_lock_refresh(lockres); 2368 } 2369 2370 status = 0; 2371 bail_refresh: 2372 ocfs2_complete_lock_res_refresh(lockres, status); 2373 bail: 2374 return status; 2375 } 2376 2377 static int ocfs2_assign_bh(struct inode *inode, 2378 struct buffer_head **ret_bh, 2379 struct buffer_head *passed_bh) 2380 { 2381 int status; 2382 2383 if (passed_bh) { 2384 /* Ok, the update went to disk for us, use the 2385 * returned bh. */ 2386 *ret_bh = passed_bh; 2387 get_bh(*ret_bh); 2388 2389 return 0; 2390 } 2391 2392 status = ocfs2_read_inode_block(inode, ret_bh); 2393 if (status < 0) 2394 mlog_errno(status); 2395 2396 return status; 2397 } 2398 2399 /* 2400 * returns < 0 error if the callback will never be called, otherwise 2401 * the result of the lock will be communicated via the callback. 2402 */ 2403 int ocfs2_inode_lock_full_nested(struct inode *inode, 2404 struct buffer_head **ret_bh, 2405 int ex, 2406 int arg_flags, 2407 int subclass) 2408 { 2409 int status, level, acquired; 2410 u32 dlm_flags; 2411 struct ocfs2_lock_res *lockres = NULL; 2412 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2413 struct buffer_head *local_bh = NULL; 2414 2415 mlog(0, "inode %llu, take %s META lock\n", 2416 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2417 ex ? "EXMODE" : "PRMODE"); 2418 2419 status = 0; 2420 acquired = 0; 2421 /* We'll allow faking a readonly metadata lock for 2422 * rodevices. 
*/ 2423 if (ocfs2_is_hard_readonly(osb)) { 2424 if (ex) 2425 status = -EROFS; 2426 goto getbh; 2427 } 2428 2429 if ((arg_flags & OCFS2_META_LOCK_GETBH) || 2430 ocfs2_mount_local(osb)) 2431 goto update; 2432 2433 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2434 ocfs2_wait_for_recovery(osb); 2435 2436 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2437 level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2438 dlm_flags = 0; 2439 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 2440 dlm_flags |= DLM_LKF_NOQUEUE; 2441 2442 status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags, 2443 arg_flags, subclass, _RET_IP_); 2444 if (status < 0) { 2445 if (status != -EAGAIN) 2446 mlog_errno(status); 2447 goto bail; 2448 } 2449 2450 /* Notify the error cleanup path to drop the cluster lock. */ 2451 acquired = 1; 2452 2453 /* We wait twice because a node may have died while we were in 2454 * the lower dlm layers. The second time though, we've 2455 * committed to owning this lock so we don't allow signals to 2456 * abort the operation. */ 2457 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2458 ocfs2_wait_for_recovery(osb); 2459 2460 update: 2461 /* 2462 * We only see this flag if we're being called from 2463 * ocfs2_read_locked_inode(). It means we're locking an inode 2464 * which hasn't been populated yet, so clear the refresh flag 2465 * and let the caller handle it. 2466 */ 2467 if (inode->i_state & I_NEW) { 2468 status = 0; 2469 if (lockres) 2470 ocfs2_complete_lock_res_refresh(lockres, 0); 2471 goto bail; 2472 } 2473 2474 /* This is fun. The caller may want a bh back, or it may 2475 * not. ocfs2_inode_lock_update definitely wants one in, but 2476 * may or may not read one, depending on what's in the 2477 * LVB. The result of all of this is that we've *only* gone to 2478 * disk if we have to, so the complexity is worthwhile. 
*/ 2479 status = ocfs2_inode_lock_update(inode, &local_bh); 2480 if (status < 0) { 2481 if (status != -ENOENT) 2482 mlog_errno(status); 2483 goto bail; 2484 } 2485 getbh: 2486 if (ret_bh) { 2487 status = ocfs2_assign_bh(inode, ret_bh, local_bh); 2488 if (status < 0) { 2489 mlog_errno(status); 2490 goto bail; 2491 } 2492 } 2493 2494 bail: 2495 if (status < 0) { 2496 if (ret_bh && (*ret_bh)) { 2497 brelse(*ret_bh); 2498 *ret_bh = NULL; 2499 } 2500 if (acquired) 2501 ocfs2_inode_unlock(inode, ex); 2502 } 2503 2504 if (local_bh) 2505 brelse(local_bh); 2506 2507 return status; 2508 } 2509 2510 /* 2511 * This is working around a lock inversion between tasks acquiring DLM 2512 * locks while holding a page lock and the downconvert thread which 2513 * blocks dlm lock acquiry while acquiring page locks. 2514 * 2515 * ** These _with_page variantes are only intended to be called from aop 2516 * methods that hold page locks and return a very specific *positive* error 2517 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 2518 * 2519 * The DLM is called such that it returns -EAGAIN if it would have 2520 * blocked waiting for the downconvert thread. In that case we unlock 2521 * our page so the downconvert thread can make progress. Once we've 2522 * done this we have to return AOP_TRUNCATED_PAGE so the aop method 2523 * that called us can bubble that back up into the VFS who will then 2524 * immediately retry the aop call. 2525 */ 2526 int ocfs2_inode_lock_with_page(struct inode *inode, 2527 struct buffer_head **ret_bh, 2528 int ex, 2529 struct page *page) 2530 { 2531 int ret; 2532 2533 ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK); 2534 if (ret == -EAGAIN) { 2535 unlock_page(page); 2536 /* 2537 * If we can't get inode lock immediately, we should not return 2538 * directly here, since this will lead to a softlockup problem. 
2539 * The method is to get a blocking lock and immediately unlock 2540 * before returning, this can avoid CPU resource waste due to 2541 * lots of retries, and benefits fairness in getting lock. 2542 */ 2543 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0) 2544 ocfs2_inode_unlock(inode, ex); 2545 ret = AOP_TRUNCATED_PAGE; 2546 } 2547 2548 return ret; 2549 } 2550 2551 int ocfs2_inode_lock_atime(struct inode *inode, 2552 struct vfsmount *vfsmnt, 2553 int *level, int wait) 2554 { 2555 int ret; 2556 2557 if (wait) 2558 ret = ocfs2_inode_lock(inode, NULL, 0); 2559 else 2560 ret = ocfs2_try_inode_lock(inode, NULL, 0); 2561 2562 if (ret < 0) { 2563 if (ret != -EAGAIN) 2564 mlog_errno(ret); 2565 return ret; 2566 } 2567 2568 /* 2569 * If we should update atime, we will get EX lock, 2570 * otherwise we just get PR lock. 2571 */ 2572 if (ocfs2_should_update_atime(inode, vfsmnt)) { 2573 struct buffer_head *bh = NULL; 2574 2575 ocfs2_inode_unlock(inode, 0); 2576 if (wait) 2577 ret = ocfs2_inode_lock(inode, &bh, 1); 2578 else 2579 ret = ocfs2_try_inode_lock(inode, &bh, 1); 2580 2581 if (ret < 0) { 2582 if (ret != -EAGAIN) 2583 mlog_errno(ret); 2584 return ret; 2585 } 2586 *level = 1; 2587 if (ocfs2_should_update_atime(inode, vfsmnt)) 2588 ocfs2_update_inode_atime(inode, bh); 2589 if (bh) 2590 brelse(bh); 2591 } else 2592 *level = 0; 2593 2594 return ret; 2595 } 2596 2597 void ocfs2_inode_unlock(struct inode *inode, 2598 int ex) 2599 { 2600 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2601 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2602 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2603 2604 mlog(0, "inode %llu drop %s META lock\n", 2605 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2606 ex ? 
"EXMODE" : "PRMODE"); 2607 2608 if (!ocfs2_is_hard_readonly(osb) && 2609 !ocfs2_mount_local(osb)) 2610 ocfs2_cluster_unlock(osb, lockres, level); 2611 } 2612 2613 /* 2614 * This _tracker variantes are introduced to deal with the recursive cluster 2615 * locking issue. The idea is to keep track of a lock holder on the stack of 2616 * the current process. If there's a lock holder on the stack, we know the 2617 * task context is already protected by cluster locking. Currently, they're 2618 * used in some VFS entry routines. 2619 * 2620 * return < 0 on error, return == 0 if there's no lock holder on the stack 2621 * before this call, return == 1 if this call would be a recursive locking. 2622 * return == -1 if this lock attempt will cause an upgrade which is forbidden. 2623 * 2624 * When taking lock levels into account,we face some different situations. 2625 * 2626 * 1. no lock is held 2627 * In this case, just lock the inode as requested and return 0 2628 * 2629 * 2. We are holding a lock 2630 * For this situation, things diverges into several cases 2631 * 2632 * wanted holding what to do 2633 * ex ex see 2.1 below 2634 * ex pr see 2.2 below 2635 * pr ex see 2.1 below 2636 * pr pr see 2.1 below 2637 * 2638 * 2.1 lock level that is been held is compatible 2639 * with the wanted level, so no lock action will be tacken. 2640 * 2641 * 2.2 Otherwise, an upgrade is needed, but it is forbidden. 2642 * 2643 * Reason why upgrade within a process is forbidden is that 2644 * lock upgrade may cause dead lock. The following illustrates 2645 * how it happens. 
2646 * 2647 * thread on node1 thread on node2 2648 * ocfs2_inode_lock_tracker(ex=0) 2649 * 2650 * <====== ocfs2_inode_lock_tracker(ex=1) 2651 * 2652 * ocfs2_inode_lock_tracker(ex=1) 2653 */ 2654 int ocfs2_inode_lock_tracker(struct inode *inode, 2655 struct buffer_head **ret_bh, 2656 int ex, 2657 struct ocfs2_lock_holder *oh) 2658 { 2659 int status = 0; 2660 struct ocfs2_lock_res *lockres; 2661 struct ocfs2_lock_holder *tmp_oh; 2662 struct pid *pid = task_pid(current); 2663 2664 2665 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2666 tmp_oh = ocfs2_pid_holder(lockres, pid); 2667 2668 if (!tmp_oh) { 2669 /* 2670 * This corresponds to the case 1. 2671 * We haven't got any lock before. 2672 */ 2673 status = ocfs2_inode_lock_full(inode, ret_bh, ex, 0); 2674 if (status < 0) { 2675 if (status != -ENOENT) 2676 mlog_errno(status); 2677 return status; 2678 } 2679 2680 oh->oh_ex = ex; 2681 ocfs2_add_holder(lockres, oh); 2682 return 0; 2683 } 2684 2685 if (unlikely(ex && !tmp_oh->oh_ex)) { 2686 /* 2687 * case 2.2 upgrade may cause dead lock, forbid it. 2688 */ 2689 mlog(ML_ERROR, "Recursive locking is not permitted to " 2690 "upgrade to EX level from PR level.\n"); 2691 dump_stack(); 2692 return -EINVAL; 2693 } 2694 2695 /* 2696 * case 2.1 OCFS2_META_LOCK_GETBH flag make ocfs2_inode_lock_full. 2697 * ignore the lock level and just update it. 2698 */ 2699 if (ret_bh) { 2700 status = ocfs2_inode_lock_full(inode, ret_bh, ex, 2701 OCFS2_META_LOCK_GETBH); 2702 if (status < 0) { 2703 if (status != -ENOENT) 2704 mlog_errno(status); 2705 return status; 2706 } 2707 } 2708 return tmp_oh ? 1 : 0; 2709 } 2710 2711 void ocfs2_inode_unlock_tracker(struct inode *inode, 2712 int ex, 2713 struct ocfs2_lock_holder *oh, 2714 int had_lock) 2715 { 2716 struct ocfs2_lock_res *lockres; 2717 2718 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2719 /* had_lock means that the currect process already takes the cluster 2720 * lock previously. 2721 * If had_lock is 1, we have nothing to do here. 
2722 * If had_lock is 0, we will release the lock. 2723 */ 2724 if (!had_lock) { 2725 ocfs2_inode_unlock(inode, oh->oh_ex); 2726 ocfs2_remove_holder(lockres, oh); 2727 } 2728 } 2729 2730 int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno) 2731 { 2732 struct ocfs2_lock_res *lockres; 2733 struct ocfs2_orphan_scan_lvb *lvb; 2734 int status = 0; 2735 2736 if (ocfs2_is_hard_readonly(osb)) 2737 return -EROFS; 2738 2739 if (ocfs2_mount_local(osb)) 2740 return 0; 2741 2742 lockres = &osb->osb_orphan_scan.os_lockres; 2743 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2744 if (status < 0) 2745 return status; 2746 2747 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2748 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2749 lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION) 2750 *seqno = be32_to_cpu(lvb->lvb_os_seqno); 2751 else 2752 *seqno = osb->osb_orphan_scan.os_seqno + 1; 2753 2754 return status; 2755 } 2756 2757 void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno) 2758 { 2759 struct ocfs2_lock_res *lockres; 2760 struct ocfs2_orphan_scan_lvb *lvb; 2761 2762 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) { 2763 lockres = &osb->osb_orphan_scan.os_lockres; 2764 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2765 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION; 2766 lvb->lvb_os_seqno = cpu_to_be32(seqno); 2767 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2768 } 2769 } 2770 2771 int ocfs2_super_lock(struct ocfs2_super *osb, 2772 int ex) 2773 { 2774 int status = 0; 2775 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2776 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2777 2778 if (ocfs2_is_hard_readonly(osb)) 2779 return -EROFS; 2780 2781 if (ocfs2_mount_local(osb)) 2782 goto bail; 2783 2784 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 2785 if (status < 0) { 2786 mlog_errno(status); 2787 goto bail; 2788 } 2789 2790 /* The super block lock path is really in the best position to 2791 * know when resources covered by the lock need to be 2792 * refreshed, so we do it here. Of course, making sense of 2793 * everything is up to the caller :) */ 2794 status = ocfs2_should_refresh_lock_res(lockres); 2795 if (status) { 2796 status = ocfs2_refresh_slot_info(osb); 2797 2798 ocfs2_complete_lock_res_refresh(lockres, status); 2799 2800 if (status < 0) { 2801 ocfs2_cluster_unlock(osb, lockres, level); 2802 mlog_errno(status); 2803 } 2804 ocfs2_track_lock_refresh(lockres); 2805 } 2806 bail: 2807 return status; 2808 } 2809 2810 void ocfs2_super_unlock(struct ocfs2_super *osb, 2811 int ex) 2812 { 2813 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2814 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2815 2816 if (!ocfs2_mount_local(osb)) 2817 ocfs2_cluster_unlock(osb, lockres, level); 2818 } 2819 2820 int ocfs2_rename_lock(struct ocfs2_super *osb) 2821 { 2822 int status; 2823 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2824 2825 if (ocfs2_is_hard_readonly(osb)) 2826 return -EROFS; 2827 2828 if (ocfs2_mount_local(osb)) 2829 return 0; 2830 2831 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2832 if (status < 0) 2833 mlog_errno(status); 2834 2835 return status; 2836 } 2837 2838 void ocfs2_rename_unlock(struct ocfs2_super *osb) 2839 { 2840 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2841 2842 if (!ocfs2_mount_local(osb)) 2843 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2844 } 2845 2846 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex) 2847 { 2848 int status; 2849 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2850 2851 if (ocfs2_is_hard_readonly(osb)) 2852 return -EROFS; 2853 2854 if (ocfs2_mount_local(osb)) 2855 return 0; 2856 2857 status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE, 2858 0, 0); 2859 if (status < 0) 2860 mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status); 2861 2862 return status; 2863 } 2864 2865 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex) 2866 { 2867 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2868 2869 if (!ocfs2_mount_local(osb)) 2870 ocfs2_cluster_unlock(osb, lockres, 2871 ex ? 
LKM_EXMODE : LKM_PRMODE); 2872 } 2873 2874 int ocfs2_trim_fs_lock(struct ocfs2_super *osb, 2875 struct ocfs2_trim_fs_info *info, int trylock) 2876 { 2877 int status; 2878 struct ocfs2_trim_fs_lvb *lvb; 2879 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 2880 2881 if (info) 2882 info->tf_valid = 0; 2883 2884 if (ocfs2_is_hard_readonly(osb)) 2885 return -EROFS; 2886 2887 if (ocfs2_mount_local(osb)) 2888 return 0; 2889 2890 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 2891 trylock ? DLM_LKF_NOQUEUE : 0, 0); 2892 if (status < 0) { 2893 if (status != -EAGAIN) 2894 mlog_errno(status); 2895 return status; 2896 } 2897 2898 if (info) { 2899 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2900 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2901 lvb->lvb_version == OCFS2_TRIMFS_LVB_VERSION) { 2902 info->tf_valid = 1; 2903 info->tf_success = lvb->lvb_success; 2904 info->tf_nodenum = be32_to_cpu(lvb->lvb_nodenum); 2905 info->tf_start = be64_to_cpu(lvb->lvb_start); 2906 info->tf_len = be64_to_cpu(lvb->lvb_len); 2907 info->tf_minlen = be64_to_cpu(lvb->lvb_minlen); 2908 info->tf_trimlen = be64_to_cpu(lvb->lvb_trimlen); 2909 } 2910 } 2911 2912 return status; 2913 } 2914 2915 void ocfs2_trim_fs_unlock(struct ocfs2_super *osb, 2916 struct ocfs2_trim_fs_info *info) 2917 { 2918 struct ocfs2_trim_fs_lvb *lvb; 2919 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 2920 2921 if (ocfs2_mount_local(osb)) 2922 return; 2923 2924 if (info) { 2925 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2926 lvb->lvb_version = OCFS2_TRIMFS_LVB_VERSION; 2927 lvb->lvb_success = info->tf_success; 2928 lvb->lvb_nodenum = cpu_to_be32(info->tf_nodenum); 2929 lvb->lvb_start = cpu_to_be64(info->tf_start); 2930 lvb->lvb_len = cpu_to_be64(info->tf_len); 2931 lvb->lvb_minlen = cpu_to_be64(info->tf_minlen); 2932 lvb->lvb_trimlen = cpu_to_be64(info->tf_trimlen); 2933 } 2934 2935 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2936 } 2937 2938 int ocfs2_dentry_lock(struct dentry *dentry, int ex) 2939 { 
2940 int ret; 2941 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2942 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2943 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2944 2945 BUG_ON(!dl); 2946 2947 if (ocfs2_is_hard_readonly(osb)) { 2948 if (ex) 2949 return -EROFS; 2950 return 0; 2951 } 2952 2953 if (ocfs2_mount_local(osb)) 2954 return 0; 2955 2956 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 2957 if (ret < 0) 2958 mlog_errno(ret); 2959 2960 return ret; 2961 } 2962 2963 void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 2964 { 2965 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2966 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2967 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2968 2969 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 2970 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 2971 } 2972 2973 /* Reference counting of the dlm debug structure. We want this because 2974 * open references on the debug inodes can live on after a mount, so 2975 * we can't rely on the ocfs2_super to always exist. 
*/ 2976 static void ocfs2_dlm_debug_free(struct kref *kref) 2977 { 2978 struct ocfs2_dlm_debug *dlm_debug; 2979 2980 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2981 2982 kfree(dlm_debug); 2983 } 2984 2985 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2986 { 2987 if (dlm_debug) 2988 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2989 } 2990 2991 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2992 { 2993 kref_get(&debug->d_refcnt); 2994 } 2995 2996 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2997 { 2998 struct ocfs2_dlm_debug *dlm_debug; 2999 3000 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 3001 if (!dlm_debug) { 3002 mlog_errno(-ENOMEM); 3003 goto out; 3004 } 3005 3006 kref_init(&dlm_debug->d_refcnt); 3007 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 3008 dlm_debug->d_locking_state = NULL; 3009 out: 3010 return dlm_debug; 3011 } 3012 3013 /* Access to this is arbitrated for us via seq_file->sem. */ 3014 struct ocfs2_dlm_seq_priv { 3015 struct ocfs2_dlm_debug *p_dlm_debug; 3016 struct ocfs2_lock_res p_iter_res; 3017 struct ocfs2_lock_res p_tmp_res; 3018 }; 3019 3020 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 3021 struct ocfs2_dlm_seq_priv *priv) 3022 { 3023 struct ocfs2_lock_res *iter, *ret = NULL; 3024 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 3025 3026 assert_spin_locked(&ocfs2_dlm_tracking_lock); 3027 3028 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 3029 /* discover the head of the list */ 3030 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 3031 mlog(0, "End of list found, %p\n", ret); 3032 break; 3033 } 3034 3035 /* We track our "dummy" iteration lockres' by a NULL 3036 * l_ops field. 
*/ 3037 if (iter->l_ops != NULL) { 3038 ret = iter; 3039 break; 3040 } 3041 } 3042 3043 return ret; 3044 } 3045 3046 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 3047 { 3048 struct ocfs2_dlm_seq_priv *priv = m->private; 3049 struct ocfs2_lock_res *iter; 3050 3051 spin_lock(&ocfs2_dlm_tracking_lock); 3052 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 3053 if (iter) { 3054 /* Since lockres' have the lifetime of their container 3055 * (which can be inodes, ocfs2_supers, etc) we want to 3056 * copy this out to a temporary lockres while still 3057 * under the spinlock. Obviously after this we can't 3058 * trust any pointers on the copy returned, but that's 3059 * ok as the information we want isn't typically held 3060 * in them. */ 3061 priv->p_tmp_res = *iter; 3062 iter = &priv->p_tmp_res; 3063 } 3064 spin_unlock(&ocfs2_dlm_tracking_lock); 3065 3066 return iter; 3067 } 3068 3069 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 3070 { 3071 } 3072 3073 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 3074 { 3075 struct ocfs2_dlm_seq_priv *priv = m->private; 3076 struct ocfs2_lock_res *iter = v; 3077 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 3078 3079 spin_lock(&ocfs2_dlm_tracking_lock); 3080 iter = ocfs2_dlm_next_res(iter, priv); 3081 list_del_init(&dummy->l_debug_list); 3082 if (iter) { 3083 list_add(&dummy->l_debug_list, &iter->l_debug_list); 3084 priv->p_tmp_res = *iter; 3085 iter = &priv->p_tmp_res; 3086 } 3087 spin_unlock(&ocfs2_dlm_tracking_lock); 3088 3089 return iter; 3090 } 3091 3092 /* 3093 * Version is used by debugfs.ocfs2 to determine the format being used 3094 * 3095 * New in version 2 3096 * - Lock stats printed 3097 * New in version 3 3098 * - Max time in lock stats is in usecs (instead of nsecs) 3099 */ 3100 #define OCFS2_DLM_DEBUG_STR_VERSION 3 3101 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 3102 { 3103 int i; 3104 char *lvb; 3105 struct ocfs2_lock_res *lockres = v; 
3106 3107 if (!lockres) 3108 return -EINVAL; 3109 3110 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 3111 3112 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 3113 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 3114 lockres->l_name, 3115 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 3116 else 3117 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 3118 3119 seq_printf(m, "%d\t" 3120 "0x%lx\t" 3121 "0x%x\t" 3122 "0x%x\t" 3123 "%u\t" 3124 "%u\t" 3125 "%d\t" 3126 "%d\t", 3127 lockres->l_level, 3128 lockres->l_flags, 3129 lockres->l_action, 3130 lockres->l_unlock_action, 3131 lockres->l_ro_holders, 3132 lockres->l_ex_holders, 3133 lockres->l_requested, 3134 lockres->l_blocking); 3135 3136 /* Dump the raw LVB */ 3137 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 3138 for(i = 0; i < DLM_LVB_LEN; i++) 3139 seq_printf(m, "0x%x\t", lvb[i]); 3140 3141 #ifdef CONFIG_OCFS2_FS_STATS 3142 # define lock_num_prmode(_l) ((_l)->l_lock_prmode.ls_gets) 3143 # define lock_num_exmode(_l) ((_l)->l_lock_exmode.ls_gets) 3144 # define lock_num_prmode_failed(_l) ((_l)->l_lock_prmode.ls_fail) 3145 # define lock_num_exmode_failed(_l) ((_l)->l_lock_exmode.ls_fail) 3146 # define lock_total_prmode(_l) ((_l)->l_lock_prmode.ls_total) 3147 # define lock_total_exmode(_l) ((_l)->l_lock_exmode.ls_total) 3148 # define lock_max_prmode(_l) ((_l)->l_lock_prmode.ls_max) 3149 # define lock_max_exmode(_l) ((_l)->l_lock_exmode.ls_max) 3150 # define lock_refresh(_l) ((_l)->l_lock_refresh) 3151 #else 3152 # define lock_num_prmode(_l) (0) 3153 # define lock_num_exmode(_l) (0) 3154 # define lock_num_prmode_failed(_l) (0) 3155 # define lock_num_exmode_failed(_l) (0) 3156 # define lock_total_prmode(_l) (0ULL) 3157 # define lock_total_exmode(_l) (0ULL) 3158 # define lock_max_prmode(_l) (0) 3159 # define lock_max_exmode(_l) (0) 3160 # define lock_refresh(_l) (0) 3161 #endif 3162 /* The following seq_print was added in version 2 of this output */ 3163 seq_printf(m, "%u\t" 3164 "%u\t" 
3165 "%u\t" 3166 "%u\t" 3167 "%llu\t" 3168 "%llu\t" 3169 "%u\t" 3170 "%u\t" 3171 "%u\t", 3172 lock_num_prmode(lockres), 3173 lock_num_exmode(lockres), 3174 lock_num_prmode_failed(lockres), 3175 lock_num_exmode_failed(lockres), 3176 lock_total_prmode(lockres), 3177 lock_total_exmode(lockres), 3178 lock_max_prmode(lockres), 3179 lock_max_exmode(lockres), 3180 lock_refresh(lockres)); 3181 3182 /* End the line */ 3183 seq_printf(m, "\n"); 3184 return 0; 3185 } 3186 3187 static const struct seq_operations ocfs2_dlm_seq_ops = { 3188 .start = ocfs2_dlm_seq_start, 3189 .stop = ocfs2_dlm_seq_stop, 3190 .next = ocfs2_dlm_seq_next, 3191 .show = ocfs2_dlm_seq_show, 3192 }; 3193 3194 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 3195 { 3196 struct seq_file *seq = file->private_data; 3197 struct ocfs2_dlm_seq_priv *priv = seq->private; 3198 struct ocfs2_lock_res *res = &priv->p_iter_res; 3199 3200 ocfs2_remove_lockres_tracking(res); 3201 ocfs2_put_dlm_debug(priv->p_dlm_debug); 3202 return seq_release_private(inode, file); 3203 } 3204 3205 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 3206 { 3207 struct ocfs2_dlm_seq_priv *priv; 3208 struct ocfs2_super *osb; 3209 3210 priv = __seq_open_private(file, &ocfs2_dlm_seq_ops, sizeof(*priv)); 3211 if (!priv) { 3212 mlog_errno(-ENOMEM); 3213 return -ENOMEM; 3214 } 3215 3216 osb = inode->i_private; 3217 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 3218 priv->p_dlm_debug = osb->osb_dlm_debug; 3219 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 3220 3221 ocfs2_add_lockres_tracking(&priv->p_iter_res, 3222 priv->p_dlm_debug); 3223 3224 return 0; 3225 } 3226 3227 static const struct file_operations ocfs2_dlm_debug_fops = { 3228 .open = ocfs2_dlm_debug_open, 3229 .release = ocfs2_dlm_debug_release, 3230 .read = seq_read, 3231 .llseek = seq_lseek, 3232 }; 3233 3234 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 3235 { 3236 int ret = 0; 3237 struct ocfs2_dlm_debug *dlm_debug = 
osb->osb_dlm_debug; 3238 3239 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 3240 S_IFREG|S_IRUSR, 3241 osb->osb_debug_root, 3242 osb, 3243 &ocfs2_dlm_debug_fops); 3244 if (!dlm_debug->d_locking_state) { 3245 ret = -EINVAL; 3246 mlog(ML_ERROR, 3247 "Unable to create locking state debugfs file.\n"); 3248 goto out; 3249 } 3250 3251 ocfs2_get_dlm_debug(dlm_debug); 3252 out: 3253 return ret; 3254 } 3255 3256 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 3257 { 3258 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 3259 3260 if (dlm_debug) { 3261 debugfs_remove(dlm_debug->d_locking_state); 3262 ocfs2_put_dlm_debug(dlm_debug); 3263 } 3264 } 3265 3266 int ocfs2_dlm_init(struct ocfs2_super *osb) 3267 { 3268 int status = 0; 3269 struct ocfs2_cluster_connection *conn = NULL; 3270 3271 if (ocfs2_mount_local(osb)) { 3272 osb->node_num = 0; 3273 goto local; 3274 } 3275 3276 status = ocfs2_dlm_init_debug(osb); 3277 if (status < 0) { 3278 mlog_errno(status); 3279 goto bail; 3280 } 3281 3282 /* launch downconvert thread */ 3283 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc-%s", 3284 osb->uuid_str); 3285 if (IS_ERR(osb->dc_task)) { 3286 status = PTR_ERR(osb->dc_task); 3287 osb->dc_task = NULL; 3288 mlog_errno(status); 3289 goto bail; 3290 } 3291 3292 /* for now, uuid == domain */ 3293 status = ocfs2_cluster_connect(osb->osb_cluster_stack, 3294 osb->osb_cluster_name, 3295 strlen(osb->osb_cluster_name), 3296 osb->uuid_str, 3297 strlen(osb->uuid_str), 3298 &lproto, ocfs2_do_node_down, osb, 3299 &conn); 3300 if (status) { 3301 mlog_errno(status); 3302 goto bail; 3303 } 3304 3305 status = ocfs2_cluster_this_node(conn, &osb->node_num); 3306 if (status < 0) { 3307 mlog_errno(status); 3308 mlog(ML_ERROR, 3309 "could not find this host's node number\n"); 3310 ocfs2_cluster_disconnect(conn, 0); 3311 goto bail; 3312 } 3313 3314 local: 3315 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 3316 
ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 3317 ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb); 3318 ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb); 3319 3320 osb->cconn = conn; 3321 bail: 3322 if (status < 0) { 3323 ocfs2_dlm_shutdown_debug(osb); 3324 if (osb->dc_task) 3325 kthread_stop(osb->dc_task); 3326 } 3327 3328 return status; 3329 } 3330 3331 void ocfs2_dlm_shutdown(struct ocfs2_super *osb, 3332 int hangup_pending) 3333 { 3334 ocfs2_drop_osb_locks(osb); 3335 3336 /* 3337 * Now that we have dropped all locks and ocfs2_dismount_volume() 3338 * has disabled recovery, the DLM won't be talking to us. It's 3339 * safe to tear things down before disconnecting the cluster. 3340 */ 3341 3342 if (osb->dc_task) { 3343 kthread_stop(osb->dc_task); 3344 osb->dc_task = NULL; 3345 } 3346 3347 ocfs2_lock_res_free(&osb->osb_super_lockres); 3348 ocfs2_lock_res_free(&osb->osb_rename_lockres); 3349 ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres); 3350 ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres); 3351 3352 ocfs2_cluster_disconnect(osb->cconn, hangup_pending); 3353 osb->cconn = NULL; 3354 3355 ocfs2_dlm_shutdown_debug(osb); 3356 } 3357 3358 static int ocfs2_drop_lock(struct ocfs2_super *osb, 3359 struct ocfs2_lock_res *lockres) 3360 { 3361 int ret; 3362 unsigned long flags; 3363 u32 lkm_flags = 0; 3364 3365 /* We didn't get anywhere near actually using this lockres. 
*/ 3366 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 3367 goto out; 3368 3369 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 3370 lkm_flags |= DLM_LKF_VALBLK; 3371 3372 spin_lock_irqsave(&lockres->l_lock, flags); 3373 3374 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 3375 "lockres %s, flags 0x%lx\n", 3376 lockres->l_name, lockres->l_flags); 3377 3378 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 3379 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 3380 "%u, unlock_action = %u\n", 3381 lockres->l_name, lockres->l_flags, lockres->l_action, 3382 lockres->l_unlock_action); 3383 3384 spin_unlock_irqrestore(&lockres->l_lock, flags); 3385 3386 /* XXX: Today we just wait on any busy 3387 * locks... Perhaps we need to cancel converts in the 3388 * future? */ 3389 ocfs2_wait_on_busy_lock(lockres); 3390 3391 spin_lock_irqsave(&lockres->l_lock, flags); 3392 } 3393 3394 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3395 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 3396 lockres->l_level == DLM_LOCK_EX && 3397 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3398 lockres->l_ops->set_lvb(lockres); 3399 } 3400 3401 if (lockres->l_flags & OCFS2_LOCK_BUSY) 3402 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 3403 lockres->l_name); 3404 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 3405 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 3406 3407 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 3408 spin_unlock_irqrestore(&lockres->l_lock, flags); 3409 goto out; 3410 } 3411 3412 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 3413 3414 /* make sure we never get here while waiting for an ast to 3415 * fire. */ 3416 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 3417 3418 /* is this necessary? 
*/ 3419 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3420 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 3421 spin_unlock_irqrestore(&lockres->l_lock, flags); 3422 3423 mlog(0, "lock %s\n", lockres->l_name); 3424 3425 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags); 3426 if (ret) { 3427 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3428 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 3429 ocfs2_dlm_dump_lksb(&lockres->l_lksb); 3430 BUG(); 3431 } 3432 mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n", 3433 lockres->l_name); 3434 3435 ocfs2_wait_on_busy_lock(lockres); 3436 out: 3437 return 0; 3438 } 3439 3440 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3441 struct ocfs2_lock_res *lockres); 3442 3443 /* Mark the lockres as being dropped. It will no longer be 3444 * queued if blocking, but we still may have to wait on it 3445 * being dequeued from the downconvert thread before we can consider 3446 * it safe to drop. 3447 * 3448 * You can *not* attempt to call cluster_lock on this lockres anymore. */ 3449 void ocfs2_mark_lockres_freeing(struct ocfs2_super *osb, 3450 struct ocfs2_lock_res *lockres) 3451 { 3452 int status; 3453 struct ocfs2_mask_waiter mw; 3454 unsigned long flags, flags2; 3455 3456 ocfs2_init_mask_waiter(&mw); 3457 3458 spin_lock_irqsave(&lockres->l_lock, flags); 3459 lockres->l_flags |= OCFS2_LOCK_FREEING; 3460 if (lockres->l_flags & OCFS2_LOCK_QUEUED && current == osb->dc_task) { 3461 /* 3462 * We know the downconvert is queued but not in progress 3463 * because we are the downconvert thread and processing 3464 * different lock. So we can just remove the lock from the 3465 * queue. This is not only an optimization but also a way 3466 * to avoid the following deadlock: 3467 * ocfs2_dentry_post_unlock() 3468 * ocfs2_dentry_lock_put() 3469 * ocfs2_drop_dentry_lock() 3470 * iput() 3471 * ocfs2_evict_inode() 3472 * ocfs2_clear_inode() 3473 * ocfs2_mark_lockres_freeing() 3474 * ... 
blocks waiting for OCFS2_LOCK_QUEUED 3475 * since we are the downconvert thread which 3476 * should clear the flag. 3477 */ 3478 spin_unlock_irqrestore(&lockres->l_lock, flags); 3479 spin_lock_irqsave(&osb->dc_task_lock, flags2); 3480 list_del_init(&lockres->l_blocked_list); 3481 osb->blocked_lock_count--; 3482 spin_unlock_irqrestore(&osb->dc_task_lock, flags2); 3483 /* 3484 * Warn if we recurse into another post_unlock call. Strictly 3485 * speaking it isn't a problem but we need to be careful if 3486 * that happens (stack overflow, deadlocks, ...) so warn if 3487 * ocfs2 grows a path for which this can happen. 3488 */ 3489 WARN_ON_ONCE(lockres->l_ops->post_unlock); 3490 /* Since the lock is freeing we don't do much in the fn below */ 3491 ocfs2_process_blocked_lock(osb, lockres); 3492 return; 3493 } 3494 while (lockres->l_flags & OCFS2_LOCK_QUEUED) { 3495 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); 3496 spin_unlock_irqrestore(&lockres->l_lock, flags); 3497 3498 mlog(0, "Waiting on lockres %s\n", lockres->l_name); 3499 3500 status = ocfs2_wait_for_mask(&mw); 3501 if (status) 3502 mlog_errno(status); 3503 3504 spin_lock_irqsave(&lockres->l_lock, flags); 3505 } 3506 spin_unlock_irqrestore(&lockres->l_lock, flags); 3507 } 3508 3509 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, 3510 struct ocfs2_lock_res *lockres) 3511 { 3512 int ret; 3513 3514 ocfs2_mark_lockres_freeing(osb, lockres); 3515 ret = ocfs2_drop_lock(osb, lockres); 3516 if (ret) 3517 mlog_errno(ret); 3518 } 3519 3520 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 3521 { 3522 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 3523 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 3524 ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres); 3525 ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres); 3526 } 3527 3528 int ocfs2_drop_inode_locks(struct inode *inode) 3529 { 3530 int status, err; 3531 3532 /* No need to call 
ocfs2_mark_lockres_freeing here - 3533 * ocfs2_clear_inode has done it for us. */ 3534 3535 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3536 &OCFS2_I(inode)->ip_open_lockres); 3537 if (err < 0) 3538 mlog_errno(err); 3539 3540 status = err; 3541 3542 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3543 &OCFS2_I(inode)->ip_inode_lockres); 3544 if (err < 0) 3545 mlog_errno(err); 3546 if (err < 0 && !status) 3547 status = err; 3548 3549 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3550 &OCFS2_I(inode)->ip_rw_lockres); 3551 if (err < 0) 3552 mlog_errno(err); 3553 if (err < 0 && !status) 3554 status = err; 3555 3556 return status; 3557 } 3558 3559 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 3560 int new_level) 3561 { 3562 assert_spin_locked(&lockres->l_lock); 3563 3564 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 3565 3566 if (lockres->l_level <= new_level) { 3567 mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, " 3568 "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, " 3569 "block %d, pgen %d\n", lockres->l_name, lockres->l_level, 3570 new_level, list_empty(&lockres->l_blocked_list), 3571 list_empty(&lockres->l_mask_waiters), lockres->l_type, 3572 lockres->l_flags, lockres->l_ro_holders, 3573 lockres->l_ex_holders, lockres->l_action, 3574 lockres->l_unlock_action, lockres->l_requested, 3575 lockres->l_blocking, lockres->l_pending_gen); 3576 BUG(); 3577 } 3578 3579 mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n", 3580 lockres->l_name, lockres->l_level, new_level, lockres->l_blocking); 3581 3582 lockres->l_action = OCFS2_AST_DOWNCONVERT; 3583 lockres->l_requested = new_level; 3584 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3585 return lockres_set_pending(lockres); 3586 } 3587 3588 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 3589 struct ocfs2_lock_res *lockres, 3590 int new_level, 3591 int lvb, 3592 unsigned int generation) 3593 { 3594 int ret; 3595 u32 dlm_flags = DLM_LKF_CONVERT; 3596 3597 
mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name, 3598 lockres->l_level, new_level); 3599 3600 /* 3601 * On DLM_LKF_VALBLK, fsdlm behaves differently with o2cb. It always 3602 * expects DLM_LKF_VALBLK being set if the LKB has LVB, so that 3603 * we can recover correctly from node failure. Otherwise, we may get 3604 * invalid LVB in LKB, but without DLM_SBF_VALNOTVALID being set. 3605 */ 3606 if (!ocfs2_is_o2cb_active() && 3607 lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 3608 lvb = 1; 3609 3610 if (lvb) 3611 dlm_flags |= DLM_LKF_VALBLK; 3612 3613 ret = ocfs2_dlm_lock(osb->cconn, 3614 new_level, 3615 &lockres->l_lksb, 3616 dlm_flags, 3617 lockres->l_name, 3618 OCFS2_LOCK_ID_MAX_LEN - 1); 3619 lockres_clear_pending(lockres, generation, osb); 3620 if (ret) { 3621 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 3622 ocfs2_recover_from_dlm_error(lockres, 1); 3623 goto bail; 3624 } 3625 3626 ret = 0; 3627 bail: 3628 return ret; 3629 } 3630 3631 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */ 3632 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 3633 struct ocfs2_lock_res *lockres) 3634 { 3635 assert_spin_locked(&lockres->l_lock); 3636 3637 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 3638 /* If we're already trying to cancel a lock conversion 3639 * then just drop the spinlock and allow the caller to 3640 * requeue this lock. */ 3641 mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name); 3642 return 0; 3643 } 3644 3645 /* were we in a convert when we got the bast fire? */ 3646 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT && 3647 lockres->l_action != OCFS2_AST_DOWNCONVERT); 3648 /* set things up for the unlockast to know to just 3649 * clear out the ast_action and unset busy, etc. 
*/ 3650 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT; 3651 3652 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY), 3653 "lock %s, invalid flags: 0x%lx\n", 3654 lockres->l_name, lockres->l_flags); 3655 3656 mlog(ML_BASTS, "lockres %s\n", lockres->l_name); 3657 3658 return 1; 3659 } 3660 3661 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 3662 struct ocfs2_lock_res *lockres) 3663 { 3664 int ret; 3665 3666 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, 3667 DLM_LKF_CANCEL); 3668 if (ret) { 3669 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3670 ocfs2_recover_from_dlm_error(lockres, 0); 3671 } 3672 3673 mlog(ML_BASTS, "lockres %s\n", lockres->l_name); 3674 3675 return ret; 3676 } 3677 3678 static int ocfs2_unblock_lock(struct ocfs2_super *osb, 3679 struct ocfs2_lock_res *lockres, 3680 struct ocfs2_unblock_ctl *ctl) 3681 { 3682 unsigned long flags; 3683 int blocking; 3684 int new_level; 3685 int level; 3686 int ret = 0; 3687 int set_lvb = 0; 3688 unsigned int gen; 3689 3690 spin_lock_irqsave(&lockres->l_lock, flags); 3691 3692 recheck: 3693 /* 3694 * Is it still blocking? If not, we have no more work to do. 3695 */ 3696 if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) { 3697 BUG_ON(lockres->l_blocking != DLM_LOCK_NL); 3698 spin_unlock_irqrestore(&lockres->l_lock, flags); 3699 ret = 0; 3700 goto leave; 3701 } 3702 3703 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 3704 /* XXX 3705 * This is a *big* race. The OCFS2_LOCK_PENDING flag 3706 * exists entirely for one reason - another thread has set 3707 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock(). 3708 * 3709 * If we do ocfs2_cancel_convert() before the other thread 3710 * calls dlm_lock(), our cancel will do nothing. We will 3711 * get no ast, and we will have no way of knowing the 3712 * cancel failed. Meanwhile, the other thread will call 3713 * into dlm_lock() and wait...forever. 3714 * 3715 * Why forever? 
Because another node has asked for the 3716 * lock first; that's why we're here in unblock_lock(). 3717 * 3718 * The solution is OCFS2_LOCK_PENDING. When PENDING is 3719 * set, we just requeue the unblock. Only when the other 3720 * thread has called dlm_lock() and cleared PENDING will 3721 * we then cancel their request. 3722 * 3723 * All callers of dlm_lock() must set OCFS2_DLM_PENDING 3724 * at the same time they set OCFS2_DLM_BUSY. They must 3725 * clear OCFS2_DLM_PENDING after dlm_lock() returns. 3726 */ 3727 if (lockres->l_flags & OCFS2_LOCK_PENDING) { 3728 mlog(ML_BASTS, "lockres %s, ReQ: Pending\n", 3729 lockres->l_name); 3730 goto leave_requeue; 3731 } 3732 3733 ctl->requeue = 1; 3734 ret = ocfs2_prepare_cancel_convert(osb, lockres); 3735 spin_unlock_irqrestore(&lockres->l_lock, flags); 3736 if (ret) { 3737 ret = ocfs2_cancel_convert(osb, lockres); 3738 if (ret < 0) 3739 mlog_errno(ret); 3740 } 3741 goto leave; 3742 } 3743 3744 /* 3745 * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is 3746 * set when the ast is received for an upconvert just before the 3747 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast 3748 * on the heels of the ast, we want to delay the downconvert just 3749 * enough to allow the up requestor to do its task. Because this 3750 * lock is in the blocked queue, the lock will be downconverted 3751 * as soon as the requestor is done with the lock. 3752 */ 3753 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) 3754 goto leave_requeue; 3755 3756 /* 3757 * How can we block and yet be at NL? We were trying to upconvert 3758 * from NL and got canceled. The code comes back here, and now 3759 * we notice and clear BLOCKING. 
3760 */ 3761 if (lockres->l_level == DLM_LOCK_NL) { 3762 BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders); 3763 mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name); 3764 lockres->l_blocking = DLM_LOCK_NL; 3765 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 3766 spin_unlock_irqrestore(&lockres->l_lock, flags); 3767 goto leave; 3768 } 3769 3770 /* if we're blocking an exclusive and we have *any* holders, 3771 * then requeue. */ 3772 if ((lockres->l_blocking == DLM_LOCK_EX) 3773 && (lockres->l_ex_holders || lockres->l_ro_holders)) { 3774 mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n", 3775 lockres->l_name, lockres->l_ex_holders, 3776 lockres->l_ro_holders); 3777 goto leave_requeue; 3778 } 3779 3780 /* If it's a PR we're blocking, then only 3781 * requeue if we've got any EX holders */ 3782 if (lockres->l_blocking == DLM_LOCK_PR && 3783 lockres->l_ex_holders) { 3784 mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n", 3785 lockres->l_name, lockres->l_ex_holders); 3786 goto leave_requeue; 3787 } 3788 3789 /* 3790 * Can we get a lock in this state if the holder counts are 3791 * zero? The meta data unblock code used to check this. 3792 */ 3793 if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 3794 && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) { 3795 mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n", 3796 lockres->l_name); 3797 goto leave_requeue; 3798 } 3799 3800 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 3801 3802 if (lockres->l_ops->check_downconvert 3803 && !lockres->l_ops->check_downconvert(lockres, new_level)) { 3804 mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n", 3805 lockres->l_name); 3806 goto leave_requeue; 3807 } 3808 3809 /* If we get here, then we know that there are no more 3810 * incompatible holders (and anyone asking for an incompatible 3811 * lock is blocked). 
We can now downconvert the lock */ 3812 if (!lockres->l_ops->downconvert_worker) 3813 goto downconvert; 3814 3815 /* Some lockres types want to do a bit of work before 3816 * downconverting a lock. Allow that here. The worker function 3817 * may sleep, so we save off a copy of what we're blocking as 3818 * it may change while we're not holding the spin lock. */ 3819 blocking = lockres->l_blocking; 3820 level = lockres->l_level; 3821 spin_unlock_irqrestore(&lockres->l_lock, flags); 3822 3823 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); 3824 3825 if (ctl->unblock_action == UNBLOCK_STOP_POST) { 3826 mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n", 3827 lockres->l_name); 3828 goto leave; 3829 } 3830 3831 spin_lock_irqsave(&lockres->l_lock, flags); 3832 if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) { 3833 /* If this changed underneath us, then we can't drop 3834 * it just yet. */ 3835 mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, " 3836 "Recheck\n", lockres->l_name, blocking, 3837 lockres->l_blocking, level, lockres->l_level); 3838 goto recheck; 3839 } 3840 3841 downconvert: 3842 ctl->requeue = 0; 3843 3844 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3845 if (lockres->l_level == DLM_LOCK_EX) 3846 set_lvb = 1; 3847 3848 /* 3849 * We only set the lvb if the lock has been fully 3850 * refreshed - otherwise we risk setting stale 3851 * data. Otherwise, there's no need to actually clear 3852 * out the lvb here as it's value is still valid. 
3853 */ 3854 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3855 lockres->l_ops->set_lvb(lockres); 3856 } 3857 3858 gen = ocfs2_prepare_downconvert(lockres, new_level); 3859 spin_unlock_irqrestore(&lockres->l_lock, flags); 3860 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb, 3861 gen); 3862 3863 leave: 3864 if (ret) 3865 mlog_errno(ret); 3866 return ret; 3867 3868 leave_requeue: 3869 spin_unlock_irqrestore(&lockres->l_lock, flags); 3870 ctl->requeue = 1; 3871 3872 return 0; 3873 } 3874 3875 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 3876 int blocking) 3877 { 3878 struct inode *inode; 3879 struct address_space *mapping; 3880 struct ocfs2_inode_info *oi; 3881 3882 inode = ocfs2_lock_res_inode(lockres); 3883 mapping = inode->i_mapping; 3884 3885 if (S_ISDIR(inode->i_mode)) { 3886 oi = OCFS2_I(inode); 3887 oi->ip_dir_lock_gen++; 3888 mlog(0, "generation: %u\n", oi->ip_dir_lock_gen); 3889 goto out; 3890 } 3891 3892 if (!S_ISREG(inode->i_mode)) 3893 goto out; 3894 3895 /* 3896 * We need this before the filemap_fdatawrite() so that it can 3897 * transfer the dirty bit from the PTE to the 3898 * page. Unfortunately this means that even for EX->PR 3899 * downconverts, we'll lose our mappings and have to build 3900 * them up again. 3901 */ 3902 unmap_mapping_range(mapping, 0, 0, 0); 3903 3904 if (filemap_fdatawrite(mapping)) { 3905 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!", 3906 (unsigned long long)OCFS2_I(inode)->ip_blkno); 3907 } 3908 sync_mapping_buffers(mapping); 3909 if (blocking == DLM_LOCK_EX) { 3910 truncate_inode_pages(mapping, 0); 3911 } else { 3912 /* We only need to wait on the I/O if we're not also 3913 * truncating pages because truncate_inode_pages waits 3914 * for us above. We don't truncate pages if we're 3915 * blocking anything < EXMODE because we want to keep 3916 * them around in that case. 
*/ 3917 filemap_fdatawait(mapping); 3918 } 3919 3920 forget_all_cached_acls(inode); 3921 3922 out: 3923 return UNBLOCK_CONTINUE; 3924 } 3925 3926 static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci, 3927 struct ocfs2_lock_res *lockres, 3928 int new_level) 3929 { 3930 int checkpointed = ocfs2_ci_fully_checkpointed(ci); 3931 3932 BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR); 3933 BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed); 3934 3935 if (checkpointed) 3936 return 1; 3937 3938 ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci))); 3939 return 0; 3940 } 3941 3942 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 3943 int new_level) 3944 { 3945 struct inode *inode = ocfs2_lock_res_inode(lockres); 3946 3947 return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level); 3948 } 3949 3950 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres) 3951 { 3952 struct inode *inode = ocfs2_lock_res_inode(lockres); 3953 3954 __ocfs2_stuff_meta_lvb(inode); 3955 } 3956 3957 /* 3958 * Does the final reference drop on our dentry lock. Right now this 3959 * happens in the downconvert thread, but we could choose to simplify the 3960 * dlmglue API and push these off to the ocfs2_wq in the future. 3961 */ 3962 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 3963 struct ocfs2_lock_res *lockres) 3964 { 3965 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3966 ocfs2_dentry_lock_put(osb, dl); 3967 } 3968 3969 /* 3970 * d_delete() matching dentries before the lock downconvert. 3971 * 3972 * At this point, any process waiting to destroy the 3973 * dentry_lock due to last ref count is stopped by the 3974 * OCFS2_LOCK_QUEUED flag. 3975 * 3976 * We have two potential problems 3977 * 3978 * 1) If we do the last reference drop on our dentry_lock (via dput) 3979 * we'll wind up in ocfs2_release_dentry_lock(), waiting on 3980 * the downconvert to finish. 
Instead we take an elevated 3981 * reference and push the drop until after we've completed our 3982 * unblock processing. 3983 * 3984 * 2) There might be another process with a final reference, 3985 * waiting on us to finish processing. If this is the case, we 3986 * detect it and exit out - there's no more dentries anyway. 3987 */ 3988 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 3989 int blocking) 3990 { 3991 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3992 struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode); 3993 struct dentry *dentry; 3994 unsigned long flags; 3995 int extra_ref = 0; 3996 3997 /* 3998 * This node is blocking another node from getting a read 3999 * lock. This happens when we've renamed within a 4000 * directory. We've forced the other nodes to d_delete(), but 4001 * we never actually dropped our lock because it's still 4002 * valid. The downconvert code will retain a PR for this node, 4003 * so there's no further work to do. 4004 */ 4005 if (blocking == DLM_LOCK_PR) 4006 return UNBLOCK_CONTINUE; 4007 4008 /* 4009 * Mark this inode as potentially orphaned. The code in 4010 * ocfs2_delete_inode() will figure out whether it actually 4011 * needs to be freed or not. 4012 */ 4013 spin_lock(&oi->ip_lock); 4014 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; 4015 spin_unlock(&oi->ip_lock); 4016 4017 /* 4018 * Yuck. We need to make sure however that the check of 4019 * OCFS2_LOCK_FREEING and the extra reference are atomic with 4020 * respect to a reference decrement or the setting of that 4021 * flag. 
4022 */ 4023 spin_lock_irqsave(&lockres->l_lock, flags); 4024 spin_lock(&dentry_attach_lock); 4025 if (!(lockres->l_flags & OCFS2_LOCK_FREEING) 4026 && dl->dl_count) { 4027 dl->dl_count++; 4028 extra_ref = 1; 4029 } 4030 spin_unlock(&dentry_attach_lock); 4031 spin_unlock_irqrestore(&lockres->l_lock, flags); 4032 4033 mlog(0, "extra_ref = %d\n", extra_ref); 4034 4035 /* 4036 * We have a process waiting on us in ocfs2_dentry_iput(), 4037 * which means we can't have any more outstanding 4038 * aliases. There's no need to do any more work. 4039 */ 4040 if (!extra_ref) 4041 return UNBLOCK_CONTINUE; 4042 4043 spin_lock(&dentry_attach_lock); 4044 while (1) { 4045 dentry = ocfs2_find_local_alias(dl->dl_inode, 4046 dl->dl_parent_blkno, 1); 4047 if (!dentry) 4048 break; 4049 spin_unlock(&dentry_attach_lock); 4050 4051 if (S_ISDIR(dl->dl_inode->i_mode)) 4052 shrink_dcache_parent(dentry); 4053 4054 mlog(0, "d_delete(%pd);\n", dentry); 4055 4056 /* 4057 * The following dcache calls may do an 4058 * iput(). Normally we don't want that from the 4059 * downconverting thread, but in this case it's ok 4060 * because the requesting node already has an 4061 * exclusive lock on the inode, so it can't be queued 4062 * for a downconvert. 4063 */ 4064 d_delete(dentry); 4065 dput(dentry); 4066 4067 spin_lock(&dentry_attach_lock); 4068 } 4069 spin_unlock(&dentry_attach_lock); 4070 4071 /* 4072 * If we are the last holder of this dentry lock, there is no 4073 * reason to downconvert so skip straight to the unlock. 
4074 */ 4075 if (dl->dl_count == 1) 4076 return UNBLOCK_STOP_POST; 4077 4078 return UNBLOCK_CONTINUE_POST; 4079 } 4080 4081 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres, 4082 int new_level) 4083 { 4084 struct ocfs2_refcount_tree *tree = 4085 ocfs2_lock_res_refcount_tree(lockres); 4086 4087 return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level); 4088 } 4089 4090 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres, 4091 int blocking) 4092 { 4093 struct ocfs2_refcount_tree *tree = 4094 ocfs2_lock_res_refcount_tree(lockres); 4095 4096 ocfs2_metadata_cache_purge(&tree->rf_ci); 4097 4098 return UNBLOCK_CONTINUE; 4099 } 4100 4101 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres) 4102 { 4103 struct ocfs2_qinfo_lvb *lvb; 4104 struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres); 4105 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 4106 oinfo->dqi_gi.dqi_type); 4107 4108 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 4109 lvb->lvb_version = OCFS2_QINFO_LVB_VERSION; 4110 lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace); 4111 lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace); 4112 lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms); 4113 lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks); 4114 lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk); 4115 lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry); 4116 } 4117 4118 void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex) 4119 { 4120 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 4121 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 4122 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 4123 4124 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 4125 ocfs2_cluster_unlock(osb, lockres, level); 4126 } 4127 4128 static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo) 4129 { 4130 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 4131 oinfo->dqi_gi.dqi_type); 4132 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 4133 struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 4134 struct buffer_head *bh = NULL; 4135 struct ocfs2_global_disk_dqinfo *gdinfo; 4136 int status = 0; 4137 4138 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 4139 lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) { 4140 info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace); 4141 info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace); 4142 oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms); 4143 oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks); 4144 oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk); 4145 oinfo->dqi_gi.dqi_free_entry = 4146 be32_to_cpu(lvb->lvb_free_entry); 4147 } else { 4148 status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode, 4149 oinfo->dqi_giblk, &bh); 4150 if (status) { 4151 mlog_errno(status); 4152 goto bail; 4153 } 4154 gdinfo = (struct ocfs2_global_disk_dqinfo *) 4155 (bh->b_data + OCFS2_GLOBAL_INFO_OFF); 4156 info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace); 4157 info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace); 4158 oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms); 4159 oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks); 4160 oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk); 4161 oinfo->dqi_gi.dqi_free_entry = 4162 le32_to_cpu(gdinfo->dqi_free_entry); 4163 brelse(bh); 4164 ocfs2_track_lock_refresh(lockres); 4165 } 4166 4167 bail: 4168 return status; 4169 } 4170 4171 /* Lock quota info, this function expects at least shared lock on the quota file 4172 * so that we can safely refresh quota info from disk. 
*/ 4173 int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex) 4174 { 4175 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 4176 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 4177 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 4178 int status = 0; 4179 4180 /* On RO devices, locking really isn't needed... */ 4181 if (ocfs2_is_hard_readonly(osb)) { 4182 if (ex) 4183 status = -EROFS; 4184 goto bail; 4185 } 4186 if (ocfs2_mount_local(osb)) 4187 goto bail; 4188 4189 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 4190 if (status < 0) { 4191 mlog_errno(status); 4192 goto bail; 4193 } 4194 if (!ocfs2_should_refresh_lock_res(lockres)) 4195 goto bail; 4196 /* OK, we have the lock but we need to refresh the quota info */ 4197 status = ocfs2_refresh_qinfo(oinfo); 4198 if (status) 4199 ocfs2_qinfo_unlock(oinfo, ex); 4200 ocfs2_complete_lock_res_refresh(lockres, status); 4201 bail: 4202 return status; 4203 } 4204 4205 int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex) 4206 { 4207 int status; 4208 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 4209 struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres; 4210 struct ocfs2_super *osb = lockres->l_priv; 4211 4212 4213 if (ocfs2_is_hard_readonly(osb)) 4214 return -EROFS; 4215 4216 if (ocfs2_mount_local(osb)) 4217 return 0; 4218 4219 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 4220 if (status < 0) 4221 mlog_errno(status); 4222 4223 return status; 4224 } 4225 4226 void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex) 4227 { 4228 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 4229 struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres; 4230 struct ocfs2_super *osb = lockres->l_priv; 4231 4232 if (!ocfs2_mount_local(osb)) 4233 ocfs2_cluster_unlock(osb, lockres, level); 4234 } 4235 4236 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 4237 struct ocfs2_lock_res *lockres) 4238 { 4239 int status; 4240 struct ocfs2_unblock_ctl ctl = {0, 0,}; 4241 unsigned long flags; 4242 4243 /* Our reference to the lockres in this function can be 4244 * considered valid until we remove the OCFS2_LOCK_QUEUED 4245 * flag. */ 4246 4247 BUG_ON(!lockres); 4248 BUG_ON(!lockres->l_ops); 4249 4250 mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name); 4251 4252 /* Detect whether a lock has been marked as going away while 4253 * the downconvert thread was processing other things. A lock can 4254 * still be marked with OCFS2_LOCK_FREEING after this check, 4255 * but short circuiting here will still save us some 4256 * performance. */ 4257 spin_lock_irqsave(&lockres->l_lock, flags); 4258 if (lockres->l_flags & OCFS2_LOCK_FREEING) 4259 goto unqueue; 4260 spin_unlock_irqrestore(&lockres->l_lock, flags); 4261 4262 status = ocfs2_unblock_lock(osb, lockres, &ctl); 4263 if (status < 0) 4264 mlog_errno(status); 4265 4266 spin_lock_irqsave(&lockres->l_lock, flags); 4267 unqueue: 4268 if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) { 4269 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED); 4270 } else 4271 ocfs2_schedule_blocked_lock(osb, lockres); 4272 4273 mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name, 4274 ctl.requeue ? 
"yes" : "no"); 4275 spin_unlock_irqrestore(&lockres->l_lock, flags); 4276 4277 if (ctl.unblock_action != UNBLOCK_CONTINUE 4278 && lockres->l_ops->post_unlock) 4279 lockres->l_ops->post_unlock(osb, lockres); 4280 } 4281 4282 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 4283 struct ocfs2_lock_res *lockres) 4284 { 4285 unsigned long flags; 4286 4287 assert_spin_locked(&lockres->l_lock); 4288 4289 if (lockres->l_flags & OCFS2_LOCK_FREEING) { 4290 /* Do not schedule a lock for downconvert when it's on 4291 * the way to destruction - any nodes wanting access 4292 * to the resource will get it soon. */ 4293 mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n", 4294 lockres->l_name, lockres->l_flags); 4295 return; 4296 } 4297 4298 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); 4299 4300 spin_lock_irqsave(&osb->dc_task_lock, flags); 4301 if (list_empty(&lockres->l_blocked_list)) { 4302 list_add_tail(&lockres->l_blocked_list, 4303 &osb->blocked_lock_list); 4304 osb->blocked_lock_count++; 4305 } 4306 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4307 } 4308 4309 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb) 4310 { 4311 unsigned long processed; 4312 unsigned long flags; 4313 struct ocfs2_lock_res *lockres; 4314 4315 spin_lock_irqsave(&osb->dc_task_lock, flags); 4316 /* grab this early so we know to try again if a state change and 4317 * wake happens part-way through our work */ 4318 osb->dc_work_sequence = osb->dc_wake_sequence; 4319 4320 processed = osb->blocked_lock_count; 4321 /* 4322 * blocked lock processing in this loop might call iput which can 4323 * remove items off osb->blocked_lock_list. Downconvert up to 4324 * 'processed' number of locks, but stop short if we had some 4325 * removed in ocfs2_mark_lockres_freeing when downconverting. 
4326 */ 4327 while (processed && !list_empty(&osb->blocked_lock_list)) { 4328 lockres = list_entry(osb->blocked_lock_list.next, 4329 struct ocfs2_lock_res, l_blocked_list); 4330 list_del_init(&lockres->l_blocked_list); 4331 osb->blocked_lock_count--; 4332 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4333 4334 BUG_ON(!processed); 4335 processed--; 4336 4337 ocfs2_process_blocked_lock(osb, lockres); 4338 4339 spin_lock_irqsave(&osb->dc_task_lock, flags); 4340 } 4341 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4342 } 4343 4344 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb) 4345 { 4346 int empty = 0; 4347 unsigned long flags; 4348 4349 spin_lock_irqsave(&osb->dc_task_lock, flags); 4350 if (list_empty(&osb->blocked_lock_list)) 4351 empty = 1; 4352 4353 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4354 return empty; 4355 } 4356 4357 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb) 4358 { 4359 int should_wake = 0; 4360 unsigned long flags; 4361 4362 spin_lock_irqsave(&osb->dc_task_lock, flags); 4363 if (osb->dc_work_sequence != osb->dc_wake_sequence) 4364 should_wake = 1; 4365 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4366 4367 return should_wake; 4368 } 4369 4370 static int ocfs2_downconvert_thread(void *arg) 4371 { 4372 int status = 0; 4373 struct ocfs2_super *osb = arg; 4374 4375 /* only quit once we've been asked to stop and there is no more 4376 * work available */ 4377 while (!(kthread_should_stop() && 4378 ocfs2_downconvert_thread_lists_empty(osb))) { 4379 4380 wait_event_interruptible(osb->dc_event, 4381 ocfs2_downconvert_thread_should_wake(osb) || 4382 kthread_should_stop()); 4383 4384 mlog(0, "downconvert_thread: awoken\n"); 4385 4386 ocfs2_downconvert_thread_do_work(osb); 4387 } 4388 4389 osb->dc_task = NULL; 4390 return status; 4391 } 4392 4393 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb) 4394 { 4395 unsigned long flags; 4396 4397 spin_lock_irqsave(&osb->dc_task_lock, 
flags); 4398 /* make sure the voting thread gets a swipe at whatever changes 4399 * the caller may have made to the voting state */ 4400 osb->dc_wake_sequence++; 4401 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4402 wake_up(&osb->dc_event); 4403 } 4404