1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmglue.c 5 * 6 * Code which implements an OCFS2 specific interface to our DLM. 7 * 8 * Copyright (C) 2003, 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 */ 25 26 #include <linux/types.h> 27 #include <linux/slab.h> 28 #include <linux/highmem.h> 29 #include <linux/mm.h> 30 #include <linux/kthread.h> 31 #include <linux/pagemap.h> 32 #include <linux/debugfs.h> 33 #include <linux/seq_file.h> 34 #include <linux/time.h> 35 #include <linux/quotaops.h> 36 #include <linux/sched/signal.h> 37 38 #define MLOG_MASK_PREFIX ML_DLM_GLUE 39 #include <cluster/masklog.h> 40 41 #include "ocfs2.h" 42 #include "ocfs2_lockingver.h" 43 44 #include "alloc.h" 45 #include "dcache.h" 46 #include "dlmglue.h" 47 #include "extent_map.h" 48 #include "file.h" 49 #include "heartbeat.h" 50 #include "inode.h" 51 #include "journal.h" 52 #include "stackglue.h" 53 #include "slot_map.h" 54 #include "super.h" 55 #include "uptodate.h" 56 #include "quota.h" 57 #include "refcounttree.h" 58 #include "acl.h" 59 60 #include "buffer_head_io.h" 61 62 struct ocfs2_mask_waiter { 63 struct list_head mw_item; 64 int mw_status; 65 struct completion mw_complete; 66 unsigned long mw_mask; 67 unsigned long mw_goal; 68 #ifdef CONFIG_OCFS2_FS_STATS 69 ktime_t mw_lock_start; 70 #endif 71 }; 72 73 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres); 74 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres); 75 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres); 76 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres); 77 78 /* 79 * Return value from ->downconvert_worker functions. 80 * 81 * These control the precise actions of ocfs2_unblock_lock() 82 * and ocfs2_process_blocked_lock() 83 * 84 */ 85 enum ocfs2_unblock_action { 86 UNBLOCK_CONTINUE = 0, /* Continue downconvert */ 87 UNBLOCK_CONTINUE_POST = 1, /* Continue downconvert, fire 88 * ->post_unlock callback */ 89 UNBLOCK_STOP_POST = 2, /* Do not downconvert, fire 90 * ->post_unlock() callback. */ 91 }; 92 93 struct ocfs2_unblock_ctl { 94 int requeue; 95 enum ocfs2_unblock_action unblock_action; 96 }; 97 98 /* Lockdep class keys */ 99 #ifdef CONFIG_DEBUG_LOCK_ALLOC 100 static struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES]; 101 #endif 102 103 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 104 int new_level); 105 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres); 106 107 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 108 int blocking); 109 110 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 111 int blocking); 112 113 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 114 struct ocfs2_lock_res *lockres); 115 116 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres); 117 118 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres, 119 int new_level); 120 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres, 121 int blocking); 122 123 #define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres) 124 125 /* This aids in debugging situations where a bad LVB might be involved. */ 126 static void ocfs2_dump_meta_lvb_info(u64 level, 127 const char *function, 128 unsigned int line, 129 struct ocfs2_lock_res *lockres) 130 { 131 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 132 133 mlog(level, "LVB information for %s (called from %s:%u):\n", 134 lockres->l_name, function, line); 135 mlog(level, "version: %u, clusters: %u, generation: 0x%x\n", 136 lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters), 137 be32_to_cpu(lvb->lvb_igeneration)); 138 mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n", 139 (unsigned long long)be64_to_cpu(lvb->lvb_isize), 140 be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid), 141 be16_to_cpu(lvb->lvb_imode)); 142 mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, " 143 "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink), 144 (long long)be64_to_cpu(lvb->lvb_iatime_packed), 145 (long long)be64_to_cpu(lvb->lvb_ictime_packed), 146 (long long)be64_to_cpu(lvb->lvb_imtime_packed), 147 be32_to_cpu(lvb->lvb_iattr)); 148 } 149 150 151 /* 152 * OCFS2 Lock Resource Operations 153 * 154 * These fine tune the behavior of the generic dlmglue locking infrastructure. 155 * 156 * The most basic of lock types can point ->l_priv to their respective 157 * struct ocfs2_super and allow the default actions to manage things. 158 * 159 * Right now, each lock type also needs to implement an init function, 160 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres() 161 * should be called when the lock is no longer needed (i.e., object 162 * destruction time). 163 */ 164 struct ocfs2_lock_res_ops { 165 /* 166 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define 167 * this callback if ->l_priv is not an ocfs2_super pointer 168 */ 169 struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *); 170 171 /* 172 * Optionally called in the downconvert thread after a 173 * successful downconvert. The lockres will not be referenced 174 * after this callback is called, so it is safe to free 175 * memory, etc. 176 * 177 * The exact semantics of when this is called are controlled 178 * by ->downconvert_worker() 179 */ 180 void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *); 181 182 /* 183 * Allow a lock type to add checks to determine whether it is 184 * safe to downconvert a lock. Return 0 to re-queue the 185 * downconvert at a later time, nonzero to continue. 186 * 187 * For most locks, the default checks that there are no 188 * incompatible holders are sufficient. 189 * 190 * Called with the lockres spinlock held. 191 */ 192 int (*check_downconvert)(struct ocfs2_lock_res *, int); 193 194 /* 195 * Allows a lock type to populate the lock value block. This 196 * is called on downconvert, and when we drop a lock. 197 * 198 * Locks that want to use this should set LOCK_TYPE_USES_LVB 199 * in the flags field. 200 * 201 * Called with the lockres spinlock held. 202 */ 203 void (*set_lvb)(struct ocfs2_lock_res *); 204 205 /* 206 * Called from the downconvert thread when it is determined 207 * that a lock will be downconverted. This is called without 208 * any locks held so the function can do work that might 209 * schedule (syncing out data, etc). 210 * 211 * This should return any one of the ocfs2_unblock_action 212 * values, depending on what it wants the thread to do. 213 */ 214 int (*downconvert_worker)(struct ocfs2_lock_res *, int); 215 216 /* 217 * LOCK_TYPE_* flags which describe the specific requirements 218 * of a lock type. Descriptions of each individual flag follow. 219 */ 220 int flags; 221 }; 222 223 /* 224 * Some locks want to "refresh" potentially stale data when a 225 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this 226 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the 227 * individual lockres l_flags member from the ast function. It is 228 * expected that the locking wrapper will clear the 229 * OCFS2_LOCK_NEEDS_REFRESH flag when done. 230 */ 231 #define LOCK_TYPE_REQUIRES_REFRESH 0x1 232 233 /* 234 * Indicate that a lock type makes use of the lock value block. The 235 * ->set_lvb lock type callback must be defined. 236 */ 237 #define LOCK_TYPE_USES_LVB 0x2 238 239 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { 240 .get_osb = ocfs2_get_inode_osb, 241 .flags = 0, 242 }; 243 244 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = { 245 .get_osb = ocfs2_get_inode_osb, 246 .check_downconvert = ocfs2_check_meta_downconvert, 247 .set_lvb = ocfs2_set_meta_lvb, 248 .downconvert_worker = ocfs2_data_convert_worker, 249 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 250 }; 251 252 static struct ocfs2_lock_res_ops ocfs2_super_lops = { 253 .flags = LOCK_TYPE_REQUIRES_REFRESH, 254 }; 255 256 static struct ocfs2_lock_res_ops ocfs2_rename_lops = { 257 .flags = 0, 258 }; 259 260 static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = { 261 .flags = 0, 262 }; 263 264 static struct ocfs2_lock_res_ops ocfs2_trim_fs_lops = { 265 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 266 }; 267 268 static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = { 269 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 270 }; 271 272 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { 273 .get_osb = ocfs2_get_dentry_osb, 274 .post_unlock = ocfs2_dentry_post_unlock, 275 .downconvert_worker = ocfs2_dentry_convert_worker, 276 .flags = 0, 277 }; 278 279 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = { 280 .get_osb = ocfs2_get_inode_osb, 281 .flags = 0, 282 }; 283 284 static struct ocfs2_lock_res_ops ocfs2_flock_lops = { 285 .get_osb = ocfs2_get_file_osb, 286 .flags = 0, 287 }; 288 289 static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = { 290 .set_lvb = ocfs2_set_qinfo_lvb, 291 .get_osb = ocfs2_get_qinfo_osb, 292 .flags = LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB, 293 }; 294 295 static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = { 296 .check_downconvert = ocfs2_check_refcount_downconvert, 297 .downconvert_worker = ocfs2_refcount_convert_worker, 298 .flags = 0, 299 }; 300 301 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 302 { 303 return lockres->l_type == OCFS2_LOCK_TYPE_META || 304 lockres->l_type == OCFS2_LOCK_TYPE_RW || 305 lockres->l_type == OCFS2_LOCK_TYPE_OPEN; 306 } 307 308 static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb) 309 { 310 return container_of(lksb, struct ocfs2_lock_res, l_lksb); 311 } 312 313 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) 314 { 315 BUG_ON(!ocfs2_is_inode_lock(lockres)); 316 317 return (struct inode *) lockres->l_priv; 318 } 319 320 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres) 321 { 322 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY); 323 324 return (struct ocfs2_dentry_lock *)lockres->l_priv; 325 } 326 327 static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres) 328 { 329 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO); 330 331 return (struct ocfs2_mem_dqinfo *)lockres->l_priv; 332 } 333 334 static inline struct ocfs2_refcount_tree * 335 ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res) 336 { 337 return container_of(res, struct ocfs2_refcount_tree, rf_lockres); 338 } 339 340 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres) 341 { 342 if (lockres->l_ops->get_osb) 343 return lockres->l_ops->get_osb(lockres); 344 345 return (struct ocfs2_super *)lockres->l_priv; 346 } 347 348 static int ocfs2_lock_create(struct ocfs2_super *osb, 349 struct ocfs2_lock_res *lockres, 350 int level, 351 u32 dlm_flags); 352 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 353 int wanted); 354 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb, 355 struct ocfs2_lock_res *lockres, 356 int level, unsigned long caller_ip); 357 static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb, 358 struct ocfs2_lock_res *lockres, 359 int level) 360 { 361 __ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_); 362 } 363 364 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres); 365 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres); 366 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres); 367 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level); 368 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 369 struct ocfs2_lock_res *lockres); 370 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 371 int convert); 372 #define ocfs2_log_dlm_error(_func, _err, _lockres) do { \ 373 if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY) \ 374 mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \ 375 _err, _func, _lockres->l_name); \ 376 else \ 377 mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n", \ 378 _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name, \ 379 (unsigned int)ocfs2_get_dentry_lock_ino(_lockres)); \ 380 } while (0) 381 static int ocfs2_downconvert_thread(void *arg); 382 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 383 struct ocfs2_lock_res *lockres); 384 static int ocfs2_inode_lock_update(struct inode *inode, 385 struct buffer_head **bh); 386 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 387 static inline int ocfs2_highest_compat_lock_level(int level); 388 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 389 int new_level); 390 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 391 struct ocfs2_lock_res *lockres, 392 int new_level, 393 int lvb, 394 unsigned int generation); 395 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 396 struct ocfs2_lock_res *lockres); 397 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 398 struct ocfs2_lock_res *lockres); 399 400 401 static void ocfs2_build_lock_name(enum ocfs2_lock_type type, 402 u64 blkno, 403 u32 generation, 404 char *name) 405 { 406 int len; 407 408 BUG_ON(type >= OCFS2_NUM_LOCK_TYPES); 409 410 len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", 411 ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD, 412 (long long)blkno, generation); 413 414 BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1)); 415 416 mlog(0, "built lock resource with name: %s\n", name); 417 } 418 419 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock); 420 421 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res, 422 struct ocfs2_dlm_debug *dlm_debug) 423 { 424 mlog(0, "Add tracking for lockres %s\n", res->l_name); 425 426 spin_lock(&ocfs2_dlm_tracking_lock); 427 list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking); 428 spin_unlock(&ocfs2_dlm_tracking_lock); 429 } 430 431 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) 432 { 433 spin_lock(&ocfs2_dlm_tracking_lock); 434 if (!list_empty(&res->l_debug_list)) 435 list_del_init(&res->l_debug_list); 436 spin_unlock(&ocfs2_dlm_tracking_lock); 437 } 438 439 #ifdef CONFIG_OCFS2_FS_STATS 440 static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 441 { 442 res->l_lock_refresh = 0; 443 memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats)); 444 memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats)); 445 } 446 447 static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level, 448 struct ocfs2_mask_waiter *mw, int ret) 449 { 450 u32 usec; 451 ktime_t kt; 452 struct ocfs2_lock_stats *stats; 453 454 if (level == LKM_PRMODE) 455 stats = &res->l_lock_prmode; 456 else if (level == LKM_EXMODE) 457 stats = &res->l_lock_exmode; 458 else 459 return; 460 461 kt = ktime_sub(ktime_get(), mw->mw_lock_start); 462 usec = ktime_to_us(kt); 463 464 stats->ls_gets++; 465 stats->ls_total += ktime_to_ns(kt); 466 /* overflow */ 467 if (unlikely(stats->ls_gets == 0)) { 468 stats->ls_gets++; 469 stats->ls_total = ktime_to_ns(kt); 470 } 471 472 if (stats->ls_max < usec) 473 stats->ls_max = usec; 474 475 if (ret) 476 stats->ls_fail++; 477 } 478 479 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 480 { 481 lockres->l_lock_refresh++; 482 } 483 484 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 485 { 486 mw->mw_lock_start = ktime_get(); 487 } 488 #else 489 static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 490 { 491 } 492 static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, 493 int level, struct ocfs2_mask_waiter *mw, int ret) 494 { 495 } 496 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 497 { 498 } 499 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 500 { 501 } 502 #endif 503 504 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, 505 struct ocfs2_lock_res *res, 506 enum ocfs2_lock_type type, 507 struct ocfs2_lock_res_ops *ops, 508 void *priv) 509 { 510 res->l_type = type; 511 res->l_ops = ops; 512 res->l_priv = priv; 513 514 res->l_level = DLM_LOCK_IV; 515 res->l_requested = DLM_LOCK_IV; 516 res->l_blocking = DLM_LOCK_IV; 517 res->l_action = OCFS2_AST_INVALID; 518 res->l_unlock_action = OCFS2_UNLOCK_INVALID; 519 520 res->l_flags = OCFS2_LOCK_INITIALIZED; 521 522 ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug); 523 524 ocfs2_init_lock_stats(res); 525 #ifdef CONFIG_DEBUG_LOCK_ALLOC 526 if (type != OCFS2_LOCK_TYPE_OPEN) 527 lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type], 528 &lockdep_keys[type], 0); 529 else 530 res->l_lockdep_map.key = NULL; 531 #endif 532 } 533 534 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res) 535 { 536 /* This also clears out the lock status block */ 537 memset(res, 0, sizeof(struct ocfs2_lock_res)); 538 spin_lock_init(&res->l_lock); 539 init_waitqueue_head(&res->l_event); 540 INIT_LIST_HEAD(&res->l_blocked_list); 541 INIT_LIST_HEAD(&res->l_mask_waiters); 542 INIT_LIST_HEAD(&res->l_holders); 543 } 544 545 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, 546 enum ocfs2_lock_type type, 547 unsigned int generation, 548 struct inode *inode) 549 { 550 struct ocfs2_lock_res_ops *ops; 551 552 switch(type) { 553 case OCFS2_LOCK_TYPE_RW: 554 ops = &ocfs2_inode_rw_lops; 555 break; 556 case OCFS2_LOCK_TYPE_META: 557 ops = &ocfs2_inode_inode_lops; 558 break; 559 case OCFS2_LOCK_TYPE_OPEN: 560 ops = &ocfs2_inode_open_lops; 561 break; 562 default: 563 mlog_bug_on_msg(1, "type: %d\n", type); 564 ops = NULL; /* thanks, gcc */ 565 break; 566 }; 567 568 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, 569 generation, res->l_name); 570 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); 571 } 572 573 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres) 574 { 575 struct inode *inode = ocfs2_lock_res_inode(lockres); 576 577 return OCFS2_SB(inode->i_sb); 578 } 579 580 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres) 581 { 582 struct ocfs2_mem_dqinfo *info = lockres->l_priv; 583 584 return OCFS2_SB(info->dqi_gi.dqi_sb); 585 } 586 587 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres) 588 { 589 struct ocfs2_file_private *fp = lockres->l_priv; 590 591 return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb); 592 } 593 594 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) 595 { 596 __be64 inode_blkno_be; 597 598 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], 599 sizeof(__be64)); 600 601 return be64_to_cpu(inode_blkno_be); 602 } 603 604 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres) 605 { 606 struct ocfs2_dentry_lock *dl = lockres->l_priv; 607 608 return OCFS2_SB(dl->dl_inode->i_sb); 609 } 610 611 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, 612 u64 parent, struct inode *inode) 613 { 614 int len; 615 u64 inode_blkno = OCFS2_I(inode)->ip_blkno; 616 __be64 inode_blkno_be = cpu_to_be64(inode_blkno); 617 struct ocfs2_lock_res *lockres = &dl->dl_lockres; 618 619 ocfs2_lock_res_init_once(lockres); 620 621 /* 622 * Unfortunately, the standard lock naming scheme won't work 623 * here because we have two 16 byte values to use. Instead, 624 * we'll stuff the inode number as a binary value. We still 625 * want error prints to show something without garbling the 626 * display, so drop a null byte in there before the inode 627 * number. A future version of OCFS2 will likely use all 628 * binary lock names. The stringified names have been a 629 * tremendous aid in debugging, but now that the debugfs 630 * interface exists, we can mangle things there if need be. 631 * 632 * NOTE: We also drop the standard "pad" value (the total lock 633 * name size stays the same though - the last part is all 634 * zeros due to the memset in ocfs2_lock_res_init_once() 635 */ 636 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, 637 "%c%016llx", 638 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), 639 (long long)parent); 640 641 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); 642 643 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, 644 sizeof(__be64)); 645 646 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 647 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, 648 dl); 649 } 650 651 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 652 struct ocfs2_super *osb) 653 { 654 /* Superblock lockres doesn't come from a slab so we call init 655 * once on it manually. */ 656 ocfs2_lock_res_init_once(res); 657 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, 658 0, res->l_name); 659 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 660 &ocfs2_super_lops, osb); 661 } 662 663 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, 664 struct ocfs2_super *osb) 665 { 666 /* Rename lockres doesn't come from a slab so we call init 667 * once on it manually. */ 668 ocfs2_lock_res_init_once(res); 669 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); 670 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 671 &ocfs2_rename_lops, osb); 672 } 673 674 static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res, 675 struct ocfs2_super *osb) 676 { 677 /* nfs_sync lockres doesn't come from a slab so we call init 678 * once on it manually. */ 679 ocfs2_lock_res_init_once(res); 680 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name); 681 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC, 682 &ocfs2_nfs_sync_lops, osb); 683 } 684 685 void ocfs2_trim_fs_lock_res_init(struct ocfs2_super *osb) 686 { 687 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 688 689 /* Only one trimfs thread are allowed to work at the same time. */ 690 mutex_lock(&osb->obs_trim_fs_mutex); 691 692 ocfs2_lock_res_init_once(lockres); 693 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_TRIM_FS, 0, 0, lockres->l_name); 694 ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_TRIM_FS, 695 &ocfs2_trim_fs_lops, osb); 696 } 697 698 void ocfs2_trim_fs_lock_res_uninit(struct ocfs2_super *osb) 699 { 700 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 701 702 ocfs2_simple_drop_lockres(osb, lockres); 703 ocfs2_lock_res_free(lockres); 704 705 mutex_unlock(&osb->obs_trim_fs_mutex); 706 } 707 708 static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res, 709 struct ocfs2_super *osb) 710 { 711 ocfs2_lock_res_init_once(res); 712 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name); 713 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN, 714 &ocfs2_orphan_scan_lops, osb); 715 } 716 717 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, 718 struct ocfs2_file_private *fp) 719 { 720 struct inode *inode = fp->fp_file->f_mapping->host; 721 struct ocfs2_inode_info *oi = OCFS2_I(inode); 722 723 ocfs2_lock_res_init_once(lockres); 724 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno, 725 inode->i_generation, lockres->l_name); 726 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 727 OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops, 728 fp); 729 lockres->l_flags |= OCFS2_LOCK_NOCACHE; 730 } 731 732 void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres, 733 struct ocfs2_mem_dqinfo *info) 734 { 735 ocfs2_lock_res_init_once(lockres); 736 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type, 737 0, lockres->l_name); 738 ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres, 739 OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops, 740 info); 741 } 742 743 void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres, 744 struct ocfs2_super *osb, u64 ref_blkno, 745 unsigned int generation) 746 { 747 ocfs2_lock_res_init_once(lockres); 748 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno, 749 generation, lockres->l_name); 750 ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT, 751 &ocfs2_refcount_block_lops, osb); 752 } 753 754 void ocfs2_lock_res_free(struct ocfs2_lock_res *res) 755 { 756 if (!(res->l_flags & OCFS2_LOCK_INITIALIZED)) 757 return; 758 759 ocfs2_remove_lockres_tracking(res); 760 761 mlog_bug_on_msg(!list_empty(&res->l_blocked_list), 762 "Lockres %s is on the blocked list\n", 763 res->l_name); 764 mlog_bug_on_msg(!list_empty(&res->l_mask_waiters), 765 "Lockres %s has mask waiters pending\n", 766 res->l_name); 767 mlog_bug_on_msg(spin_is_locked(&res->l_lock), 768 "Lockres %s is locked\n", 769 res->l_name); 770 mlog_bug_on_msg(res->l_ro_holders, 771 "Lockres %s has %u ro holders\n", 772 res->l_name, res->l_ro_holders); 773 mlog_bug_on_msg(res->l_ex_holders, 774 "Lockres %s has %u ex holders\n", 775 res->l_name, res->l_ex_holders); 776 777 /* Need to clear out the lock status block for the dlm */ 778 memset(&res->l_lksb, 0, sizeof(res->l_lksb)); 779 780 res->l_flags = 0UL; 781 } 782 783 /* 784 * Keep a list of processes who have interest in a lockres. 785 * Note: this is now only uesed for check recursive cluster locking. 786 */ 787 static inline void ocfs2_add_holder(struct ocfs2_lock_res *lockres, 788 struct ocfs2_lock_holder *oh) 789 { 790 INIT_LIST_HEAD(&oh->oh_list); 791 oh->oh_owner_pid = get_pid(task_pid(current)); 792 793 spin_lock(&lockres->l_lock); 794 list_add_tail(&oh->oh_list, &lockres->l_holders); 795 spin_unlock(&lockres->l_lock); 796 } 797 798 static struct ocfs2_lock_holder * 799 ocfs2_pid_holder(struct ocfs2_lock_res *lockres, 800 struct pid *pid) 801 { 802 struct ocfs2_lock_holder *oh; 803 804 spin_lock(&lockres->l_lock); 805 list_for_each_entry(oh, &lockres->l_holders, oh_list) { 806 if (oh->oh_owner_pid == pid) { 807 spin_unlock(&lockres->l_lock); 808 return oh; 809 } 810 } 811 spin_unlock(&lockres->l_lock); 812 return NULL; 813 } 814 815 static inline void ocfs2_remove_holder(struct ocfs2_lock_res *lockres, 816 struct ocfs2_lock_holder *oh) 817 { 818 spin_lock(&lockres->l_lock); 819 list_del(&oh->oh_list); 820 spin_unlock(&lockres->l_lock); 821 822 put_pid(oh->oh_owner_pid); 823 } 824 825 826 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, 827 int level) 828 { 829 BUG_ON(!lockres); 830 831 switch(level) { 832 case DLM_LOCK_EX: 833 lockres->l_ex_holders++; 834 break; 835 case DLM_LOCK_PR: 836 lockres->l_ro_holders++; 837 break; 838 default: 839 BUG(); 840 } 841 } 842 843 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, 844 int level) 845 { 846 BUG_ON(!lockres); 847 848 switch(level) { 849 case DLM_LOCK_EX: 850 BUG_ON(!lockres->l_ex_holders); 851 lockres->l_ex_holders--; 852 break; 853 case DLM_LOCK_PR: 854 BUG_ON(!lockres->l_ro_holders); 855 lockres->l_ro_holders--; 856 break; 857 default: 858 BUG(); 859 } 860 } 861 862 /* WARNING: This function lives in a world where the only three lock 863 * levels are EX, PR, and NL. It *will* have to be adjusted when more 864 * lock types are added. */ 865 static inline int ocfs2_highest_compat_lock_level(int level) 866 { 867 int new_level = DLM_LOCK_EX; 868 869 if (level == DLM_LOCK_EX) 870 new_level = DLM_LOCK_NL; 871 else if (level == DLM_LOCK_PR) 872 new_level = DLM_LOCK_PR; 873 return new_level; 874 } 875 876 static void lockres_set_flags(struct ocfs2_lock_res *lockres, 877 unsigned long newflags) 878 { 879 struct ocfs2_mask_waiter *mw, *tmp; 880 881 assert_spin_locked(&lockres->l_lock); 882 883 lockres->l_flags = newflags; 884 885 list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) { 886 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 887 continue; 888 889 list_del_init(&mw->mw_item); 890 mw->mw_status = 0; 891 complete(&mw->mw_complete); 892 } 893 } 894 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or) 895 { 896 lockres_set_flags(lockres, lockres->l_flags | or); 897 } 898 static void lockres_clear_flags(struct ocfs2_lock_res *lockres, 899 unsigned long clear) 900 { 901 lockres_set_flags(lockres, lockres->l_flags & ~clear); 902 } 903 904 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres) 905 { 906 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 907 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 908 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 909 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 910 911 lockres->l_level = lockres->l_requested; 912 if (lockres->l_level <= 913 ocfs2_highest_compat_lock_level(lockres->l_blocking)) { 914 lockres->l_blocking = DLM_LOCK_NL; 915 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 916 } 917 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 918 } 919 920 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres) 921 { 922 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 923 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 924 925 /* Convert from RO to EX doesn't really need anything as our 926 * information is already up to data. Convert from NL to 927 * *anything* however should mark ourselves as needing an 928 * update */ 929 if (lockres->l_level == DLM_LOCK_NL && 930 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 931 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 932 933 lockres->l_level = lockres->l_requested; 934 935 /* 936 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing 937 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from 938 * downconverting the lock before the upconvert has fully completed. 939 * Do not prevent the dc thread from downconverting if NONBLOCK lock 940 * had already returned. 941 */ 942 if (!(lockres->l_flags & OCFS2_LOCK_NONBLOCK_FINISHED)) 943 lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 944 else 945 lockres_clear_flags(lockres, OCFS2_LOCK_NONBLOCK_FINISHED); 946 947 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 948 } 949 950 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres) 951 { 952 BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY))); 953 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 954 955 if (lockres->l_requested > DLM_LOCK_NL && 956 !(lockres->l_flags & OCFS2_LOCK_LOCAL) && 957 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 958 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 959 960 lockres->l_level = lockres->l_requested; 961 lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED); 962 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 963 } 964 965 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, 966 int level) 967 { 968 int needs_downconvert = 0; 969 970 assert_spin_locked(&lockres->l_lock); 971 972 if (level > lockres->l_blocking) { 973 /* only schedule a downconvert if we haven't already scheduled 974 * one that goes low enough to satisfy the level we're 975 * blocking. this also catches the case where we get 976 * duplicate BASTs */ 977 if (ocfs2_highest_compat_lock_level(level) < 978 ocfs2_highest_compat_lock_level(lockres->l_blocking)) 979 needs_downconvert = 1; 980 981 lockres->l_blocking = level; 982 } 983 984 mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n", 985 lockres->l_name, level, lockres->l_level, lockres->l_blocking, 986 needs_downconvert); 987 988 if (needs_downconvert) 989 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 990 mlog(0, "needs_downconvert = %d\n", needs_downconvert); 991 return needs_downconvert; 992 } 993 994 /* 995 * OCFS2_LOCK_PENDING and l_pending_gen. 996 * 997 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting 998 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock() 999 * for more details on the race. 1000 * 1001 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces 1002 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock() 1003 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear 1004 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns, 1005 * the caller is going to try to clear PENDING again. If nothing else is 1006 * happening, __lockres_clear_pending() sees PENDING is unset and does 1007 * nothing. 1008 * 1009 * But what if another path (eg downconvert thread) has just started a 1010 * new locking action? The other path has re-set PENDING. Our path 1011 * cannot clear PENDING, because that will re-open the original race 1012 * window. 1013 * 1014 * [Example] 1015 * 1016 * ocfs2_meta_lock() 1017 * ocfs2_cluster_lock() 1018 * set BUSY 1019 * set PENDING 1020 * drop l_lock 1021 * ocfs2_dlm_lock() 1022 * ocfs2_locking_ast() ocfs2_downconvert_thread() 1023 * clear PENDING ocfs2_unblock_lock() 1024 * take_l_lock 1025 * !BUSY 1026 * ocfs2_prepare_downconvert() 1027 * set BUSY 1028 * set PENDING 1029 * drop l_lock 1030 * take l_lock 1031 * clear PENDING 1032 * drop l_lock 1033 * <window> 1034 * ocfs2_dlm_lock() 1035 * 1036 * So as you can see, we now have a window where l_lock is not held, 1037 * PENDING is not set, and ocfs2_dlm_lock() has not been called. 1038 * 1039 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING 1040 * set by ocfs2_prepare_downconvert(). That wasn't nice. 1041 * 1042 * To solve this we introduce l_pending_gen. A call to 1043 * lockres_clear_pending() will only do so when it is passed a generation 1044 * number that matches the lockres. lockres_set_pending() will return the 1045 * current generation number. When ocfs2_cluster_lock() goes to clear 1046 * PENDING, it passes the generation it got from set_pending(). In our 1047 * example above, the generation numbers will *not* match. Thus, 1048 * ocfs2_cluster_lock() will not clear the PENDING set by 1049 * ocfs2_prepare_downconvert(). 1050 */ 1051 1052 /* Unlocked version for ocfs2_locking_ast() */ 1053 static void __lockres_clear_pending(struct ocfs2_lock_res *lockres, 1054 unsigned int generation, 1055 struct ocfs2_super *osb) 1056 { 1057 assert_spin_locked(&lockres->l_lock); 1058 1059 /* 1060 * The ast and locking functions can race us here. The winner 1061 * will clear pending, the loser will not. 1062 */ 1063 if (!(lockres->l_flags & OCFS2_LOCK_PENDING) || 1064 (lockres->l_pending_gen != generation)) 1065 return; 1066 1067 lockres_clear_flags(lockres, OCFS2_LOCK_PENDING); 1068 lockres->l_pending_gen++; 1069 1070 /* 1071 * The downconvert thread may have skipped us because we 1072 * were PENDING. Wake it up. 1073 */ 1074 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 1075 ocfs2_wake_downconvert_thread(osb); 1076 } 1077 1078 /* Locked version for callers of ocfs2_dlm_lock() */ 1079 static void lockres_clear_pending(struct ocfs2_lock_res *lockres, 1080 unsigned int generation, 1081 struct ocfs2_super *osb) 1082 { 1083 unsigned long flags; 1084 1085 spin_lock_irqsave(&lockres->l_lock, flags); 1086 __lockres_clear_pending(lockres, generation, osb); 1087 spin_unlock_irqrestore(&lockres->l_lock, flags); 1088 } 1089 1090 static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres) 1091 { 1092 assert_spin_locked(&lockres->l_lock); 1093 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 1094 1095 lockres_or_flags(lockres, OCFS2_LOCK_PENDING); 1096 1097 return lockres->l_pending_gen; 1098 } 1099 1100 static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level) 1101 { 1102 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1103 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1104 int needs_downconvert; 1105 unsigned long flags; 1106 1107 BUG_ON(level <= DLM_LOCK_NL); 1108 1109 mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, " 1110 "type %s\n", lockres->l_name, level, lockres->l_level, 1111 ocfs2_lock_type_string(lockres->l_type)); 1112 1113 /* 1114 * We can skip the bast for locks which don't enable caching - 1115 * they'll be dropped at the earliest possible time anyway. 1116 */ 1117 if (lockres->l_flags & OCFS2_LOCK_NOCACHE) 1118 return; 1119 1120 spin_lock_irqsave(&lockres->l_lock, flags); 1121 needs_downconvert = ocfs2_generic_handle_bast(lockres, level); 1122 if (needs_downconvert) 1123 ocfs2_schedule_blocked_lock(osb, lockres); 1124 spin_unlock_irqrestore(&lockres->l_lock, flags); 1125 1126 wake_up(&lockres->l_event); 1127 1128 ocfs2_wake_downconvert_thread(osb); 1129 } 1130 1131 static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb) 1132 { 1133 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1134 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1135 unsigned long flags; 1136 int status; 1137 1138 spin_lock_irqsave(&lockres->l_lock, flags); 1139 1140 status = ocfs2_dlm_lock_status(&lockres->l_lksb); 1141 1142 if (status == -EAGAIN) { 1143 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1144 goto out; 1145 } 1146 1147 if (status) { 1148 mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n", 1149 lockres->l_name, status); 1150 spin_unlock_irqrestore(&lockres->l_lock, flags); 1151 return; 1152 } 1153 1154 mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, " 1155 "level %d => %d\n", lockres->l_name, lockres->l_action, 1156 lockres->l_unlock_action, lockres->l_level, lockres->l_requested); 1157 1158 switch(lockres->l_action) { 1159 case OCFS2_AST_ATTACH: 1160 ocfs2_generic_handle_attach_action(lockres); 1161 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL); 1162 break; 1163 case OCFS2_AST_CONVERT: 1164 ocfs2_generic_handle_convert_action(lockres); 1165 break; 1166 case OCFS2_AST_DOWNCONVERT: 1167 ocfs2_generic_handle_downconvert_action(lockres); 1168 break; 1169 default: 1170 mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, " 1171 "flags 0x%lx, unlock: %u\n", 1172 lockres->l_name, lockres->l_action, lockres->l_flags, 1173 lockres->l_unlock_action); 1174 BUG(); 1175 } 1176 out: 1177 /* set it to something invalid so if we get called again we 1178 * can catch it. */ 1179 lockres->l_action = OCFS2_AST_INVALID; 1180 1181 /* Did we try to cancel this lock? Clear that state */ 1182 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) 1183 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1184 1185 /* 1186 * We may have beaten the locking functions here. We certainly 1187 * know that dlm_lock() has been called :-) 1188 * Because we can't have two lock calls in flight at once, we 1189 * can use lockres->l_pending_gen. 1190 */ 1191 __lockres_clear_pending(lockres, lockres->l_pending_gen, osb); 1192 1193 wake_up(&lockres->l_event); 1194 spin_unlock_irqrestore(&lockres->l_lock, flags); 1195 } 1196 1197 static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error) 1198 { 1199 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1200 unsigned long flags; 1201 1202 mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n", 1203 lockres->l_name, lockres->l_unlock_action); 1204 1205 spin_lock_irqsave(&lockres->l_lock, flags); 1206 if (error) { 1207 mlog(ML_ERROR, "Dlm passes error %d for lock %s, " 1208 "unlock_action %d\n", error, lockres->l_name, 1209 lockres->l_unlock_action); 1210 spin_unlock_irqrestore(&lockres->l_lock, flags); 1211 return; 1212 } 1213 1214 switch(lockres->l_unlock_action) { 1215 case OCFS2_UNLOCK_CANCEL_CONVERT: 1216 mlog(0, "Cancel convert success for %s\n", lockres->l_name); 1217 lockres->l_action = OCFS2_AST_INVALID; 1218 /* Downconvert thread may have requeued this lock, we 1219 * need to wake it. */ 1220 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 1221 ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres)); 1222 break; 1223 case OCFS2_UNLOCK_DROP_LOCK: 1224 lockres->l_level = DLM_LOCK_IV; 1225 break; 1226 default: 1227 BUG(); 1228 } 1229 1230 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1231 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1232 wake_up(&lockres->l_event); 1233 spin_unlock_irqrestore(&lockres->l_lock, flags); 1234 } 1235 1236 /* 1237 * This is the filesystem locking protocol. It provides the lock handling 1238 * hooks for the underlying DLM. It has a maximum version number. 1239 * The version number allows interoperability with systems running at 1240 * the same major number and an equal or smaller minor number. 1241 * 1242 * Whenever the filesystem does new things with locks (adds or removes a 1243 * lock, orders them differently, does different things underneath a lock), 1244 * the version must be changed. The protocol is negotiated when joining 1245 * the dlm domain. A node may join the domain if its major version is 1246 * identical to all other nodes and its minor version is greater than 1247 * or equal to all other nodes. When its minor version is greater than 1248 * the other nodes, it will run at the minor version specified by the 1249 * other nodes. 1250 * 1251 * If a locking change is made that will not be compatible with older 1252 * versions, the major number must be increased and the minor version set 1253 * to zero. If a change merely adds a behavior that can be disabled when 1254 * speaking to older versions, the minor version must be increased. If a 1255 * change adds a fully backwards compatible change (eg, LVB changes that 1256 * are just ignored by older versions), the version does not need to be 1257 * updated. 1258 */ 1259 static struct ocfs2_locking_protocol lproto = { 1260 .lp_max_version = { 1261 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, 1262 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, 1263 }, 1264 .lp_lock_ast = ocfs2_locking_ast, 1265 .lp_blocking_ast = ocfs2_blocking_ast, 1266 .lp_unlock_ast = ocfs2_unlock_ast, 1267 }; 1268 1269 void ocfs2_set_locking_protocol(void) 1270 { 1271 ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version); 1272 } 1273 1274 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 1275 int convert) 1276 { 1277 unsigned long flags; 1278 1279 spin_lock_irqsave(&lockres->l_lock, flags); 1280 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1281 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 1282 if (convert) 1283 lockres->l_action = OCFS2_AST_INVALID; 1284 else 1285 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1286 spin_unlock_irqrestore(&lockres->l_lock, flags); 1287 1288 wake_up(&lockres->l_event); 1289 } 1290 1291 /* Note: If we detect another process working on the lock (i.e., 1292 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller 1293 * to do the right thing in that case. 1294 */ 1295 static int ocfs2_lock_create(struct ocfs2_super *osb, 1296 struct ocfs2_lock_res *lockres, 1297 int level, 1298 u32 dlm_flags) 1299 { 1300 int ret = 0; 1301 unsigned long flags; 1302 unsigned int gen; 1303 1304 mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level, 1305 dlm_flags); 1306 1307 spin_lock_irqsave(&lockres->l_lock, flags); 1308 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 1309 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 1310 spin_unlock_irqrestore(&lockres->l_lock, flags); 1311 goto bail; 1312 } 1313 1314 lockres->l_action = OCFS2_AST_ATTACH; 1315 lockres->l_requested = level; 1316 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1317 gen = lockres_set_pending(lockres); 1318 spin_unlock_irqrestore(&lockres->l_lock, flags); 1319 1320 ret = ocfs2_dlm_lock(osb->cconn, 1321 level, 1322 &lockres->l_lksb, 1323 dlm_flags, 1324 lockres->l_name, 1325 OCFS2_LOCK_ID_MAX_LEN - 1); 1326 lockres_clear_pending(lockres, gen, osb); 1327 if (ret) { 1328 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1329 ocfs2_recover_from_dlm_error(lockres, 1); 1330 } 1331 1332 mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name); 1333 1334 bail: 1335 return ret; 1336 } 1337 1338 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, 1339 int flag) 1340 { 1341 unsigned long flags; 1342 int ret; 1343 1344 spin_lock_irqsave(&lockres->l_lock, flags); 1345 ret = lockres->l_flags & flag; 1346 spin_unlock_irqrestore(&lockres->l_lock, flags); 1347 1348 return ret; 1349 } 1350 1351 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 1352 1353 { 1354 wait_event(lockres->l_event, 1355 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 1356 } 1357 1358 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 1359 1360 { 1361 wait_event(lockres->l_event, 1362 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 1363 } 1364 1365 /* predict what lock level we'll be dropping down to on behalf 1366 * of another node, and return true if the currently wanted 1367 * level will be compatible with it. */ 1368 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 1369 int wanted) 1370 { 1371 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 1372 1373 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking); 1374 } 1375 1376 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw) 1377 { 1378 INIT_LIST_HEAD(&mw->mw_item); 1379 init_completion(&mw->mw_complete); 1380 ocfs2_init_start_time(mw); 1381 } 1382 1383 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) 1384 { 1385 wait_for_completion(&mw->mw_complete); 1386 /* Re-arm the completion in case we want to wait on it again */ 1387 reinit_completion(&mw->mw_complete); 1388 return mw->mw_status; 1389 } 1390 1391 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres, 1392 struct ocfs2_mask_waiter *mw, 1393 unsigned long mask, 1394 unsigned long goal) 1395 { 1396 BUG_ON(!list_empty(&mw->mw_item)); 1397 1398 assert_spin_locked(&lockres->l_lock); 1399 1400 list_add_tail(&mw->mw_item, &lockres->l_mask_waiters); 1401 mw->mw_mask = mask; 1402 mw->mw_goal = goal; 1403 } 1404 1405 /* returns 0 if the mw that was removed was already satisfied, -EBUSY 1406 * if the mask still hadn't reached its goal */ 1407 static int __lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1408 struct ocfs2_mask_waiter *mw) 1409 { 1410 int ret = 0; 1411 1412 assert_spin_locked(&lockres->l_lock); 1413 if (!list_empty(&mw->mw_item)) { 1414 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 1415 ret = -EBUSY; 1416 1417 list_del_init(&mw->mw_item); 1418 init_completion(&mw->mw_complete); 1419 } 1420 1421 return ret; 1422 } 1423 1424 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1425 struct ocfs2_mask_waiter *mw) 1426 { 1427 unsigned long flags; 1428 int ret = 0; 1429 1430 spin_lock_irqsave(&lockres->l_lock, flags); 1431 ret = __lockres_remove_mask_waiter(lockres, mw); 1432 spin_unlock_irqrestore(&lockres->l_lock, flags); 1433 1434 return ret; 1435 1436 } 1437 1438 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw, 1439 struct ocfs2_lock_res *lockres) 1440 { 1441 int ret; 1442 1443 ret = wait_for_completion_interruptible(&mw->mw_complete); 1444 if (ret) 1445 lockres_remove_mask_waiter(lockres, mw); 1446 else 1447 ret = mw->mw_status; 1448 /* Re-arm the completion in case we want to wait on it again */ 1449 reinit_completion(&mw->mw_complete); 1450 return ret; 1451 } 1452 1453 static int __ocfs2_cluster_lock(struct ocfs2_super *osb, 1454 struct ocfs2_lock_res *lockres, 1455 int level, 1456 u32 lkm_flags, 1457 int arg_flags, 1458 int l_subclass, 1459 unsigned long caller_ip) 1460 { 1461 struct ocfs2_mask_waiter mw; 1462 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); 1463 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 1464 unsigned long flags; 1465 unsigned int gen; 1466 int noqueue_attempted = 0; 1467 int dlm_locked = 0; 1468 int kick_dc = 0; 1469 1470 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) { 1471 mlog_errno(-EINVAL); 1472 return -EINVAL; 1473 } 1474 1475 ocfs2_init_mask_waiter(&mw); 1476 1477 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 1478 lkm_flags |= DLM_LKF_VALBLK; 1479 1480 again: 1481 wait = 0; 1482 1483 spin_lock_irqsave(&lockres->l_lock, flags); 1484 1485 if (catch_signals && signal_pending(current)) { 1486 ret = -ERESTARTSYS; 1487 goto unlock; 1488 } 1489 1490 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, 1491 "Cluster lock called on freeing lockres %s! flags " 1492 "0x%lx\n", lockres->l_name, lockres->l_flags); 1493 1494 /* We only compare against the currently granted level 1495 * here. If the lock is blocked waiting on a downconvert, 1496 * we'll get caught below. */ 1497 if (lockres->l_flags & OCFS2_LOCK_BUSY && 1498 level > lockres->l_level) { 1499 /* is someone sitting in dlm_lock? If so, wait on 1500 * them. */ 1501 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1502 wait = 1; 1503 goto unlock; 1504 } 1505 1506 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) { 1507 /* 1508 * We've upconverted. If the lock now has a level we can 1509 * work with, we take it. If, however, the lock is not at the 1510 * required level, we go thru the full cycle. One way this could 1511 * happen is if a process requesting an upconvert to PR is 1512 * closely followed by another requesting upconvert to an EX. 1513 * If the process requesting EX lands here, we want it to 1514 * continue attempting to upconvert and let the process 1515 * requesting PR take the lock. 1516 * If multiple processes request upconvert to PR, the first one 1517 * here will take the lock. The others will have to go thru the 1518 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending 1519 * downconvert request. 1520 */ 1521 if (level <= lockres->l_level) 1522 goto update_holders; 1523 } 1524 1525 if (lockres->l_flags & OCFS2_LOCK_BLOCKED && 1526 !ocfs2_may_continue_on_blocked_lock(lockres, level)) { 1527 /* is the lock is currently blocked on behalf of 1528 * another node */ 1529 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0); 1530 wait = 1; 1531 goto unlock; 1532 } 1533 1534 if (level > lockres->l_level) { 1535 if (noqueue_attempted > 0) { 1536 ret = -EAGAIN; 1537 goto unlock; 1538 } 1539 if (lkm_flags & DLM_LKF_NOQUEUE) 1540 noqueue_attempted = 1; 1541 1542 if (lockres->l_action != OCFS2_AST_INVALID) 1543 mlog(ML_ERROR, "lockres %s has action %u pending\n", 1544 lockres->l_name, lockres->l_action); 1545 1546 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1547 lockres->l_action = OCFS2_AST_ATTACH; 1548 lkm_flags &= ~DLM_LKF_CONVERT; 1549 } else { 1550 lockres->l_action = OCFS2_AST_CONVERT; 1551 lkm_flags |= DLM_LKF_CONVERT; 1552 } 1553 1554 lockres->l_requested = level; 1555 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1556 gen = lockres_set_pending(lockres); 1557 spin_unlock_irqrestore(&lockres->l_lock, flags); 1558 1559 BUG_ON(level == DLM_LOCK_IV); 1560 BUG_ON(level == DLM_LOCK_NL); 1561 1562 mlog(ML_BASTS, "lockres %s, convert from %d to %d\n", 1563 lockres->l_name, lockres->l_level, level); 1564 1565 /* call dlm_lock to upgrade lock now */ 1566 ret = ocfs2_dlm_lock(osb->cconn, 1567 level, 1568 &lockres->l_lksb, 1569 lkm_flags, 1570 lockres->l_name, 1571 OCFS2_LOCK_ID_MAX_LEN - 1); 1572 lockres_clear_pending(lockres, gen, osb); 1573 if (ret) { 1574 if (!(lkm_flags & DLM_LKF_NOQUEUE) || 1575 (ret != -EAGAIN)) { 1576 ocfs2_log_dlm_error("ocfs2_dlm_lock", 1577 ret, lockres); 1578 } 1579 ocfs2_recover_from_dlm_error(lockres, 1); 1580 goto out; 1581 } 1582 dlm_locked = 1; 1583 1584 mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n", 1585 lockres->l_name); 1586 1587 /* At this point we've gone inside the dlm and need to 1588 * complete our work regardless. */ 1589 catch_signals = 0; 1590 1591 /* wait for busy to clear and carry on */ 1592 goto again; 1593 } 1594 1595 update_holders: 1596 /* Ok, if we get here then we're good to go. */ 1597 ocfs2_inc_holders(lockres, level); 1598 1599 ret = 0; 1600 unlock: 1601 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 1602 1603 /* ocfs2_unblock_lock reques on seeing OCFS2_LOCK_UPCONVERT_FINISHING */ 1604 kick_dc = (lockres->l_flags & OCFS2_LOCK_BLOCKED); 1605 1606 spin_unlock_irqrestore(&lockres->l_lock, flags); 1607 if (kick_dc) 1608 ocfs2_wake_downconvert_thread(osb); 1609 out: 1610 /* 1611 * This is helping work around a lock inversion between the page lock 1612 * and dlm locks. One path holds the page lock while calling aops 1613 * which block acquiring dlm locks. The voting thread holds dlm 1614 * locks while acquiring page locks while down converting data locks. 1615 * This block is helping an aop path notice the inversion and back 1616 * off to unlock its page lock before trying the dlm lock again. 1617 */ 1618 if (wait && arg_flags & OCFS2_LOCK_NONBLOCK && 1619 mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) { 1620 wait = 0; 1621 spin_lock_irqsave(&lockres->l_lock, flags); 1622 if (__lockres_remove_mask_waiter(lockres, &mw)) { 1623 if (dlm_locked) 1624 lockres_or_flags(lockres, 1625 OCFS2_LOCK_NONBLOCK_FINISHED); 1626 spin_unlock_irqrestore(&lockres->l_lock, flags); 1627 ret = -EAGAIN; 1628 } else { 1629 spin_unlock_irqrestore(&lockres->l_lock, flags); 1630 goto again; 1631 } 1632 } 1633 if (wait) { 1634 ret = ocfs2_wait_for_mask(&mw); 1635 if (ret == 0) 1636 goto again; 1637 mlog_errno(ret); 1638 } 1639 ocfs2_update_lock_stats(lockres, level, &mw, ret); 1640 1641 #ifdef CONFIG_DEBUG_LOCK_ALLOC 1642 if (!ret && lockres->l_lockdep_map.key != NULL) { 1643 if (level == DLM_LOCK_PR) 1644 rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass, 1645 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE), 1646 caller_ip); 1647 else 1648 rwsem_acquire(&lockres->l_lockdep_map, l_subclass, 1649 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE), 1650 caller_ip); 1651 } 1652 #endif 1653 return ret; 1654 } 1655 1656 static inline int ocfs2_cluster_lock(struct ocfs2_super *osb, 1657 struct ocfs2_lock_res *lockres, 1658 int level, 1659 u32 lkm_flags, 1660 int arg_flags) 1661 { 1662 return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags, 1663 0, _RET_IP_); 1664 } 1665 1666 1667 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb, 1668 struct ocfs2_lock_res *lockres, 1669 int level, 1670 unsigned long caller_ip) 1671 { 1672 unsigned long flags; 1673 1674 spin_lock_irqsave(&lockres->l_lock, flags); 1675 ocfs2_dec_holders(lockres, level); 1676 ocfs2_downconvert_on_unlock(osb, lockres); 1677 spin_unlock_irqrestore(&lockres->l_lock, flags); 1678 #ifdef CONFIG_DEBUG_LOCK_ALLOC 1679 if (lockres->l_lockdep_map.key != NULL) 1680 rwsem_release(&lockres->l_lockdep_map, 1, caller_ip); 1681 #endif 1682 } 1683 1684 static int ocfs2_create_new_lock(struct ocfs2_super *osb, 1685 struct ocfs2_lock_res *lockres, 1686 int ex, 1687 int local) 1688 { 1689 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1690 unsigned long flags; 1691 u32 lkm_flags = local ? DLM_LKF_LOCAL : 0; 1692 1693 spin_lock_irqsave(&lockres->l_lock, flags); 1694 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 1695 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL); 1696 spin_unlock_irqrestore(&lockres->l_lock, flags); 1697 1698 return ocfs2_lock_create(osb, lockres, level, lkm_flags); 1699 } 1700 1701 /* Grants us an EX lock on the data and metadata resources, skipping 1702 * the normal cluster directory lookup. Use this ONLY on newly created 1703 * inodes which other nodes can't possibly see, and which haven't been 1704 * hashed in the inode hash yet. This can give us a good performance 1705 * increase as it'll skip the network broadcast normally associated 1706 * with creating a new lock resource. */ 1707 int ocfs2_create_new_inode_locks(struct inode *inode) 1708 { 1709 int ret; 1710 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1711 1712 BUG_ON(!ocfs2_inode_is_new(inode)); 1713 1714 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); 1715 1716 /* NOTE: That we don't increment any of the holder counts, nor 1717 * do we add anything to a journal handle. Since this is 1718 * supposed to be a new inode which the cluster doesn't know 1719 * about yet, there is no need to. As far as the LVB handling 1720 * is concerned, this is basically like acquiring an EX lock 1721 * on a resource which has an invalid one -- we'll set it 1722 * valid when we release the EX. */ 1723 1724 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1); 1725 if (ret) { 1726 mlog_errno(ret); 1727 goto bail; 1728 } 1729 1730 /* 1731 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they 1732 * don't use a generation in their lock names. 1733 */ 1734 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); 1735 if (ret) { 1736 mlog_errno(ret); 1737 goto bail; 1738 } 1739 1740 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0); 1741 if (ret) 1742 mlog_errno(ret); 1743 1744 bail: 1745 return ret; 1746 } 1747 1748 int ocfs2_rw_lock(struct inode *inode, int write) 1749 { 1750 int status, level; 1751 struct ocfs2_lock_res *lockres; 1752 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1753 1754 mlog(0, "inode %llu take %s RW lock\n", 1755 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1756 write ? "EXMODE" : "PRMODE"); 1757 1758 if (ocfs2_mount_local(osb)) 1759 return 0; 1760 1761 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1762 1763 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1764 1765 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 1766 if (status < 0) 1767 mlog_errno(status); 1768 1769 return status; 1770 } 1771 1772 int ocfs2_try_rw_lock(struct inode *inode, int write) 1773 { 1774 int status, level; 1775 struct ocfs2_lock_res *lockres; 1776 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1777 1778 mlog(0, "inode %llu try to take %s RW lock\n", 1779 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1780 write ? "EXMODE" : "PRMODE"); 1781 1782 if (ocfs2_mount_local(osb)) 1783 return 0; 1784 1785 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1786 1787 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1788 1789 status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0); 1790 return status; 1791 } 1792 1793 void ocfs2_rw_unlock(struct inode *inode, int write) 1794 { 1795 int level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1796 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1797 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1798 1799 mlog(0, "inode %llu drop %s RW lock\n", 1800 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1801 write ? "EXMODE" : "PRMODE"); 1802 1803 if (!ocfs2_mount_local(osb)) 1804 ocfs2_cluster_unlock(osb, lockres, level); 1805 } 1806 1807 /* 1808 * ocfs2_open_lock always get PR mode lock. 1809 */ 1810 int ocfs2_open_lock(struct inode *inode) 1811 { 1812 int status = 0; 1813 struct ocfs2_lock_res *lockres; 1814 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1815 1816 mlog(0, "inode %llu take PRMODE open lock\n", 1817 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1818 1819 if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb)) 1820 goto out; 1821 1822 lockres = &OCFS2_I(inode)->ip_open_lockres; 1823 1824 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_PR, 0, 0); 1825 if (status < 0) 1826 mlog_errno(status); 1827 1828 out: 1829 return status; 1830 } 1831 1832 int ocfs2_try_open_lock(struct inode *inode, int write) 1833 { 1834 int status = 0, level; 1835 struct ocfs2_lock_res *lockres; 1836 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1837 1838 mlog(0, "inode %llu try to take %s open lock\n", 1839 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1840 write ? "EXMODE" : "PRMODE"); 1841 1842 if (ocfs2_is_hard_readonly(osb)) { 1843 if (write) 1844 status = -EROFS; 1845 goto out; 1846 } 1847 1848 if (ocfs2_mount_local(osb)) 1849 goto out; 1850 1851 lockres = &OCFS2_I(inode)->ip_open_lockres; 1852 1853 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1854 1855 /* 1856 * The file system may already holding a PRMODE/EXMODE open lock. 1857 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on 1858 * other nodes and the -EAGAIN will indicate to the caller that 1859 * this inode is still in use. 1860 */ 1861 status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0); 1862 1863 out: 1864 return status; 1865 } 1866 1867 /* 1868 * ocfs2_open_unlock unlock PR and EX mode open locks. 1869 */ 1870 void ocfs2_open_unlock(struct inode *inode) 1871 { 1872 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres; 1873 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1874 1875 mlog(0, "inode %llu drop open lock\n", 1876 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1877 1878 if (ocfs2_mount_local(osb)) 1879 goto out; 1880 1881 if(lockres->l_ro_holders) 1882 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_PR); 1883 if(lockres->l_ex_holders) 1884 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 1885 1886 out: 1887 return; 1888 } 1889 1890 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres, 1891 int level) 1892 { 1893 int ret; 1894 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1895 unsigned long flags; 1896 struct ocfs2_mask_waiter mw; 1897 1898 ocfs2_init_mask_waiter(&mw); 1899 1900 retry_cancel: 1901 spin_lock_irqsave(&lockres->l_lock, flags); 1902 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 1903 ret = ocfs2_prepare_cancel_convert(osb, lockres); 1904 if (ret) { 1905 spin_unlock_irqrestore(&lockres->l_lock, flags); 1906 ret = ocfs2_cancel_convert(osb, lockres); 1907 if (ret < 0) { 1908 mlog_errno(ret); 1909 goto out; 1910 } 1911 goto retry_cancel; 1912 } 1913 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1914 spin_unlock_irqrestore(&lockres->l_lock, flags); 1915 1916 ocfs2_wait_for_mask(&mw); 1917 goto retry_cancel; 1918 } 1919 1920 ret = -ERESTARTSYS; 1921 /* 1922 * We may still have gotten the lock, in which case there's no 1923 * point to restarting the syscall. 1924 */ 1925 if (lockres->l_level == level) 1926 ret = 0; 1927 1928 mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret, 1929 lockres->l_flags, lockres->l_level, lockres->l_action); 1930 1931 spin_unlock_irqrestore(&lockres->l_lock, flags); 1932 1933 out: 1934 return ret; 1935 } 1936 1937 /* 1938 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of 1939 * flock() calls. The locking approach this requires is sufficiently 1940 * different from all other cluster lock types that we implement a 1941 * separate path to the "low-level" dlm calls. In particular: 1942 * 1943 * - No optimization of lock levels is done - we take at exactly 1944 * what's been requested. 1945 * 1946 * - No lock caching is employed. We immediately downconvert to 1947 * no-lock at unlock time. This also means flock locks never go on 1948 * the blocking list). 1949 * 1950 * - Since userspace can trivially deadlock itself with flock, we make 1951 * sure to allow cancellation of a misbehaving applications flock() 1952 * request. 1953 * 1954 * - Access to any flock lockres doesn't require concurrency, so we 1955 * can simplify the code by requiring the caller to guarantee 1956 * serialization of dlmglue flock calls. 1957 */ 1958 int ocfs2_file_lock(struct file *file, int ex, int trylock) 1959 { 1960 int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1961 unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0; 1962 unsigned long flags; 1963 struct ocfs2_file_private *fp = file->private_data; 1964 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1965 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1966 struct ocfs2_mask_waiter mw; 1967 1968 ocfs2_init_mask_waiter(&mw); 1969 1970 if ((lockres->l_flags & OCFS2_LOCK_BUSY) || 1971 (lockres->l_level > DLM_LOCK_NL)) { 1972 mlog(ML_ERROR, 1973 "File lock \"%s\" has busy or locked state: flags: 0x%lx, " 1974 "level: %u\n", lockres->l_name, lockres->l_flags, 1975 lockres->l_level); 1976 return -EINVAL; 1977 } 1978 1979 spin_lock_irqsave(&lockres->l_lock, flags); 1980 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1981 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1982 spin_unlock_irqrestore(&lockres->l_lock, flags); 1983 1984 /* 1985 * Get the lock at NLMODE to start - that way we 1986 * can cancel the upconvert request if need be. 1987 */ 1988 ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0); 1989 if (ret < 0) { 1990 mlog_errno(ret); 1991 goto out; 1992 } 1993 1994 ret = ocfs2_wait_for_mask(&mw); 1995 if (ret) { 1996 mlog_errno(ret); 1997 goto out; 1998 } 1999 spin_lock_irqsave(&lockres->l_lock, flags); 2000 } 2001 2002 lockres->l_action = OCFS2_AST_CONVERT; 2003 lkm_flags |= DLM_LKF_CONVERT; 2004 lockres->l_requested = level; 2005 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2006 2007 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 2008 spin_unlock_irqrestore(&lockres->l_lock, flags); 2009 2010 ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags, 2011 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1); 2012 if (ret) { 2013 if (!trylock || (ret != -EAGAIN)) { 2014 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 2015 ret = -EINVAL; 2016 } 2017 2018 ocfs2_recover_from_dlm_error(lockres, 1); 2019 lockres_remove_mask_waiter(lockres, &mw); 2020 goto out; 2021 } 2022 2023 ret = ocfs2_wait_for_mask_interruptible(&mw, lockres); 2024 if (ret == -ERESTARTSYS) { 2025 /* 2026 * Userspace can cause deadlock itself with 2027 * flock(). Current behavior locally is to allow the 2028 * deadlock, but abort the system call if a signal is 2029 * received. We follow this example, otherwise a 2030 * poorly written program could sit in kernel until 2031 * reboot. 2032 * 2033 * Handling this is a bit more complicated for Ocfs2 2034 * though. We can't exit this function with an 2035 * outstanding lock request, so a cancel convert is 2036 * required. We intentionally overwrite 'ret' - if the 2037 * cancel fails and the lock was granted, it's easier 2038 * to just bubble success back up to the user. 2039 */ 2040 ret = ocfs2_flock_handle_signal(lockres, level); 2041 } else if (!ret && (level > lockres->l_level)) { 2042 /* Trylock failed asynchronously */ 2043 BUG_ON(!trylock); 2044 ret = -EAGAIN; 2045 } 2046 2047 out: 2048 2049 mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n", 2050 lockres->l_name, ex, trylock, ret); 2051 return ret; 2052 } 2053 2054 void ocfs2_file_unlock(struct file *file) 2055 { 2056 int ret; 2057 unsigned int gen; 2058 unsigned long flags; 2059 struct ocfs2_file_private *fp = file->private_data; 2060 struct ocfs2_lock_res *lockres = &fp->fp_flock; 2061 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 2062 struct ocfs2_mask_waiter mw; 2063 2064 ocfs2_init_mask_waiter(&mw); 2065 2066 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) 2067 return; 2068 2069 if (lockres->l_level == DLM_LOCK_NL) 2070 return; 2071 2072 mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n", 2073 lockres->l_name, lockres->l_flags, lockres->l_level, 2074 lockres->l_action); 2075 2076 spin_lock_irqsave(&lockres->l_lock, flags); 2077 /* 2078 * Fake a blocking ast for the downconvert code. 2079 */ 2080 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 2081 lockres->l_blocking = DLM_LOCK_EX; 2082 2083 gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL); 2084 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 2085 spin_unlock_irqrestore(&lockres->l_lock, flags); 2086 2087 ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen); 2088 if (ret) { 2089 mlog_errno(ret); 2090 return; 2091 } 2092 2093 ret = ocfs2_wait_for_mask(&mw); 2094 if (ret) 2095 mlog_errno(ret); 2096 } 2097 2098 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 2099 struct ocfs2_lock_res *lockres) 2100 { 2101 int kick = 0; 2102 2103 /* If we know that another node is waiting on our lock, kick 2104 * the downconvert thread * pre-emptively when we reach a release 2105 * condition. */ 2106 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 2107 switch(lockres->l_blocking) { 2108 case DLM_LOCK_EX: 2109 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 2110 kick = 1; 2111 break; 2112 case DLM_LOCK_PR: 2113 if (!lockres->l_ex_holders) 2114 kick = 1; 2115 break; 2116 default: 2117 BUG(); 2118 } 2119 } 2120 2121 if (kick) 2122 ocfs2_wake_downconvert_thread(osb); 2123 } 2124 2125 #define OCFS2_SEC_BITS 34 2126 #define OCFS2_SEC_SHIFT (64 - 34) 2127 #define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1) 2128 2129 /* LVB only has room for 64 bits of time here so we pack it for 2130 * now. */ 2131 static u64 ocfs2_pack_timespec(struct timespec64 *spec) 2132 { 2133 u64 res; 2134 u64 sec = clamp_t(time64_t, spec->tv_sec, 0, 0x3ffffffffull); 2135 u32 nsec = spec->tv_nsec; 2136 2137 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK); 2138 2139 return res; 2140 } 2141 2142 /* Call this with the lockres locked. I am reasonably sure we don't 2143 * need ip_lock in this function as anyone who would be changing those 2144 * values is supposed to be blocked in ocfs2_inode_lock right now. */ 2145 static void __ocfs2_stuff_meta_lvb(struct inode *inode) 2146 { 2147 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2148 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2149 struct ocfs2_meta_lvb *lvb; 2150 2151 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2152 2153 /* 2154 * Invalidate the LVB of a deleted inode - this way other 2155 * nodes are forced to go to disk and discover the new inode 2156 * status. 2157 */ 2158 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2159 lvb->lvb_version = 0; 2160 goto out; 2161 } 2162 2163 lvb->lvb_version = OCFS2_LVB_VERSION; 2164 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 2165 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 2166 lvb->lvb_iuid = cpu_to_be32(i_uid_read(inode)); 2167 lvb->lvb_igid = cpu_to_be32(i_gid_read(inode)); 2168 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 2169 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 2170 lvb->lvb_iatime_packed = 2171 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime)); 2172 lvb->lvb_ictime_packed = 2173 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime)); 2174 lvb->lvb_imtime_packed = 2175 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime)); 2176 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 2177 lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features); 2178 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 2179 2180 out: 2181 mlog_meta_lvb(0, lockres); 2182 } 2183 2184 static void ocfs2_unpack_timespec(struct timespec64 *spec, 2185 u64 packed_time) 2186 { 2187 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 2188 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 2189 } 2190 2191 static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 2192 { 2193 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2194 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2195 struct ocfs2_meta_lvb *lvb; 2196 2197 mlog_meta_lvb(0, lockres); 2198 2199 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2200 2201 /* We're safe here without the lockres lock... */ 2202 spin_lock(&oi->ip_lock); 2203 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 2204 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 2205 2206 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 2207 oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures); 2208 ocfs2_set_inode_flags(inode); 2209 2210 /* fast-symlinks are a special case */ 2211 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 2212 inode->i_blocks = 0; 2213 else 2214 inode->i_blocks = ocfs2_inode_sector_count(inode); 2215 2216 i_uid_write(inode, be32_to_cpu(lvb->lvb_iuid)); 2217 i_gid_write(inode, be32_to_cpu(lvb->lvb_igid)); 2218 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 2219 set_nlink(inode, be16_to_cpu(lvb->lvb_inlink)); 2220 ocfs2_unpack_timespec(&inode->i_atime, 2221 be64_to_cpu(lvb->lvb_iatime_packed)); 2222 ocfs2_unpack_timespec(&inode->i_mtime, 2223 be64_to_cpu(lvb->lvb_imtime_packed)); 2224 ocfs2_unpack_timespec(&inode->i_ctime, 2225 be64_to_cpu(lvb->lvb_ictime_packed)); 2226 spin_unlock(&oi->ip_lock); 2227 } 2228 2229 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 2230 struct ocfs2_lock_res *lockres) 2231 { 2232 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2233 2234 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) 2235 && lvb->lvb_version == OCFS2_LVB_VERSION 2236 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 2237 return 1; 2238 return 0; 2239 } 2240 2241 /* Determine whether a lock resource needs to be refreshed, and 2242 * arbitrate who gets to refresh it. 2243 * 2244 * 0 means no refresh needed. 2245 * 2246 * > 0 means you need to refresh this and you MUST call 2247 * ocfs2_complete_lock_res_refresh afterwards. */ 2248 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres) 2249 { 2250 unsigned long flags; 2251 int status = 0; 2252 2253 refresh_check: 2254 spin_lock_irqsave(&lockres->l_lock, flags); 2255 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 2256 spin_unlock_irqrestore(&lockres->l_lock, flags); 2257 goto bail; 2258 } 2259 2260 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 2261 spin_unlock_irqrestore(&lockres->l_lock, flags); 2262 2263 ocfs2_wait_on_refreshing_lock(lockres); 2264 goto refresh_check; 2265 } 2266 2267 /* Ok, I'll be the one to refresh this lock. */ 2268 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING); 2269 spin_unlock_irqrestore(&lockres->l_lock, flags); 2270 2271 status = 1; 2272 bail: 2273 mlog(0, "status %d\n", status); 2274 return status; 2275 } 2276 2277 /* If status is non zero, I'll mark it as not being in refresh 2278 * anymroe, but i won't clear the needs refresh flag. */ 2279 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres, 2280 int status) 2281 { 2282 unsigned long flags; 2283 2284 spin_lock_irqsave(&lockres->l_lock, flags); 2285 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING); 2286 if (!status) 2287 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 2288 spin_unlock_irqrestore(&lockres->l_lock, flags); 2289 2290 wake_up(&lockres->l_event); 2291 } 2292 2293 /* may or may not return a bh if it went to disk. */ 2294 static int ocfs2_inode_lock_update(struct inode *inode, 2295 struct buffer_head **bh) 2296 { 2297 int status = 0; 2298 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2299 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2300 struct ocfs2_dinode *fe; 2301 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2302 2303 if (ocfs2_mount_local(osb)) 2304 goto bail; 2305 2306 spin_lock(&oi->ip_lock); 2307 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2308 mlog(0, "Orphaned inode %llu was deleted while we " 2309 "were waiting on a lock. ip_flags = 0x%x\n", 2310 (unsigned long long)oi->ip_blkno, oi->ip_flags); 2311 spin_unlock(&oi->ip_lock); 2312 status = -ENOENT; 2313 goto bail; 2314 } 2315 spin_unlock(&oi->ip_lock); 2316 2317 if (!ocfs2_should_refresh_lock_res(lockres)) 2318 goto bail; 2319 2320 /* This will discard any caching information we might have had 2321 * for the inode metadata. */ 2322 ocfs2_metadata_cache_purge(INODE_CACHE(inode)); 2323 2324 ocfs2_extent_map_trunc(inode, 0); 2325 2326 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) { 2327 mlog(0, "Trusting LVB on inode %llu\n", 2328 (unsigned long long)oi->ip_blkno); 2329 ocfs2_refresh_inode_from_lvb(inode); 2330 } else { 2331 /* Boo, we have to go to disk. */ 2332 /* read bh, cast, ocfs2_refresh_inode */ 2333 status = ocfs2_read_inode_block(inode, bh); 2334 if (status < 0) { 2335 mlog_errno(status); 2336 goto bail_refresh; 2337 } 2338 fe = (struct ocfs2_dinode *) (*bh)->b_data; 2339 2340 /* This is a good chance to make sure we're not 2341 * locking an invalid object. ocfs2_read_inode_block() 2342 * already checked that the inode block is sane. 2343 * 2344 * We bug on a stale inode here because we checked 2345 * above whether it was wiped from disk. The wiping 2346 * node provides a guarantee that we receive that 2347 * message and can mark the inode before dropping any 2348 * locks associated with it. */ 2349 mlog_bug_on_msg(inode->i_generation != 2350 le32_to_cpu(fe->i_generation), 2351 "Invalid dinode %llu disk generation: %u " 2352 "inode->i_generation: %u\n", 2353 (unsigned long long)oi->ip_blkno, 2354 le32_to_cpu(fe->i_generation), 2355 inode->i_generation); 2356 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 2357 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 2358 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 2359 (unsigned long long)oi->ip_blkno, 2360 (unsigned long long)le64_to_cpu(fe->i_dtime), 2361 le32_to_cpu(fe->i_flags)); 2362 2363 ocfs2_refresh_inode(inode, fe); 2364 ocfs2_track_lock_refresh(lockres); 2365 } 2366 2367 status = 0; 2368 bail_refresh: 2369 ocfs2_complete_lock_res_refresh(lockres, status); 2370 bail: 2371 return status; 2372 } 2373 2374 static int ocfs2_assign_bh(struct inode *inode, 2375 struct buffer_head **ret_bh, 2376 struct buffer_head *passed_bh) 2377 { 2378 int status; 2379 2380 if (passed_bh) { 2381 /* Ok, the update went to disk for us, use the 2382 * returned bh. */ 2383 *ret_bh = passed_bh; 2384 get_bh(*ret_bh); 2385 2386 return 0; 2387 } 2388 2389 status = ocfs2_read_inode_block(inode, ret_bh); 2390 if (status < 0) 2391 mlog_errno(status); 2392 2393 return status; 2394 } 2395 2396 /* 2397 * returns < 0 error if the callback will never be called, otherwise 2398 * the result of the lock will be communicated via the callback. 2399 */ 2400 int ocfs2_inode_lock_full_nested(struct inode *inode, 2401 struct buffer_head **ret_bh, 2402 int ex, 2403 int arg_flags, 2404 int subclass) 2405 { 2406 int status, level, acquired; 2407 u32 dlm_flags; 2408 struct ocfs2_lock_res *lockres = NULL; 2409 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2410 struct buffer_head *local_bh = NULL; 2411 2412 mlog(0, "inode %llu, take %s META lock\n", 2413 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2414 ex ? "EXMODE" : "PRMODE"); 2415 2416 status = 0; 2417 acquired = 0; 2418 /* We'll allow faking a readonly metadata lock for 2419 * rodevices. */ 2420 if (ocfs2_is_hard_readonly(osb)) { 2421 if (ex) 2422 status = -EROFS; 2423 goto getbh; 2424 } 2425 2426 if ((arg_flags & OCFS2_META_LOCK_GETBH) || 2427 ocfs2_mount_local(osb)) 2428 goto update; 2429 2430 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2431 ocfs2_wait_for_recovery(osb); 2432 2433 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2434 level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2435 dlm_flags = 0; 2436 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 2437 dlm_flags |= DLM_LKF_NOQUEUE; 2438 2439 status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags, 2440 arg_flags, subclass, _RET_IP_); 2441 if (status < 0) { 2442 if (status != -EAGAIN) 2443 mlog_errno(status); 2444 goto bail; 2445 } 2446 2447 /* Notify the error cleanup path to drop the cluster lock. */ 2448 acquired = 1; 2449 2450 /* We wait twice because a node may have died while we were in 2451 * the lower dlm layers. The second time though, we've 2452 * committed to owning this lock so we don't allow signals to 2453 * abort the operation. */ 2454 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2455 ocfs2_wait_for_recovery(osb); 2456 2457 update: 2458 /* 2459 * We only see this flag if we're being called from 2460 * ocfs2_read_locked_inode(). It means we're locking an inode 2461 * which hasn't been populated yet, so clear the refresh flag 2462 * and let the caller handle it. 2463 */ 2464 if (inode->i_state & I_NEW) { 2465 status = 0; 2466 if (lockres) 2467 ocfs2_complete_lock_res_refresh(lockres, 0); 2468 goto bail; 2469 } 2470 2471 /* This is fun. The caller may want a bh back, or it may 2472 * not. ocfs2_inode_lock_update definitely wants one in, but 2473 * may or may not read one, depending on what's in the 2474 * LVB. The result of all of this is that we've *only* gone to 2475 * disk if we have to, so the complexity is worthwhile. */ 2476 status = ocfs2_inode_lock_update(inode, &local_bh); 2477 if (status < 0) { 2478 if (status != -ENOENT) 2479 mlog_errno(status); 2480 goto bail; 2481 } 2482 getbh: 2483 if (ret_bh) { 2484 status = ocfs2_assign_bh(inode, ret_bh, local_bh); 2485 if (status < 0) { 2486 mlog_errno(status); 2487 goto bail; 2488 } 2489 } 2490 2491 bail: 2492 if (status < 0) { 2493 if (ret_bh && (*ret_bh)) { 2494 brelse(*ret_bh); 2495 *ret_bh = NULL; 2496 } 2497 if (acquired) 2498 ocfs2_inode_unlock(inode, ex); 2499 } 2500 2501 if (local_bh) 2502 brelse(local_bh); 2503 2504 return status; 2505 } 2506 2507 /* 2508 * This is working around a lock inversion between tasks acquiring DLM 2509 * locks while holding a page lock and the downconvert thread which 2510 * blocks dlm lock acquiry while acquiring page locks. 2511 * 2512 * ** These _with_page variantes are only intended to be called from aop 2513 * methods that hold page locks and return a very specific *positive* error 2514 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 2515 * 2516 * The DLM is called such that it returns -EAGAIN if it would have 2517 * blocked waiting for the downconvert thread. In that case we unlock 2518 * our page so the downconvert thread can make progress. Once we've 2519 * done this we have to return AOP_TRUNCATED_PAGE so the aop method 2520 * that called us can bubble that back up into the VFS who will then 2521 * immediately retry the aop call. 2522 */ 2523 int ocfs2_inode_lock_with_page(struct inode *inode, 2524 struct buffer_head **ret_bh, 2525 int ex, 2526 struct page *page) 2527 { 2528 int ret; 2529 2530 ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK); 2531 if (ret == -EAGAIN) { 2532 unlock_page(page); 2533 /* 2534 * If we can't get inode lock immediately, we should not return 2535 * directly here, since this will lead to a softlockup problem. 2536 * The method is to get a blocking lock and immediately unlock 2537 * before returning, this can avoid CPU resource waste due to 2538 * lots of retries, and benefits fairness in getting lock. 2539 */ 2540 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0) 2541 ocfs2_inode_unlock(inode, ex); 2542 ret = AOP_TRUNCATED_PAGE; 2543 } 2544 2545 return ret; 2546 } 2547 2548 int ocfs2_inode_lock_atime(struct inode *inode, 2549 struct vfsmount *vfsmnt, 2550 int *level, int wait) 2551 { 2552 int ret; 2553 2554 if (wait) 2555 ret = ocfs2_inode_lock(inode, NULL, 0); 2556 else 2557 ret = ocfs2_try_inode_lock(inode, NULL, 0); 2558 2559 if (ret < 0) { 2560 if (ret != -EAGAIN) 2561 mlog_errno(ret); 2562 return ret; 2563 } 2564 2565 /* 2566 * If we should update atime, we will get EX lock, 2567 * otherwise we just get PR lock. 2568 */ 2569 if (ocfs2_should_update_atime(inode, vfsmnt)) { 2570 struct buffer_head *bh = NULL; 2571 2572 ocfs2_inode_unlock(inode, 0); 2573 if (wait) 2574 ret = ocfs2_inode_lock(inode, &bh, 1); 2575 else 2576 ret = ocfs2_try_inode_lock(inode, &bh, 1); 2577 2578 if (ret < 0) { 2579 if (ret != -EAGAIN) 2580 mlog_errno(ret); 2581 return ret; 2582 } 2583 *level = 1; 2584 if (ocfs2_should_update_atime(inode, vfsmnt)) 2585 ocfs2_update_inode_atime(inode, bh); 2586 if (bh) 2587 brelse(bh); 2588 } else 2589 *level = 0; 2590 2591 return ret; 2592 } 2593 2594 void ocfs2_inode_unlock(struct inode *inode, 2595 int ex) 2596 { 2597 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2598 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2599 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2600 2601 mlog(0, "inode %llu drop %s META lock\n", 2602 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2603 ex ? "EXMODE" : "PRMODE"); 2604 2605 if (!ocfs2_is_hard_readonly(osb) && 2606 !ocfs2_mount_local(osb)) 2607 ocfs2_cluster_unlock(osb, lockres, level); 2608 } 2609 2610 /* 2611 * This _tracker variantes are introduced to deal with the recursive cluster 2612 * locking issue. The idea is to keep track of a lock holder on the stack of 2613 * the current process. If there's a lock holder on the stack, we know the 2614 * task context is already protected by cluster locking. Currently, they're 2615 * used in some VFS entry routines. 2616 * 2617 * return < 0 on error, return == 0 if there's no lock holder on the stack 2618 * before this call, return == 1 if this call would be a recursive locking. 2619 * return == -1 if this lock attempt will cause an upgrade which is forbidden. 2620 * 2621 * When taking lock levels into account,we face some different situations. 2622 * 2623 * 1. no lock is held 2624 * In this case, just lock the inode as requested and return 0 2625 * 2626 * 2. We are holding a lock 2627 * For this situation, things diverges into several cases 2628 * 2629 * wanted holding what to do 2630 * ex ex see 2.1 below 2631 * ex pr see 2.2 below 2632 * pr ex see 2.1 below 2633 * pr pr see 2.1 below 2634 * 2635 * 2.1 lock level that is been held is compatible 2636 * with the wanted level, so no lock action will be tacken. 2637 * 2638 * 2.2 Otherwise, an upgrade is needed, but it is forbidden. 2639 * 2640 * Reason why upgrade within a process is forbidden is that 2641 * lock upgrade may cause dead lock. The following illustrates 2642 * how it happens. 2643 * 2644 * thread on node1 thread on node2 2645 * ocfs2_inode_lock_tracker(ex=0) 2646 * 2647 * <====== ocfs2_inode_lock_tracker(ex=1) 2648 * 2649 * ocfs2_inode_lock_tracker(ex=1) 2650 */ 2651 int ocfs2_inode_lock_tracker(struct inode *inode, 2652 struct buffer_head **ret_bh, 2653 int ex, 2654 struct ocfs2_lock_holder *oh) 2655 { 2656 int status = 0; 2657 struct ocfs2_lock_res *lockres; 2658 struct ocfs2_lock_holder *tmp_oh; 2659 struct pid *pid = task_pid(current); 2660 2661 2662 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2663 tmp_oh = ocfs2_pid_holder(lockres, pid); 2664 2665 if (!tmp_oh) { 2666 /* 2667 * This corresponds to the case 1. 2668 * We haven't got any lock before. 2669 */ 2670 status = ocfs2_inode_lock_full(inode, ret_bh, ex, 0); 2671 if (status < 0) { 2672 if (status != -ENOENT) 2673 mlog_errno(status); 2674 return status; 2675 } 2676 2677 oh->oh_ex = ex; 2678 ocfs2_add_holder(lockres, oh); 2679 return 0; 2680 } 2681 2682 if (unlikely(ex && !tmp_oh->oh_ex)) { 2683 /* 2684 * case 2.2 upgrade may cause dead lock, forbid it. 2685 */ 2686 mlog(ML_ERROR, "Recursive locking is not permitted to " 2687 "upgrade to EX level from PR level.\n"); 2688 dump_stack(); 2689 return -EINVAL; 2690 } 2691 2692 /* 2693 * case 2.1 OCFS2_META_LOCK_GETBH flag make ocfs2_inode_lock_full. 2694 * ignore the lock level and just update it. 2695 */ 2696 if (ret_bh) { 2697 status = ocfs2_inode_lock_full(inode, ret_bh, ex, 2698 OCFS2_META_LOCK_GETBH); 2699 if (status < 0) { 2700 if (status != -ENOENT) 2701 mlog_errno(status); 2702 return status; 2703 } 2704 } 2705 return tmp_oh ? 1 : 0; 2706 } 2707 2708 void ocfs2_inode_unlock_tracker(struct inode *inode, 2709 int ex, 2710 struct ocfs2_lock_holder *oh, 2711 int had_lock) 2712 { 2713 struct ocfs2_lock_res *lockres; 2714 2715 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2716 /* had_lock means that the currect process already takes the cluster 2717 * lock previously. 2718 * If had_lock is 1, we have nothing to do here. 2719 * If had_lock is 0, we will release the lock. 2720 */ 2721 if (!had_lock) { 2722 ocfs2_inode_unlock(inode, oh->oh_ex); 2723 ocfs2_remove_holder(lockres, oh); 2724 } 2725 } 2726 2727 int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno) 2728 { 2729 struct ocfs2_lock_res *lockres; 2730 struct ocfs2_orphan_scan_lvb *lvb; 2731 int status = 0; 2732 2733 if (ocfs2_is_hard_readonly(osb)) 2734 return -EROFS; 2735 2736 if (ocfs2_mount_local(osb)) 2737 return 0; 2738 2739 lockres = &osb->osb_orphan_scan.os_lockres; 2740 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2741 if (status < 0) 2742 return status; 2743 2744 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2745 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2746 lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION) 2747 *seqno = be32_to_cpu(lvb->lvb_os_seqno); 2748 else 2749 *seqno = osb->osb_orphan_scan.os_seqno + 1; 2750 2751 return status; 2752 } 2753 2754 void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno) 2755 { 2756 struct ocfs2_lock_res *lockres; 2757 struct ocfs2_orphan_scan_lvb *lvb; 2758 2759 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) { 2760 lockres = &osb->osb_orphan_scan.os_lockres; 2761 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2762 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION; 2763 lvb->lvb_os_seqno = cpu_to_be32(seqno); 2764 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2765 } 2766 } 2767 2768 int ocfs2_super_lock(struct ocfs2_super *osb, 2769 int ex) 2770 { 2771 int status = 0; 2772 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2773 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2774 2775 if (ocfs2_is_hard_readonly(osb)) 2776 return -EROFS; 2777 2778 if (ocfs2_mount_local(osb)) 2779 goto bail; 2780 2781 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 2782 if (status < 0) { 2783 mlog_errno(status); 2784 goto bail; 2785 } 2786 2787 /* The super block lock path is really in the best position to 2788 * know when resources covered by the lock need to be 2789 * refreshed, so we do it here. Of course, making sense of 2790 * everything is up to the caller :) */ 2791 status = ocfs2_should_refresh_lock_res(lockres); 2792 if (status) { 2793 status = ocfs2_refresh_slot_info(osb); 2794 2795 ocfs2_complete_lock_res_refresh(lockres, status); 2796 2797 if (status < 0) { 2798 ocfs2_cluster_unlock(osb, lockres, level); 2799 mlog_errno(status); 2800 } 2801 ocfs2_track_lock_refresh(lockres); 2802 } 2803 bail: 2804 return status; 2805 } 2806 2807 void ocfs2_super_unlock(struct ocfs2_super *osb, 2808 int ex) 2809 { 2810 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2811 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2812 2813 if (!ocfs2_mount_local(osb)) 2814 ocfs2_cluster_unlock(osb, lockres, level); 2815 } 2816 2817 int ocfs2_rename_lock(struct ocfs2_super *osb) 2818 { 2819 int status; 2820 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2821 2822 if (ocfs2_is_hard_readonly(osb)) 2823 return -EROFS; 2824 2825 if (ocfs2_mount_local(osb)) 2826 return 0; 2827 2828 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2829 if (status < 0) 2830 mlog_errno(status); 2831 2832 return status; 2833 } 2834 2835 void ocfs2_rename_unlock(struct ocfs2_super *osb) 2836 { 2837 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2838 2839 if (!ocfs2_mount_local(osb)) 2840 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2841 } 2842 2843 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex) 2844 { 2845 int status; 2846 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2847 2848 if (ocfs2_is_hard_readonly(osb)) 2849 return -EROFS; 2850 2851 if (ocfs2_mount_local(osb)) 2852 return 0; 2853 2854 status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE, 2855 0, 0); 2856 if (status < 0) 2857 mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status); 2858 2859 return status; 2860 } 2861 2862 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex) 2863 { 2864 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2865 2866 if (!ocfs2_mount_local(osb)) 2867 ocfs2_cluster_unlock(osb, lockres, 2868 ex ? LKM_EXMODE : LKM_PRMODE); 2869 } 2870 2871 int ocfs2_trim_fs_lock(struct ocfs2_super *osb, 2872 struct ocfs2_trim_fs_info *info, int trylock) 2873 { 2874 int status; 2875 struct ocfs2_trim_fs_lvb *lvb; 2876 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 2877 2878 if (info) 2879 info->tf_valid = 0; 2880 2881 if (ocfs2_is_hard_readonly(osb)) 2882 return -EROFS; 2883 2884 if (ocfs2_mount_local(osb)) 2885 return 0; 2886 2887 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 2888 trylock ? DLM_LKF_NOQUEUE : 0, 0); 2889 if (status < 0) { 2890 if (status != -EAGAIN) 2891 mlog_errno(status); 2892 return status; 2893 } 2894 2895 if (info) { 2896 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2897 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2898 lvb->lvb_version == OCFS2_TRIMFS_LVB_VERSION) { 2899 info->tf_valid = 1; 2900 info->tf_success = lvb->lvb_success; 2901 info->tf_nodenum = be32_to_cpu(lvb->lvb_nodenum); 2902 info->tf_start = be64_to_cpu(lvb->lvb_start); 2903 info->tf_len = be64_to_cpu(lvb->lvb_len); 2904 info->tf_minlen = be64_to_cpu(lvb->lvb_minlen); 2905 info->tf_trimlen = be64_to_cpu(lvb->lvb_trimlen); 2906 } 2907 } 2908 2909 return status; 2910 } 2911 2912 void ocfs2_trim_fs_unlock(struct ocfs2_super *osb, 2913 struct ocfs2_trim_fs_info *info) 2914 { 2915 struct ocfs2_trim_fs_lvb *lvb; 2916 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 2917 2918 if (ocfs2_mount_local(osb)) 2919 return; 2920 2921 if (info) { 2922 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2923 lvb->lvb_version = OCFS2_TRIMFS_LVB_VERSION; 2924 lvb->lvb_success = info->tf_success; 2925 lvb->lvb_nodenum = cpu_to_be32(info->tf_nodenum); 2926 lvb->lvb_start = cpu_to_be64(info->tf_start); 2927 lvb->lvb_len = cpu_to_be64(info->tf_len); 2928 lvb->lvb_minlen = cpu_to_be64(info->tf_minlen); 2929 lvb->lvb_trimlen = cpu_to_be64(info->tf_trimlen); 2930 } 2931 2932 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2933 } 2934 2935 int ocfs2_dentry_lock(struct dentry *dentry, int ex) 2936 { 2937 int ret; 2938 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2939 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2940 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2941 2942 BUG_ON(!dl); 2943 2944 if (ocfs2_is_hard_readonly(osb)) { 2945 if (ex) 2946 return -EROFS; 2947 return 0; 2948 } 2949 2950 if (ocfs2_mount_local(osb)) 2951 return 0; 2952 2953 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 2954 if (ret < 0) 2955 mlog_errno(ret); 2956 2957 return ret; 2958 } 2959 2960 void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 2961 { 2962 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2963 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2964 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2965 2966 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 2967 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 2968 } 2969 2970 /* Reference counting of the dlm debug structure. We want this because 2971 * open references on the debug inodes can live on after a mount, so 2972 * we can't rely on the ocfs2_super to always exist. */ 2973 static void ocfs2_dlm_debug_free(struct kref *kref) 2974 { 2975 struct ocfs2_dlm_debug *dlm_debug; 2976 2977 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2978 2979 kfree(dlm_debug); 2980 } 2981 2982 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2983 { 2984 if (dlm_debug) 2985 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2986 } 2987 2988 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2989 { 2990 kref_get(&debug->d_refcnt); 2991 } 2992 2993 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2994 { 2995 struct ocfs2_dlm_debug *dlm_debug; 2996 2997 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 2998 if (!dlm_debug) { 2999 mlog_errno(-ENOMEM); 3000 goto out; 3001 } 3002 3003 kref_init(&dlm_debug->d_refcnt); 3004 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 3005 dlm_debug->d_locking_state = NULL; 3006 out: 3007 return dlm_debug; 3008 } 3009 3010 /* Access to this is arbitrated for us via seq_file->sem. */ 3011 struct ocfs2_dlm_seq_priv { 3012 struct ocfs2_dlm_debug *p_dlm_debug; 3013 struct ocfs2_lock_res p_iter_res; 3014 struct ocfs2_lock_res p_tmp_res; 3015 }; 3016 3017 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 3018 struct ocfs2_dlm_seq_priv *priv) 3019 { 3020 struct ocfs2_lock_res *iter, *ret = NULL; 3021 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 3022 3023 assert_spin_locked(&ocfs2_dlm_tracking_lock); 3024 3025 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 3026 /* discover the head of the list */ 3027 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 3028 mlog(0, "End of list found, %p\n", ret); 3029 break; 3030 } 3031 3032 /* We track our "dummy" iteration lockres' by a NULL 3033 * l_ops field. */ 3034 if (iter->l_ops != NULL) { 3035 ret = iter; 3036 break; 3037 } 3038 } 3039 3040 return ret; 3041 } 3042 3043 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 3044 { 3045 struct ocfs2_dlm_seq_priv *priv = m->private; 3046 struct ocfs2_lock_res *iter; 3047 3048 spin_lock(&ocfs2_dlm_tracking_lock); 3049 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 3050 if (iter) { 3051 /* Since lockres' have the lifetime of their container 3052 * (which can be inodes, ocfs2_supers, etc) we want to 3053 * copy this out to a temporary lockres while still 3054 * under the spinlock. Obviously after this we can't 3055 * trust any pointers on the copy returned, but that's 3056 * ok as the information we want isn't typically held 3057 * in them. */ 3058 priv->p_tmp_res = *iter; 3059 iter = &priv->p_tmp_res; 3060 } 3061 spin_unlock(&ocfs2_dlm_tracking_lock); 3062 3063 return iter; 3064 } 3065 3066 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 3067 { 3068 } 3069 3070 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 3071 { 3072 struct ocfs2_dlm_seq_priv *priv = m->private; 3073 struct ocfs2_lock_res *iter = v; 3074 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 3075 3076 spin_lock(&ocfs2_dlm_tracking_lock); 3077 iter = ocfs2_dlm_next_res(iter, priv); 3078 list_del_init(&dummy->l_debug_list); 3079 if (iter) { 3080 list_add(&dummy->l_debug_list, &iter->l_debug_list); 3081 priv->p_tmp_res = *iter; 3082 iter = &priv->p_tmp_res; 3083 } 3084 spin_unlock(&ocfs2_dlm_tracking_lock); 3085 3086 return iter; 3087 } 3088 3089 /* 3090 * Version is used by debugfs.ocfs2 to determine the format being used 3091 * 3092 * New in version 2 3093 * - Lock stats printed 3094 * New in version 3 3095 * - Max time in lock stats is in usecs (instead of nsecs) 3096 */ 3097 #define OCFS2_DLM_DEBUG_STR_VERSION 3 3098 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 3099 { 3100 int i; 3101 char *lvb; 3102 struct ocfs2_lock_res *lockres = v; 3103 3104 if (!lockres) 3105 return -EINVAL; 3106 3107 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 3108 3109 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 3110 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 3111 lockres->l_name, 3112 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 3113 else 3114 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 3115 3116 seq_printf(m, "%d\t" 3117 "0x%lx\t" 3118 "0x%x\t" 3119 "0x%x\t" 3120 "%u\t" 3121 "%u\t" 3122 "%d\t" 3123 "%d\t", 3124 lockres->l_level, 3125 lockres->l_flags, 3126 lockres->l_action, 3127 lockres->l_unlock_action, 3128 lockres->l_ro_holders, 3129 lockres->l_ex_holders, 3130 lockres->l_requested, 3131 lockres->l_blocking); 3132 3133 /* Dump the raw LVB */ 3134 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 3135 for(i = 0; i < DLM_LVB_LEN; i++) 3136 seq_printf(m, "0x%x\t", lvb[i]); 3137 3138 #ifdef CONFIG_OCFS2_FS_STATS 3139 # define lock_num_prmode(_l) ((_l)->l_lock_prmode.ls_gets) 3140 # define lock_num_exmode(_l) ((_l)->l_lock_exmode.ls_gets) 3141 # define lock_num_prmode_failed(_l) ((_l)->l_lock_prmode.ls_fail) 3142 # define lock_num_exmode_failed(_l) ((_l)->l_lock_exmode.ls_fail) 3143 # define lock_total_prmode(_l) ((_l)->l_lock_prmode.ls_total) 3144 # define lock_total_exmode(_l) ((_l)->l_lock_exmode.ls_total) 3145 # define lock_max_prmode(_l) ((_l)->l_lock_prmode.ls_max) 3146 # define lock_max_exmode(_l) ((_l)->l_lock_exmode.ls_max) 3147 # define lock_refresh(_l) ((_l)->l_lock_refresh) 3148 #else 3149 # define lock_num_prmode(_l) (0) 3150 # define lock_num_exmode(_l) (0) 3151 # define lock_num_prmode_failed(_l) (0) 3152 # define lock_num_exmode_failed(_l) (0) 3153 # define lock_total_prmode(_l) (0ULL) 3154 # define lock_total_exmode(_l) (0ULL) 3155 # define lock_max_prmode(_l) (0) 3156 # define lock_max_exmode(_l) (0) 3157 # define lock_refresh(_l) (0) 3158 #endif 3159 /* The following seq_print was added in version 2 of this output */ 3160 seq_printf(m, "%u\t" 3161 "%u\t" 3162 "%u\t" 3163 "%u\t" 3164 "%llu\t" 3165 "%llu\t" 3166 "%u\t" 3167 "%u\t" 3168 "%u\t", 3169 lock_num_prmode(lockres), 3170 lock_num_exmode(lockres), 3171 lock_num_prmode_failed(lockres), 3172 lock_num_exmode_failed(lockres), 3173 lock_total_prmode(lockres), 3174 lock_total_exmode(lockres), 3175 lock_max_prmode(lockres), 3176 lock_max_exmode(lockres), 3177 lock_refresh(lockres)); 3178 3179 /* End the line */ 3180 seq_printf(m, "\n"); 3181 return 0; 3182 } 3183 3184 static const struct seq_operations ocfs2_dlm_seq_ops = { 3185 .start = ocfs2_dlm_seq_start, 3186 .stop = ocfs2_dlm_seq_stop, 3187 .next = ocfs2_dlm_seq_next, 3188 .show = ocfs2_dlm_seq_show, 3189 }; 3190 3191 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 3192 { 3193 struct seq_file *seq = file->private_data; 3194 struct ocfs2_dlm_seq_priv *priv = seq->private; 3195 struct ocfs2_lock_res *res = &priv->p_iter_res; 3196 3197 ocfs2_remove_lockres_tracking(res); 3198 ocfs2_put_dlm_debug(priv->p_dlm_debug); 3199 return seq_release_private(inode, file); 3200 } 3201 3202 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 3203 { 3204 struct ocfs2_dlm_seq_priv *priv; 3205 struct ocfs2_super *osb; 3206 3207 priv = __seq_open_private(file, &ocfs2_dlm_seq_ops, sizeof(*priv)); 3208 if (!priv) { 3209 mlog_errno(-ENOMEM); 3210 return -ENOMEM; 3211 } 3212 3213 osb = inode->i_private; 3214 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 3215 priv->p_dlm_debug = osb->osb_dlm_debug; 3216 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 3217 3218 ocfs2_add_lockres_tracking(&priv->p_iter_res, 3219 priv->p_dlm_debug); 3220 3221 return 0; 3222 } 3223 3224 static const struct file_operations ocfs2_dlm_debug_fops = { 3225 .open = ocfs2_dlm_debug_open, 3226 .release = ocfs2_dlm_debug_release, 3227 .read = seq_read, 3228 .llseek = seq_lseek, 3229 }; 3230 3231 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 3232 { 3233 int ret = 0; 3234 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 3235 3236 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 3237 S_IFREG|S_IRUSR, 3238 osb->osb_debug_root, 3239 osb, 3240 &ocfs2_dlm_debug_fops); 3241 if (!dlm_debug->d_locking_state) { 3242 ret = -EINVAL; 3243 mlog(ML_ERROR, 3244 "Unable to create locking state debugfs file.\n"); 3245 goto out; 3246 } 3247 3248 ocfs2_get_dlm_debug(dlm_debug); 3249 out: 3250 return ret; 3251 } 3252 3253 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 3254 { 3255 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 3256 3257 if (dlm_debug) { 3258 debugfs_remove(dlm_debug->d_locking_state); 3259 ocfs2_put_dlm_debug(dlm_debug); 3260 } 3261 } 3262 3263 int ocfs2_dlm_init(struct ocfs2_super *osb) 3264 { 3265 int status = 0; 3266 struct ocfs2_cluster_connection *conn = NULL; 3267 3268 if (ocfs2_mount_local(osb)) { 3269 osb->node_num = 0; 3270 goto local; 3271 } 3272 3273 status = ocfs2_dlm_init_debug(osb); 3274 if (status < 0) { 3275 mlog_errno(status); 3276 goto bail; 3277 } 3278 3279 /* launch downconvert thread */ 3280 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc-%s", 3281 osb->uuid_str); 3282 if (IS_ERR(osb->dc_task)) { 3283 status = PTR_ERR(osb->dc_task); 3284 osb->dc_task = NULL; 3285 mlog_errno(status); 3286 goto bail; 3287 } 3288 3289 /* for now, uuid == domain */ 3290 status = ocfs2_cluster_connect(osb->osb_cluster_stack, 3291 osb->osb_cluster_name, 3292 strlen(osb->osb_cluster_name), 3293 osb->uuid_str, 3294 strlen(osb->uuid_str), 3295 &lproto, ocfs2_do_node_down, osb, 3296 &conn); 3297 if (status) { 3298 mlog_errno(status); 3299 goto bail; 3300 } 3301 3302 status = ocfs2_cluster_this_node(conn, &osb->node_num); 3303 if (status < 0) { 3304 mlog_errno(status); 3305 mlog(ML_ERROR, 3306 "could not find this host's node number\n"); 3307 ocfs2_cluster_disconnect(conn, 0); 3308 goto bail; 3309 } 3310 3311 local: 3312 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 3313 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 3314 ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb); 3315 ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb); 3316 3317 osb->cconn = conn; 3318 bail: 3319 if (status < 0) { 3320 ocfs2_dlm_shutdown_debug(osb); 3321 if (osb->dc_task) 3322 kthread_stop(osb->dc_task); 3323 } 3324 3325 return status; 3326 } 3327 3328 void ocfs2_dlm_shutdown(struct ocfs2_super *osb, 3329 int hangup_pending) 3330 { 3331 ocfs2_drop_osb_locks(osb); 3332 3333 /* 3334 * Now that we have dropped all locks and ocfs2_dismount_volume() 3335 * has disabled recovery, the DLM won't be talking to us. It's 3336 * safe to tear things down before disconnecting the cluster. 3337 */ 3338 3339 if (osb->dc_task) { 3340 kthread_stop(osb->dc_task); 3341 osb->dc_task = NULL; 3342 } 3343 3344 ocfs2_lock_res_free(&osb->osb_super_lockres); 3345 ocfs2_lock_res_free(&osb->osb_rename_lockres); 3346 ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres); 3347 ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres); 3348 3349 ocfs2_cluster_disconnect(osb->cconn, hangup_pending); 3350 osb->cconn = NULL; 3351 3352 ocfs2_dlm_shutdown_debug(osb); 3353 } 3354 3355 static int ocfs2_drop_lock(struct ocfs2_super *osb, 3356 struct ocfs2_lock_res *lockres) 3357 { 3358 int ret; 3359 unsigned long flags; 3360 u32 lkm_flags = 0; 3361 3362 /* We didn't get anywhere near actually using this lockres. */ 3363 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 3364 goto out; 3365 3366 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 3367 lkm_flags |= DLM_LKF_VALBLK; 3368 3369 spin_lock_irqsave(&lockres->l_lock, flags); 3370 3371 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 3372 "lockres %s, flags 0x%lx\n", 3373 lockres->l_name, lockres->l_flags); 3374 3375 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 3376 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 3377 "%u, unlock_action = %u\n", 3378 lockres->l_name, lockres->l_flags, lockres->l_action, 3379 lockres->l_unlock_action); 3380 3381 spin_unlock_irqrestore(&lockres->l_lock, flags); 3382 3383 /* XXX: Today we just wait on any busy 3384 * locks... Perhaps we need to cancel converts in the 3385 * future? */ 3386 ocfs2_wait_on_busy_lock(lockres); 3387 3388 spin_lock_irqsave(&lockres->l_lock, flags); 3389 } 3390 3391 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3392 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 3393 lockres->l_level == DLM_LOCK_EX && 3394 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3395 lockres->l_ops->set_lvb(lockres); 3396 } 3397 3398 if (lockres->l_flags & OCFS2_LOCK_BUSY) 3399 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 3400 lockres->l_name); 3401 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 3402 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 3403 3404 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 3405 spin_unlock_irqrestore(&lockres->l_lock, flags); 3406 goto out; 3407 } 3408 3409 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 3410 3411 /* make sure we never get here while waiting for an ast to 3412 * fire. */ 3413 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 3414 3415 /* is this necessary? */ 3416 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3417 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 3418 spin_unlock_irqrestore(&lockres->l_lock, flags); 3419 3420 mlog(0, "lock %s\n", lockres->l_name); 3421 3422 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags); 3423 if (ret) { 3424 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3425 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 3426 ocfs2_dlm_dump_lksb(&lockres->l_lksb); 3427 BUG(); 3428 } 3429 mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n", 3430 lockres->l_name); 3431 3432 ocfs2_wait_on_busy_lock(lockres); 3433 out: 3434 return 0; 3435 } 3436 3437 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3438 struct ocfs2_lock_res *lockres); 3439 3440 /* Mark the lockres as being dropped. It will no longer be 3441 * queued if blocking, but we still may have to wait on it 3442 * being dequeued from the downconvert thread before we can consider 3443 * it safe to drop. 3444 * 3445 * You can *not* attempt to call cluster_lock on this lockres anymore. */ 3446 void ocfs2_mark_lockres_freeing(struct ocfs2_super *osb, 3447 struct ocfs2_lock_res *lockres) 3448 { 3449 int status; 3450 struct ocfs2_mask_waiter mw; 3451 unsigned long flags, flags2; 3452 3453 ocfs2_init_mask_waiter(&mw); 3454 3455 spin_lock_irqsave(&lockres->l_lock, flags); 3456 lockres->l_flags |= OCFS2_LOCK_FREEING; 3457 if (lockres->l_flags & OCFS2_LOCK_QUEUED && current == osb->dc_task) { 3458 /* 3459 * We know the downconvert is queued but not in progress 3460 * because we are the downconvert thread and processing 3461 * different lock. So we can just remove the lock from the 3462 * queue. This is not only an optimization but also a way 3463 * to avoid the following deadlock: 3464 * ocfs2_dentry_post_unlock() 3465 * ocfs2_dentry_lock_put() 3466 * ocfs2_drop_dentry_lock() 3467 * iput() 3468 * ocfs2_evict_inode() 3469 * ocfs2_clear_inode() 3470 * ocfs2_mark_lockres_freeing() 3471 * ... blocks waiting for OCFS2_LOCK_QUEUED 3472 * since we are the downconvert thread which 3473 * should clear the flag. 3474 */ 3475 spin_unlock_irqrestore(&lockres->l_lock, flags); 3476 spin_lock_irqsave(&osb->dc_task_lock, flags2); 3477 list_del_init(&lockres->l_blocked_list); 3478 osb->blocked_lock_count--; 3479 spin_unlock_irqrestore(&osb->dc_task_lock, flags2); 3480 /* 3481 * Warn if we recurse into another post_unlock call. Strictly 3482 * speaking it isn't a problem but we need to be careful if 3483 * that happens (stack overflow, deadlocks, ...) so warn if 3484 * ocfs2 grows a path for which this can happen. 3485 */ 3486 WARN_ON_ONCE(lockres->l_ops->post_unlock); 3487 /* Since the lock is freeing we don't do much in the fn below */ 3488 ocfs2_process_blocked_lock(osb, lockres); 3489 return; 3490 } 3491 while (lockres->l_flags & OCFS2_LOCK_QUEUED) { 3492 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); 3493 spin_unlock_irqrestore(&lockres->l_lock, flags); 3494 3495 mlog(0, "Waiting on lockres %s\n", lockres->l_name); 3496 3497 status = ocfs2_wait_for_mask(&mw); 3498 if (status) 3499 mlog_errno(status); 3500 3501 spin_lock_irqsave(&lockres->l_lock, flags); 3502 } 3503 spin_unlock_irqrestore(&lockres->l_lock, flags); 3504 } 3505 3506 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, 3507 struct ocfs2_lock_res *lockres) 3508 { 3509 int ret; 3510 3511 ocfs2_mark_lockres_freeing(osb, lockres); 3512 ret = ocfs2_drop_lock(osb, lockres); 3513 if (ret) 3514 mlog_errno(ret); 3515 } 3516 3517 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 3518 { 3519 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 3520 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 3521 ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres); 3522 ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres); 3523 } 3524 3525 int ocfs2_drop_inode_locks(struct inode *inode) 3526 { 3527 int status, err; 3528 3529 /* No need to call ocfs2_mark_lockres_freeing here - 3530 * ocfs2_clear_inode has done it for us. */ 3531 3532 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3533 &OCFS2_I(inode)->ip_open_lockres); 3534 if (err < 0) 3535 mlog_errno(err); 3536 3537 status = err; 3538 3539 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3540 &OCFS2_I(inode)->ip_inode_lockres); 3541 if (err < 0) 3542 mlog_errno(err); 3543 if (err < 0 && !status) 3544 status = err; 3545 3546 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3547 &OCFS2_I(inode)->ip_rw_lockres); 3548 if (err < 0) 3549 mlog_errno(err); 3550 if (err < 0 && !status) 3551 status = err; 3552 3553 return status; 3554 } 3555 3556 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 3557 int new_level) 3558 { 3559 assert_spin_locked(&lockres->l_lock); 3560 3561 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 3562 3563 if (lockres->l_level <= new_level) { 3564 mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, " 3565 "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, " 3566 "block %d, pgen %d\n", lockres->l_name, lockres->l_level, 3567 new_level, list_empty(&lockres->l_blocked_list), 3568 list_empty(&lockres->l_mask_waiters), lockres->l_type, 3569 lockres->l_flags, lockres->l_ro_holders, 3570 lockres->l_ex_holders, lockres->l_action, 3571 lockres->l_unlock_action, lockres->l_requested, 3572 lockres->l_blocking, lockres->l_pending_gen); 3573 BUG(); 3574 } 3575 3576 mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n", 3577 lockres->l_name, lockres->l_level, new_level, lockres->l_blocking); 3578 3579 lockres->l_action = OCFS2_AST_DOWNCONVERT; 3580 lockres->l_requested = new_level; 3581 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3582 return lockres_set_pending(lockres); 3583 } 3584 3585 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 3586 struct ocfs2_lock_res *lockres, 3587 int new_level, 3588 int lvb, 3589 unsigned int generation) 3590 { 3591 int ret; 3592 u32 dlm_flags = DLM_LKF_CONVERT; 3593 3594 mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name, 3595 lockres->l_level, new_level); 3596 3597 /* 3598 * On DLM_LKF_VALBLK, fsdlm behaves differently with o2cb. It always 3599 * expects DLM_LKF_VALBLK being set if the LKB has LVB, so that 3600 * we can recover correctly from node failure. Otherwise, we may get 3601 * invalid LVB in LKB, but without DLM_SBF_VALNOTVALID being set. 3602 */ 3603 if (ocfs2_userspace_stack(osb) && 3604 lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 3605 lvb = 1; 3606 3607 if (lvb) 3608 dlm_flags |= DLM_LKF_VALBLK; 3609 3610 ret = ocfs2_dlm_lock(osb->cconn, 3611 new_level, 3612 &lockres->l_lksb, 3613 dlm_flags, 3614 lockres->l_name, 3615 OCFS2_LOCK_ID_MAX_LEN - 1); 3616 lockres_clear_pending(lockres, generation, osb); 3617 if (ret) { 3618 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 3619 ocfs2_recover_from_dlm_error(lockres, 1); 3620 goto bail; 3621 } 3622 3623 ret = 0; 3624 bail: 3625 return ret; 3626 } 3627 3628 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */ 3629 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 3630 struct ocfs2_lock_res *lockres) 3631 { 3632 assert_spin_locked(&lockres->l_lock); 3633 3634 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 3635 /* If we're already trying to cancel a lock conversion 3636 * then just drop the spinlock and allow the caller to 3637 * requeue this lock. */ 3638 mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name); 3639 return 0; 3640 } 3641 3642 /* were we in a convert when we got the bast fire? */ 3643 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT && 3644 lockres->l_action != OCFS2_AST_DOWNCONVERT); 3645 /* set things up for the unlockast to know to just 3646 * clear out the ast_action and unset busy, etc. */ 3647 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT; 3648 3649 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY), 3650 "lock %s, invalid flags: 0x%lx\n", 3651 lockres->l_name, lockres->l_flags); 3652 3653 mlog(ML_BASTS, "lockres %s\n", lockres->l_name); 3654 3655 return 1; 3656 } 3657 3658 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 3659 struct ocfs2_lock_res *lockres) 3660 { 3661 int ret; 3662 3663 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, 3664 DLM_LKF_CANCEL); 3665 if (ret) { 3666 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3667 ocfs2_recover_from_dlm_error(lockres, 0); 3668 } 3669 3670 mlog(ML_BASTS, "lockres %s\n", lockres->l_name); 3671 3672 return ret; 3673 } 3674 3675 static int ocfs2_unblock_lock(struct ocfs2_super *osb, 3676 struct ocfs2_lock_res *lockres, 3677 struct ocfs2_unblock_ctl *ctl) 3678 { 3679 unsigned long flags; 3680 int blocking; 3681 int new_level; 3682 int level; 3683 int ret = 0; 3684 int set_lvb = 0; 3685 unsigned int gen; 3686 3687 spin_lock_irqsave(&lockres->l_lock, flags); 3688 3689 recheck: 3690 /* 3691 * Is it still blocking? If not, we have no more work to do. 3692 */ 3693 if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) { 3694 BUG_ON(lockres->l_blocking != DLM_LOCK_NL); 3695 spin_unlock_irqrestore(&lockres->l_lock, flags); 3696 ret = 0; 3697 goto leave; 3698 } 3699 3700 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 3701 /* XXX 3702 * This is a *big* race. The OCFS2_LOCK_PENDING flag 3703 * exists entirely for one reason - another thread has set 3704 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock(). 3705 * 3706 * If we do ocfs2_cancel_convert() before the other thread 3707 * calls dlm_lock(), our cancel will do nothing. We will 3708 * get no ast, and we will have no way of knowing the 3709 * cancel failed. Meanwhile, the other thread will call 3710 * into dlm_lock() and wait...forever. 3711 * 3712 * Why forever? Because another node has asked for the 3713 * lock first; that's why we're here in unblock_lock(). 3714 * 3715 * The solution is OCFS2_LOCK_PENDING. When PENDING is 3716 * set, we just requeue the unblock. Only when the other 3717 * thread has called dlm_lock() and cleared PENDING will 3718 * we then cancel their request. 3719 * 3720 * All callers of dlm_lock() must set OCFS2_DLM_PENDING 3721 * at the same time they set OCFS2_DLM_BUSY. They must 3722 * clear OCFS2_DLM_PENDING after dlm_lock() returns. 3723 */ 3724 if (lockres->l_flags & OCFS2_LOCK_PENDING) { 3725 mlog(ML_BASTS, "lockres %s, ReQ: Pending\n", 3726 lockres->l_name); 3727 goto leave_requeue; 3728 } 3729 3730 ctl->requeue = 1; 3731 ret = ocfs2_prepare_cancel_convert(osb, lockres); 3732 spin_unlock_irqrestore(&lockres->l_lock, flags); 3733 if (ret) { 3734 ret = ocfs2_cancel_convert(osb, lockres); 3735 if (ret < 0) 3736 mlog_errno(ret); 3737 } 3738 goto leave; 3739 } 3740 3741 /* 3742 * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is 3743 * set when the ast is received for an upconvert just before the 3744 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast 3745 * on the heels of the ast, we want to delay the downconvert just 3746 * enough to allow the up requestor to do its task. Because this 3747 * lock is in the blocked queue, the lock will be downconverted 3748 * as soon as the requestor is done with the lock. 3749 */ 3750 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) 3751 goto leave_requeue; 3752 3753 /* 3754 * How can we block and yet be at NL? We were trying to upconvert 3755 * from NL and got canceled. The code comes back here, and now 3756 * we notice and clear BLOCKING. 3757 */ 3758 if (lockres->l_level == DLM_LOCK_NL) { 3759 BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders); 3760 mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name); 3761 lockres->l_blocking = DLM_LOCK_NL; 3762 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 3763 spin_unlock_irqrestore(&lockres->l_lock, flags); 3764 goto leave; 3765 } 3766 3767 /* if we're blocking an exclusive and we have *any* holders, 3768 * then requeue. */ 3769 if ((lockres->l_blocking == DLM_LOCK_EX) 3770 && (lockres->l_ex_holders || lockres->l_ro_holders)) { 3771 mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n", 3772 lockres->l_name, lockres->l_ex_holders, 3773 lockres->l_ro_holders); 3774 goto leave_requeue; 3775 } 3776 3777 /* If it's a PR we're blocking, then only 3778 * requeue if we've got any EX holders */ 3779 if (lockres->l_blocking == DLM_LOCK_PR && 3780 lockres->l_ex_holders) { 3781 mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n", 3782 lockres->l_name, lockres->l_ex_holders); 3783 goto leave_requeue; 3784 } 3785 3786 /* 3787 * Can we get a lock in this state if the holder counts are 3788 * zero? The meta data unblock code used to check this. 3789 */ 3790 if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 3791 && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) { 3792 mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n", 3793 lockres->l_name); 3794 goto leave_requeue; 3795 } 3796 3797 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 3798 3799 if (lockres->l_ops->check_downconvert 3800 && !lockres->l_ops->check_downconvert(lockres, new_level)) { 3801 mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n", 3802 lockres->l_name); 3803 goto leave_requeue; 3804 } 3805 3806 /* If we get here, then we know that there are no more 3807 * incompatible holders (and anyone asking for an incompatible 3808 * lock is blocked). We can now downconvert the lock */ 3809 if (!lockres->l_ops->downconvert_worker) 3810 goto downconvert; 3811 3812 /* Some lockres types want to do a bit of work before 3813 * downconverting a lock. Allow that here. The worker function 3814 * may sleep, so we save off a copy of what we're blocking as 3815 * it may change while we're not holding the spin lock. */ 3816 blocking = lockres->l_blocking; 3817 level = lockres->l_level; 3818 spin_unlock_irqrestore(&lockres->l_lock, flags); 3819 3820 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); 3821 3822 if (ctl->unblock_action == UNBLOCK_STOP_POST) { 3823 mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n", 3824 lockres->l_name); 3825 goto leave; 3826 } 3827 3828 spin_lock_irqsave(&lockres->l_lock, flags); 3829 if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) { 3830 /* If this changed underneath us, then we can't drop 3831 * it just yet. */ 3832 mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, " 3833 "Recheck\n", lockres->l_name, blocking, 3834 lockres->l_blocking, level, lockres->l_level); 3835 goto recheck; 3836 } 3837 3838 downconvert: 3839 ctl->requeue = 0; 3840 3841 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3842 if (lockres->l_level == DLM_LOCK_EX) 3843 set_lvb = 1; 3844 3845 /* 3846 * We only set the lvb if the lock has been fully 3847 * refreshed - otherwise we risk setting stale 3848 * data. Otherwise, there's no need to actually clear 3849 * out the lvb here as it's value is still valid. 3850 */ 3851 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3852 lockres->l_ops->set_lvb(lockres); 3853 } 3854 3855 gen = ocfs2_prepare_downconvert(lockres, new_level); 3856 spin_unlock_irqrestore(&lockres->l_lock, flags); 3857 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb, 3858 gen); 3859 3860 leave: 3861 if (ret) 3862 mlog_errno(ret); 3863 return ret; 3864 3865 leave_requeue: 3866 spin_unlock_irqrestore(&lockres->l_lock, flags); 3867 ctl->requeue = 1; 3868 3869 return 0; 3870 } 3871 3872 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 3873 int blocking) 3874 { 3875 struct inode *inode; 3876 struct address_space *mapping; 3877 struct ocfs2_inode_info *oi; 3878 3879 inode = ocfs2_lock_res_inode(lockres); 3880 mapping = inode->i_mapping; 3881 3882 if (S_ISDIR(inode->i_mode)) { 3883 oi = OCFS2_I(inode); 3884 oi->ip_dir_lock_gen++; 3885 mlog(0, "generation: %u\n", oi->ip_dir_lock_gen); 3886 goto out; 3887 } 3888 3889 if (!S_ISREG(inode->i_mode)) 3890 goto out; 3891 3892 /* 3893 * We need this before the filemap_fdatawrite() so that it can 3894 * transfer the dirty bit from the PTE to the 3895 * page. Unfortunately this means that even for EX->PR 3896 * downconverts, we'll lose our mappings and have to build 3897 * them up again. 3898 */ 3899 unmap_mapping_range(mapping, 0, 0, 0); 3900 3901 if (filemap_fdatawrite(mapping)) { 3902 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!", 3903 (unsigned long long)OCFS2_I(inode)->ip_blkno); 3904 } 3905 sync_mapping_buffers(mapping); 3906 if (blocking == DLM_LOCK_EX) { 3907 truncate_inode_pages(mapping, 0); 3908 } else { 3909 /* We only need to wait on the I/O if we're not also 3910 * truncating pages because truncate_inode_pages waits 3911 * for us above. We don't truncate pages if we're 3912 * blocking anything < EXMODE because we want to keep 3913 * them around in that case. */ 3914 filemap_fdatawait(mapping); 3915 } 3916 3917 forget_all_cached_acls(inode); 3918 3919 out: 3920 return UNBLOCK_CONTINUE; 3921 } 3922 3923 static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci, 3924 struct ocfs2_lock_res *lockres, 3925 int new_level) 3926 { 3927 int checkpointed = ocfs2_ci_fully_checkpointed(ci); 3928 3929 BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR); 3930 BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed); 3931 3932 if (checkpointed) 3933 return 1; 3934 3935 ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci))); 3936 return 0; 3937 } 3938 3939 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 3940 int new_level) 3941 { 3942 struct inode *inode = ocfs2_lock_res_inode(lockres); 3943 3944 return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level); 3945 } 3946 3947 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres) 3948 { 3949 struct inode *inode = ocfs2_lock_res_inode(lockres); 3950 3951 __ocfs2_stuff_meta_lvb(inode); 3952 } 3953 3954 /* 3955 * Does the final reference drop on our dentry lock. Right now this 3956 * happens in the downconvert thread, but we could choose to simplify the 3957 * dlmglue API and push these off to the ocfs2_wq in the future. 3958 */ 3959 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 3960 struct ocfs2_lock_res *lockres) 3961 { 3962 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3963 ocfs2_dentry_lock_put(osb, dl); 3964 } 3965 3966 /* 3967 * d_delete() matching dentries before the lock downconvert. 3968 * 3969 * At this point, any process waiting to destroy the 3970 * dentry_lock due to last ref count is stopped by the 3971 * OCFS2_LOCK_QUEUED flag. 3972 * 3973 * We have two potential problems 3974 * 3975 * 1) If we do the last reference drop on our dentry_lock (via dput) 3976 * we'll wind up in ocfs2_release_dentry_lock(), waiting on 3977 * the downconvert to finish. Instead we take an elevated 3978 * reference and push the drop until after we've completed our 3979 * unblock processing. 3980 * 3981 * 2) There might be another process with a final reference, 3982 * waiting on us to finish processing. If this is the case, we 3983 * detect it and exit out - there's no more dentries anyway. 3984 */ 3985 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 3986 int blocking) 3987 { 3988 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3989 struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode); 3990 struct dentry *dentry; 3991 unsigned long flags; 3992 int extra_ref = 0; 3993 3994 /* 3995 * This node is blocking another node from getting a read 3996 * lock. This happens when we've renamed within a 3997 * directory. We've forced the other nodes to d_delete(), but 3998 * we never actually dropped our lock because it's still 3999 * valid. The downconvert code will retain a PR for this node, 4000 * so there's no further work to do. 4001 */ 4002 if (blocking == DLM_LOCK_PR) 4003 return UNBLOCK_CONTINUE; 4004 4005 /* 4006 * Mark this inode as potentially orphaned. The code in 4007 * ocfs2_delete_inode() will figure out whether it actually 4008 * needs to be freed or not. 4009 */ 4010 spin_lock(&oi->ip_lock); 4011 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; 4012 spin_unlock(&oi->ip_lock); 4013 4014 /* 4015 * Yuck. We need to make sure however that the check of 4016 * OCFS2_LOCK_FREEING and the extra reference are atomic with 4017 * respect to a reference decrement or the setting of that 4018 * flag. 4019 */ 4020 spin_lock_irqsave(&lockres->l_lock, flags); 4021 spin_lock(&dentry_attach_lock); 4022 if (!(lockres->l_flags & OCFS2_LOCK_FREEING) 4023 && dl->dl_count) { 4024 dl->dl_count++; 4025 extra_ref = 1; 4026 } 4027 spin_unlock(&dentry_attach_lock); 4028 spin_unlock_irqrestore(&lockres->l_lock, flags); 4029 4030 mlog(0, "extra_ref = %d\n", extra_ref); 4031 4032 /* 4033 * We have a process waiting on us in ocfs2_dentry_iput(), 4034 * which means we can't have any more outstanding 4035 * aliases. There's no need to do any more work. 4036 */ 4037 if (!extra_ref) 4038 return UNBLOCK_CONTINUE; 4039 4040 spin_lock(&dentry_attach_lock); 4041 while (1) { 4042 dentry = ocfs2_find_local_alias(dl->dl_inode, 4043 dl->dl_parent_blkno, 1); 4044 if (!dentry) 4045 break; 4046 spin_unlock(&dentry_attach_lock); 4047 4048 if (S_ISDIR(dl->dl_inode->i_mode)) 4049 shrink_dcache_parent(dentry); 4050 4051 mlog(0, "d_delete(%pd);\n", dentry); 4052 4053 /* 4054 * The following dcache calls may do an 4055 * iput(). Normally we don't want that from the 4056 * downconverting thread, but in this case it's ok 4057 * because the requesting node already has an 4058 * exclusive lock on the inode, so it can't be queued 4059 * for a downconvert. 4060 */ 4061 d_delete(dentry); 4062 dput(dentry); 4063 4064 spin_lock(&dentry_attach_lock); 4065 } 4066 spin_unlock(&dentry_attach_lock); 4067 4068 /* 4069 * If we are the last holder of this dentry lock, there is no 4070 * reason to downconvert so skip straight to the unlock. 4071 */ 4072 if (dl->dl_count == 1) 4073 return UNBLOCK_STOP_POST; 4074 4075 return UNBLOCK_CONTINUE_POST; 4076 } 4077 4078 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres, 4079 int new_level) 4080 { 4081 struct ocfs2_refcount_tree *tree = 4082 ocfs2_lock_res_refcount_tree(lockres); 4083 4084 return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level); 4085 } 4086 4087 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres, 4088 int blocking) 4089 { 4090 struct ocfs2_refcount_tree *tree = 4091 ocfs2_lock_res_refcount_tree(lockres); 4092 4093 ocfs2_metadata_cache_purge(&tree->rf_ci); 4094 4095 return UNBLOCK_CONTINUE; 4096 } 4097 4098 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres) 4099 { 4100 struct ocfs2_qinfo_lvb *lvb; 4101 struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres); 4102 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 4103 oinfo->dqi_gi.dqi_type); 4104 4105 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 4106 lvb->lvb_version = OCFS2_QINFO_LVB_VERSION; 4107 lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace); 4108 lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace); 4109 lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms); 4110 lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks); 4111 lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk); 4112 lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry); 4113 } 4114 4115 void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex) 4116 { 4117 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 4118 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 4119 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 4120 4121 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 4122 ocfs2_cluster_unlock(osb, lockres, level); 4123 } 4124 4125 static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo) 4126 { 4127 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 4128 oinfo->dqi_gi.dqi_type); 4129 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 4130 struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 4131 struct buffer_head *bh = NULL; 4132 struct ocfs2_global_disk_dqinfo *gdinfo; 4133 int status = 0; 4134 4135 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 4136 lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) { 4137 info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace); 4138 info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace); 4139 oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms); 4140 oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks); 4141 oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk); 4142 oinfo->dqi_gi.dqi_free_entry = 4143 be32_to_cpu(lvb->lvb_free_entry); 4144 } else { 4145 status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode, 4146 oinfo->dqi_giblk, &bh); 4147 if (status) { 4148 mlog_errno(status); 4149 goto bail; 4150 } 4151 gdinfo = (struct ocfs2_global_disk_dqinfo *) 4152 (bh->b_data + OCFS2_GLOBAL_INFO_OFF); 4153 info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace); 4154 info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace); 4155 oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms); 4156 oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks); 4157 oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk); 4158 oinfo->dqi_gi.dqi_free_entry = 4159 le32_to_cpu(gdinfo->dqi_free_entry); 4160 brelse(bh); 4161 ocfs2_track_lock_refresh(lockres); 4162 } 4163 4164 bail: 4165 return status; 4166 } 4167 4168 /* Lock quota info, this function expects at least shared lock on the quota file 4169 * so that we can safely refresh quota info from disk. */ 4170 int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex) 4171 { 4172 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 4173 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 4174 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 4175 int status = 0; 4176 4177 /* On RO devices, locking really isn't needed... */ 4178 if (ocfs2_is_hard_readonly(osb)) { 4179 if (ex) 4180 status = -EROFS; 4181 goto bail; 4182 } 4183 if (ocfs2_mount_local(osb)) 4184 goto bail; 4185 4186 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 4187 if (status < 0) { 4188 mlog_errno(status); 4189 goto bail; 4190 } 4191 if (!ocfs2_should_refresh_lock_res(lockres)) 4192 goto bail; 4193 /* OK, we have the lock but we need to refresh the quota info */ 4194 status = ocfs2_refresh_qinfo(oinfo); 4195 if (status) 4196 ocfs2_qinfo_unlock(oinfo, ex); 4197 ocfs2_complete_lock_res_refresh(lockres, status); 4198 bail: 4199 return status; 4200 } 4201 4202 int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex) 4203 { 4204 int status; 4205 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 4206 struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres; 4207 struct ocfs2_super *osb = lockres->l_priv; 4208 4209 4210 if (ocfs2_is_hard_readonly(osb)) 4211 return -EROFS; 4212 4213 if (ocfs2_mount_local(osb)) 4214 return 0; 4215 4216 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 4217 if (status < 0) 4218 mlog_errno(status); 4219 4220 return status; 4221 } 4222 4223 void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex) 4224 { 4225 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 4226 struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres; 4227 struct ocfs2_super *osb = lockres->l_priv; 4228 4229 if (!ocfs2_mount_local(osb)) 4230 ocfs2_cluster_unlock(osb, lockres, level); 4231 } 4232 4233 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 4234 struct ocfs2_lock_res *lockres) 4235 { 4236 int status; 4237 struct ocfs2_unblock_ctl ctl = {0, 0,}; 4238 unsigned long flags; 4239 4240 /* Our reference to the lockres in this function can be 4241 * considered valid until we remove the OCFS2_LOCK_QUEUED 4242 * flag. */ 4243 4244 BUG_ON(!lockres); 4245 BUG_ON(!lockres->l_ops); 4246 4247 mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name); 4248 4249 /* Detect whether a lock has been marked as going away while 4250 * the downconvert thread was processing other things. A lock can 4251 * still be marked with OCFS2_LOCK_FREEING after this check, 4252 * but short circuiting here will still save us some 4253 * performance. */ 4254 spin_lock_irqsave(&lockres->l_lock, flags); 4255 if (lockres->l_flags & OCFS2_LOCK_FREEING) 4256 goto unqueue; 4257 spin_unlock_irqrestore(&lockres->l_lock, flags); 4258 4259 status = ocfs2_unblock_lock(osb, lockres, &ctl); 4260 if (status < 0) 4261 mlog_errno(status); 4262 4263 spin_lock_irqsave(&lockres->l_lock, flags); 4264 unqueue: 4265 if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) { 4266 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED); 4267 } else 4268 ocfs2_schedule_blocked_lock(osb, lockres); 4269 4270 mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name, 4271 ctl.requeue ? "yes" : "no"); 4272 spin_unlock_irqrestore(&lockres->l_lock, flags); 4273 4274 if (ctl.unblock_action != UNBLOCK_CONTINUE 4275 && lockres->l_ops->post_unlock) 4276 lockres->l_ops->post_unlock(osb, lockres); 4277 } 4278 4279 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 4280 struct ocfs2_lock_res *lockres) 4281 { 4282 unsigned long flags; 4283 4284 assert_spin_locked(&lockres->l_lock); 4285 4286 if (lockres->l_flags & OCFS2_LOCK_FREEING) { 4287 /* Do not schedule a lock for downconvert when it's on 4288 * the way to destruction - any nodes wanting access 4289 * to the resource will get it soon. */ 4290 mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n", 4291 lockres->l_name, lockres->l_flags); 4292 return; 4293 } 4294 4295 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); 4296 4297 spin_lock_irqsave(&osb->dc_task_lock, flags); 4298 if (list_empty(&lockres->l_blocked_list)) { 4299 list_add_tail(&lockres->l_blocked_list, 4300 &osb->blocked_lock_list); 4301 osb->blocked_lock_count++; 4302 } 4303 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4304 } 4305 4306 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb) 4307 { 4308 unsigned long processed; 4309 unsigned long flags; 4310 struct ocfs2_lock_res *lockres; 4311 4312 spin_lock_irqsave(&osb->dc_task_lock, flags); 4313 /* grab this early so we know to try again if a state change and 4314 * wake happens part-way through our work */ 4315 osb->dc_work_sequence = osb->dc_wake_sequence; 4316 4317 processed = osb->blocked_lock_count; 4318 /* 4319 * blocked lock processing in this loop might call iput which can 4320 * remove items off osb->blocked_lock_list. Downconvert up to 4321 * 'processed' number of locks, but stop short if we had some 4322 * removed in ocfs2_mark_lockres_freeing when downconverting. 4323 */ 4324 while (processed && !list_empty(&osb->blocked_lock_list)) { 4325 lockres = list_entry(osb->blocked_lock_list.next, 4326 struct ocfs2_lock_res, l_blocked_list); 4327 list_del_init(&lockres->l_blocked_list); 4328 osb->blocked_lock_count--; 4329 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4330 4331 BUG_ON(!processed); 4332 processed--; 4333 4334 ocfs2_process_blocked_lock(osb, lockres); 4335 4336 spin_lock_irqsave(&osb->dc_task_lock, flags); 4337 } 4338 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4339 } 4340 4341 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb) 4342 { 4343 int empty = 0; 4344 unsigned long flags; 4345 4346 spin_lock_irqsave(&osb->dc_task_lock, flags); 4347 if (list_empty(&osb->blocked_lock_list)) 4348 empty = 1; 4349 4350 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4351 return empty; 4352 } 4353 4354 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb) 4355 { 4356 int should_wake = 0; 4357 unsigned long flags; 4358 4359 spin_lock_irqsave(&osb->dc_task_lock, flags); 4360 if (osb->dc_work_sequence != osb->dc_wake_sequence) 4361 should_wake = 1; 4362 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4363 4364 return should_wake; 4365 } 4366 4367 static int ocfs2_downconvert_thread(void *arg) 4368 { 4369 int status = 0; 4370 struct ocfs2_super *osb = arg; 4371 4372 /* only quit once we've been asked to stop and there is no more 4373 * work available */ 4374 while (!(kthread_should_stop() && 4375 ocfs2_downconvert_thread_lists_empty(osb))) { 4376 4377 wait_event_interruptible(osb->dc_event, 4378 ocfs2_downconvert_thread_should_wake(osb) || 4379 kthread_should_stop()); 4380 4381 mlog(0, "downconvert_thread: awoken\n"); 4382 4383 ocfs2_downconvert_thread_do_work(osb); 4384 } 4385 4386 osb->dc_task = NULL; 4387 return status; 4388 } 4389 4390 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb) 4391 { 4392 unsigned long flags; 4393 4394 spin_lock_irqsave(&osb->dc_task_lock, flags); 4395 /* make sure the voting thread gets a swipe at whatever changes 4396 * the caller may have made to the voting state */ 4397 osb->dc_wake_sequence++; 4398 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4399 wake_up(&osb->dc_event); 4400 } 4401