1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmglue.c 5 * 6 * Code which implements an OCFS2 specific interface to our DLM. 7 * 8 * Copyright (C) 2003, 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 */ 25 26 #include <linux/types.h> 27 #include <linux/slab.h> 28 #include <linux/highmem.h> 29 #include <linux/mm.h> 30 #include <linux/kthread.h> 31 #include <linux/pagemap.h> 32 #include <linux/debugfs.h> 33 #include <linux/seq_file.h> 34 #include <linux/time.h> 35 #include <linux/quotaops.h> 36 #include <linux/sched/signal.h> 37 38 #define MLOG_MASK_PREFIX ML_DLM_GLUE 39 #include <cluster/masklog.h> 40 41 #include "ocfs2.h" 42 #include "ocfs2_lockingver.h" 43 44 #include "alloc.h" 45 #include "dcache.h" 46 #include "dlmglue.h" 47 #include "extent_map.h" 48 #include "file.h" 49 #include "heartbeat.h" 50 #include "inode.h" 51 #include "journal.h" 52 #include "stackglue.h" 53 #include "slot_map.h" 54 #include "super.h" 55 #include "uptodate.h" 56 #include "quota.h" 57 #include "refcounttree.h" 58 #include "acl.h" 59 60 #include "buffer_head_io.h" 61 62 struct ocfs2_mask_waiter { 63 struct list_head mw_item; 64 int mw_status; 65 struct completion mw_complete; 66 unsigned long mw_mask; 67 unsigned long mw_goal; 68 #ifdef CONFIG_OCFS2_FS_STATS 69 ktime_t mw_lock_start; 70 #endif 71 }; 72 73 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres); 74 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres); 75 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres); 76 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres); 77 78 /* 79 * Return value from ->downconvert_worker functions. 80 * 81 * These control the precise actions of ocfs2_unblock_lock() 82 * and ocfs2_process_blocked_lock() 83 * 84 */ 85 enum ocfs2_unblock_action { 86 UNBLOCK_CONTINUE = 0, /* Continue downconvert */ 87 UNBLOCK_CONTINUE_POST = 1, /* Continue downconvert, fire 88 * ->post_unlock callback */ 89 UNBLOCK_STOP_POST = 2, /* Do not downconvert, fire 90 * ->post_unlock() callback. 
*/ 91 }; 92 93 struct ocfs2_unblock_ctl { 94 int requeue; 95 enum ocfs2_unblock_action unblock_action; 96 }; 97 98 /* Lockdep class keys */ 99 #ifdef CONFIG_DEBUG_LOCK_ALLOC 100 static struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES]; 101 #endif 102 103 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 104 int new_level); 105 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres); 106 107 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 108 int blocking); 109 110 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 111 int blocking); 112 113 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 114 struct ocfs2_lock_res *lockres); 115 116 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres); 117 118 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres, 119 int new_level); 120 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres, 121 int blocking); 122 123 #define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres) 124 125 /* This aids in debugging situations where a bad LVB might be involved. */ 126 static void ocfs2_dump_meta_lvb_info(u64 level, 127 const char *function, 128 unsigned int line, 129 struct ocfs2_lock_res *lockres) 130 { 131 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 132 133 mlog(level, "LVB information for %s (called from %s:%u):\n", 134 lockres->l_name, function, line); 135 mlog(level, "version: %u, clusters: %u, generation: 0x%x\n", 136 lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters), 137 be32_to_cpu(lvb->lvb_igeneration)); 138 mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n", 139 (unsigned long long)be64_to_cpu(lvb->lvb_isize), 140 be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid), 141 be16_to_cpu(lvb->lvb_imode)); 142 mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, " 143 "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink), 144 (long long)be64_to_cpu(lvb->lvb_iatime_packed), 145 (long long)be64_to_cpu(lvb->lvb_ictime_packed), 146 (long long)be64_to_cpu(lvb->lvb_imtime_packed), 147 be32_to_cpu(lvb->lvb_iattr)); 148 } 149 150 151 /* 152 * OCFS2 Lock Resource Operations 153 * 154 * These fine tune the behavior of the generic dlmglue locking infrastructure. 155 * 156 * The most basic of lock types can point ->l_priv to their respective 157 * struct ocfs2_super and allow the default actions to manage things. 158 * 159 * Right now, each lock type also needs to implement an init function, 160 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres() 161 * should be called when the lock is no longer needed (i.e., object 162 * destruction time). 163 */ 164 struct ocfs2_lock_res_ops { 165 /* 166 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define 167 * this callback if ->l_priv is not an ocfs2_super pointer 168 */ 169 struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *); 170 171 /* 172 * Optionally called in the downconvert thread after a 173 * successful downconvert. The lockres will not be referenced 174 * after this callback is called, so it is safe to free 175 * memory, etc. 176 * 177 * The exact semantics of when this is called are controlled 178 * by ->downconvert_worker() 179 */ 180 void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *); 181 182 /* 183 * Allow a lock type to add checks to determine whether it is 184 * safe to downconvert a lock. 
Return 0 to re-queue the 185 * downconvert at a later time, nonzero to continue. 186 * 187 * For most locks, the default checks that there are no 188 * incompatible holders are sufficient. 189 * 190 * Called with the lockres spinlock held. 191 */ 192 int (*check_downconvert)(struct ocfs2_lock_res *, int); 193 194 /* 195 * Allows a lock type to populate the lock value block. This 196 * is called on downconvert, and when we drop a lock. 197 * 198 * Locks that want to use this should set LOCK_TYPE_USES_LVB 199 * in the flags field. 200 * 201 * Called with the lockres spinlock held. 202 */ 203 void (*set_lvb)(struct ocfs2_lock_res *); 204 205 /* 206 * Called from the downconvert thread when it is determined 207 * that a lock will be downconverted. This is called without 208 * any locks held so the function can do work that might 209 * schedule (syncing out data, etc). 210 * 211 * This should return any one of the ocfs2_unblock_action 212 * values, depending on what it wants the thread to do. 213 */ 214 int (*downconvert_worker)(struct ocfs2_lock_res *, int); 215 216 /* 217 * LOCK_TYPE_* flags which describe the specific requirements 218 * of a lock type. Descriptions of each individual flag follow. 219 */ 220 int flags; 221 }; 222 223 /* 224 * Some locks want to "refresh" potentially stale data when a 225 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this 226 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the 227 * individual lockres l_flags member from the ast function. It is 228 * expected that the locking wrapper will clear the 229 * OCFS2_LOCK_NEEDS_REFRESH flag when done. 230 */ 231 #define LOCK_TYPE_REQUIRES_REFRESH 0x1 232 233 /* 234 * Indicate that a lock type makes use of the lock value block. The 235 * ->set_lvb lock type callback must be defined. 
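 *
 * (Of the lock types defined below, ocfs2_inode_inode_lops and
 * ocfs2_qinfo_lops are examples: both set LOCK_TYPE_USES_LVB together
 * with a ->set_lvb callback.)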
236 */ 237 #define LOCK_TYPE_USES_LVB 0x2 238 239 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { 240 .get_osb = ocfs2_get_inode_osb, 241 .flags = 0, 242 }; 243 244 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = { 245 .get_osb = ocfs2_get_inode_osb, 246 .check_downconvert = ocfs2_check_meta_downconvert, 247 .set_lvb = ocfs2_set_meta_lvb, 248 .downconvert_worker = ocfs2_data_convert_worker, 249 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 250 }; 251 252 static struct ocfs2_lock_res_ops ocfs2_super_lops = { 253 .flags = LOCK_TYPE_REQUIRES_REFRESH, 254 }; 255 256 static struct ocfs2_lock_res_ops ocfs2_rename_lops = { 257 .flags = 0, 258 }; 259 260 static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = { 261 .flags = 0, 262 }; 263 264 static struct ocfs2_lock_res_ops ocfs2_trim_fs_lops = { 265 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 266 }; 267 268 static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = { 269 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 270 }; 271 272 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { 273 .get_osb = ocfs2_get_dentry_osb, 274 .post_unlock = ocfs2_dentry_post_unlock, 275 .downconvert_worker = ocfs2_dentry_convert_worker, 276 .flags = 0, 277 }; 278 279 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = { 280 .get_osb = ocfs2_get_inode_osb, 281 .flags = 0, 282 }; 283 284 static struct ocfs2_lock_res_ops ocfs2_flock_lops = { 285 .get_osb = ocfs2_get_file_osb, 286 .flags = 0, 287 }; 288 289 static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = { 290 .set_lvb = ocfs2_set_qinfo_lvb, 291 .get_osb = ocfs2_get_qinfo_osb, 292 .flags = LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB, 293 }; 294 295 static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = { 296 .check_downconvert = ocfs2_check_refcount_downconvert, 297 .downconvert_worker = ocfs2_refcount_convert_worker, 298 .flags = 0, 299 }; 300 301 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 302 { 303 return lockres->l_type == OCFS2_LOCK_TYPE_META || 304 lockres->l_type == OCFS2_LOCK_TYPE_RW || 305 lockres->l_type == OCFS2_LOCK_TYPE_OPEN; 306 } 307 308 static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb) 309 { 310 return container_of(lksb, struct ocfs2_lock_res, l_lksb); 311 } 312 313 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) 314 { 315 BUG_ON(!ocfs2_is_inode_lock(lockres)); 316 317 return (struct inode *) lockres->l_priv; 318 } 319 320 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres) 321 { 322 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY); 323 324 return (struct ocfs2_dentry_lock *)lockres->l_priv; 325 } 326 327 static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres) 328 { 329 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO); 330 331 return (struct ocfs2_mem_dqinfo *)lockres->l_priv; 332 } 333 334 static inline struct ocfs2_refcount_tree * 335 ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res) 336 { 337 return container_of(res, struct ocfs2_refcount_tree, rf_lockres); 338 } 339 340 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres) 341 { 342 if (lockres->l_ops->get_osb) 343 return lockres->l_ops->get_osb(lockres); 344 345 return (struct ocfs2_super *)lockres->l_priv; 346 } 347 348 static int ocfs2_lock_create(struct ocfs2_super *osb, 349 struct ocfs2_lock_res *lockres, 350 int level, 351 u32 dlm_flags); 352 static inline int 
ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 353 int wanted); 354 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb, 355 struct ocfs2_lock_res *lockres, 356 int level, unsigned long caller_ip); 357 static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb, 358 struct ocfs2_lock_res *lockres, 359 int level) 360 { 361 __ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_); 362 } 363 364 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres); 365 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres); 366 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres); 367 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level); 368 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 369 struct ocfs2_lock_res *lockres); 370 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 371 int convert); 372 #define ocfs2_log_dlm_error(_func, _err, _lockres) do { \ 373 if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY) \ 374 mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \ 375 _err, _func, _lockres->l_name); \ 376 else \ 377 mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n", \ 378 _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name, \ 379 (unsigned int)ocfs2_get_dentry_lock_ino(_lockres)); \ 380 } while (0) 381 static int ocfs2_downconvert_thread(void *arg); 382 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 383 struct ocfs2_lock_res *lockres); 384 static int ocfs2_inode_lock_update(struct inode *inode, 385 struct buffer_head **bh); 386 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 387 static inline int ocfs2_highest_compat_lock_level(int level); 388 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 389 int new_level); 390 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 391 struct ocfs2_lock_res *lockres, 392 int new_level, 393 int lvb, 394 unsigned int generation); 395 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 396 struct ocfs2_lock_res *lockres); 397 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 398 struct ocfs2_lock_res *lockres); 399 400 401 static void ocfs2_build_lock_name(enum ocfs2_lock_type type, 402 u64 blkno, 403 u32 generation, 404 char *name) 405 { 406 int len; 407 408 BUG_ON(type >= OCFS2_NUM_LOCK_TYPES); 409 410 len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", 411 ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD, 412 (long long)blkno, generation); 413 414 BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1)); 415 416 mlog(0, "built lock resource with name: %s\n", name); 417 } 418 419 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock); 420 421 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res, 422 struct ocfs2_dlm_debug *dlm_debug) 423 { 424 mlog(0, "Add tracking for lockres %s\n", res->l_name); 425 426 spin_lock(&ocfs2_dlm_tracking_lock); 427 list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking); 428 spin_unlock(&ocfs2_dlm_tracking_lock); 429 } 430 431 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) 432 { 433 spin_lock(&ocfs2_dlm_tracking_lock); 434 if (!list_empty(&res->l_debug_list)) 435 list_del_init(&res->l_debug_list); 436 spin_unlock(&ocfs2_dlm_tracking_lock); 437 } 438 439 #ifdef CONFIG_OCFS2_FS_STATS 440 static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 441 { 442 res->l_lock_refresh = 0; 443 
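	/* Zero the per-mode (PR and EX) acquisition statistics as well */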
memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats)); 444 memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats)); 445 } 446 447 static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level, 448 struct ocfs2_mask_waiter *mw, int ret) 449 { 450 u32 usec; 451 ktime_t kt; 452 struct ocfs2_lock_stats *stats; 453 454 if (level == LKM_PRMODE) 455 stats = &res->l_lock_prmode; 456 else if (level == LKM_EXMODE) 457 stats = &res->l_lock_exmode; 458 else 459 return; 460 461 kt = ktime_sub(ktime_get(), mw->mw_lock_start); 462 usec = ktime_to_us(kt); 463 464 stats->ls_gets++; 465 stats->ls_total += ktime_to_ns(kt); 466 /* overflow */ 467 if (unlikely(stats->ls_gets == 0)) { 468 stats->ls_gets++; 469 stats->ls_total = ktime_to_ns(kt); 470 } 471 472 if (stats->ls_max < usec) 473 stats->ls_max = usec; 474 475 if (ret) 476 stats->ls_fail++; 477 } 478 479 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 480 { 481 lockres->l_lock_refresh++; 482 } 483 484 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 485 { 486 mw->mw_lock_start = ktime_get(); 487 } 488 #else 489 static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 490 { 491 } 492 static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, 493 int level, struct ocfs2_mask_waiter *mw, int ret) 494 { 495 } 496 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 497 { 498 } 499 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 500 { 501 } 502 #endif 503 504 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, 505 struct ocfs2_lock_res *res, 506 enum ocfs2_lock_type type, 507 struct ocfs2_lock_res_ops *ops, 508 void *priv) 509 { 510 res->l_type = type; 511 res->l_ops = ops; 512 res->l_priv = priv; 513 514 res->l_level = DLM_LOCK_IV; 515 res->l_requested = DLM_LOCK_IV; 516 res->l_blocking = DLM_LOCK_IV; 517 res->l_action = OCFS2_AST_INVALID; 518 res->l_unlock_action = OCFS2_UNLOCK_INVALID; 519 520 res->l_flags = OCFS2_LOCK_INITIALIZED; 521 522 ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug); 523 524 ocfs2_init_lock_stats(res); 525 #ifdef CONFIG_DEBUG_LOCK_ALLOC 526 if (type != OCFS2_LOCK_TYPE_OPEN) 527 lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type], 528 &lockdep_keys[type], 0); 529 else 530 res->l_lockdep_map.key = NULL; 531 #endif 532 } 533 534 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res) 535 { 536 /* This also clears out the lock status block */ 537 memset(res, 0, sizeof(struct ocfs2_lock_res)); 538 spin_lock_init(&res->l_lock); 539 init_waitqueue_head(&res->l_event); 540 INIT_LIST_HEAD(&res->l_blocked_list); 541 INIT_LIST_HEAD(&res->l_mask_waiters); 542 INIT_LIST_HEAD(&res->l_holders); 543 } 544 545 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, 546 enum ocfs2_lock_type type, 547 unsigned int generation, 548 struct inode *inode) 549 { 550 struct ocfs2_lock_res_ops *ops; 551 552 switch(type) { 553 case OCFS2_LOCK_TYPE_RW: 554 ops = &ocfs2_inode_rw_lops; 555 break; 556 case OCFS2_LOCK_TYPE_META: 557 ops = &ocfs2_inode_inode_lops; 558 break; 559 case OCFS2_LOCK_TYPE_OPEN: 560 ops = &ocfs2_inode_open_lops; 561 break; 562 default: 563 mlog_bug_on_msg(1, "type: %d\n", type); 564 ops = NULL; /* thanks, gcc */ 565 break; 566 }; 567 568 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, 569 generation, res->l_name); 570 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); 571 } 572 573 static struct ocfs2_super 
*ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres) 574 { 575 struct inode *inode = ocfs2_lock_res_inode(lockres); 576 577 return OCFS2_SB(inode->i_sb); 578 } 579 580 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres) 581 { 582 struct ocfs2_mem_dqinfo *info = lockres->l_priv; 583 584 return OCFS2_SB(info->dqi_gi.dqi_sb); 585 } 586 587 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres) 588 { 589 struct ocfs2_file_private *fp = lockres->l_priv; 590 591 return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb); 592 } 593 594 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) 595 { 596 __be64 inode_blkno_be; 597 598 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], 599 sizeof(__be64)); 600 601 return be64_to_cpu(inode_blkno_be); 602 } 603 604 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres) 605 { 606 struct ocfs2_dentry_lock *dl = lockres->l_priv; 607 608 return OCFS2_SB(dl->dl_inode->i_sb); 609 } 610 611 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, 612 u64 parent, struct inode *inode) 613 { 614 int len; 615 u64 inode_blkno = OCFS2_I(inode)->ip_blkno; 616 __be64 inode_blkno_be = cpu_to_be64(inode_blkno); 617 struct ocfs2_lock_res *lockres = &dl->dl_lockres; 618 619 ocfs2_lock_res_init_once(lockres); 620 621 /* 622 * Unfortunately, the standard lock naming scheme won't work 623 * here because we have two 16 byte values to use. Instead, 624 * we'll stuff the inode number as a binary value. We still 625 * want error prints to show something without garbling the 626 * display, so drop a null byte in there before the inode 627 * number. A future version of OCFS2 will likely use all 628 * binary lock names. The stringified names have been a 629 * tremendous aid in debugging, but now that the debugfs 630 * interface exists, we can mangle things there if need be. 631 * 632 * NOTE: We also drop the standard "pad" value (the total lock 633 * name size stays the same though - the last part is all 634 * zeros due to the memset in ocfs2_lock_res_init_once() 635 */ 636 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, 637 "%c%016llx", 638 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), 639 (long long)parent); 640 641 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); 642 643 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, 644 sizeof(__be64)); 645 646 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 647 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, 648 dl); 649 } 650 651 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 652 struct ocfs2_super *osb) 653 { 654 /* Superblock lockres doesn't come from a slab so we call init 655 * once on it manually. */ 656 ocfs2_lock_res_init_once(res); 657 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, 658 0, res->l_name); 659 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 660 &ocfs2_super_lops, osb); 661 } 662 663 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, 664 struct ocfs2_super *osb) 665 { 666 /* Rename lockres doesn't come from a slab so we call init 667 * once on it manually. 
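 * Like the nfs_sync, trim_fs and orphan_scan locks below, its name is
 * built with block number 0 and generation 0 - it is a per-filesystem
 * resource rather than a per-object one.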
*/ 668 ocfs2_lock_res_init_once(res); 669 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); 670 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 671 &ocfs2_rename_lops, osb); 672 } 673 674 static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res, 675 struct ocfs2_super *osb) 676 { 677 /* nfs_sync lockres doesn't come from a slab so we call init 678 * once on it manually. */ 679 ocfs2_lock_res_init_once(res); 680 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name); 681 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC, 682 &ocfs2_nfs_sync_lops, osb); 683 } 684 685 void ocfs2_trim_fs_lock_res_init(struct ocfs2_super *osb) 686 { 687 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 688 689 ocfs2_lock_res_init_once(lockres); 690 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_TRIM_FS, 0, 0, lockres->l_name); 691 ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_TRIM_FS, 692 &ocfs2_trim_fs_lops, osb); 693 } 694 695 void ocfs2_trim_fs_lock_res_uninit(struct ocfs2_super *osb) 696 { 697 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 698 699 ocfs2_simple_drop_lockres(osb, lockres); 700 ocfs2_lock_res_free(lockres); 701 } 702 703 static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res, 704 struct ocfs2_super *osb) 705 { 706 ocfs2_lock_res_init_once(res); 707 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name); 708 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN, 709 &ocfs2_orphan_scan_lops, osb); 710 } 711 712 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, 713 struct ocfs2_file_private *fp) 714 { 715 struct inode *inode = fp->fp_file->f_mapping->host; 716 struct ocfs2_inode_info *oi = OCFS2_I(inode); 717 718 ocfs2_lock_res_init_once(lockres); 719 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno, 720 inode->i_generation, lockres->l_name); 721 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 722 OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops, 723 fp); 724 lockres->l_flags |= OCFS2_LOCK_NOCACHE; 725 } 726 727 void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres, 728 struct ocfs2_mem_dqinfo *info) 729 { 730 ocfs2_lock_res_init_once(lockres); 731 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type, 732 0, lockres->l_name); 733 ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres, 734 OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops, 735 info); 736 } 737 738 void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres, 739 struct ocfs2_super *osb, u64 ref_blkno, 740 unsigned int generation) 741 { 742 ocfs2_lock_res_init_once(lockres); 743 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno, 744 generation, lockres->l_name); 745 ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT, 746 &ocfs2_refcount_block_lops, osb); 747 } 748 749 void ocfs2_lock_res_free(struct ocfs2_lock_res *res) 750 { 751 if (!(res->l_flags & OCFS2_LOCK_INITIALIZED)) 752 return; 753 754 ocfs2_remove_lockres_tracking(res); 755 756 mlog_bug_on_msg(!list_empty(&res->l_blocked_list), 757 "Lockres %s is on the blocked list\n", 758 res->l_name); 759 mlog_bug_on_msg(!list_empty(&res->l_mask_waiters), 760 "Lockres %s has mask waiters pending\n", 761 res->l_name); 762 mlog_bug_on_msg(spin_is_locked(&res->l_lock), 763 "Lockres %s is locked\n", 764 res->l_name); 765 mlog_bug_on_msg(res->l_ro_holders, 766 "Lockres %s has %u ro holders\n", 767 res->l_name, res->l_ro_holders); 768 mlog_bug_on_msg(res->l_ex_holders, 769 "Lockres %s has 
%u ex holders\n",
			res->l_name, res->l_ex_holders);

	/* Need to clear out the lock status block for the dlm */
	memset(&res->l_lksb, 0, sizeof(res->l_lksb));

	res->l_flags = 0UL;
}

/*
 * Keep a list of processes who have interest in a lockres.
 * Note: this is now only used for checking recursive cluster locking.
 */
static inline void ocfs2_add_holder(struct ocfs2_lock_res *lockres,
				    struct ocfs2_lock_holder *oh)
{
	INIT_LIST_HEAD(&oh->oh_list);
	oh->oh_owner_pid = get_pid(task_pid(current));

	spin_lock(&lockres->l_lock);
	list_add_tail(&oh->oh_list, &lockres->l_holders);
	spin_unlock(&lockres->l_lock);
}

static struct ocfs2_lock_holder *
ocfs2_pid_holder(struct ocfs2_lock_res *lockres,
		 struct pid *pid)
{
	struct ocfs2_lock_holder *oh;

	spin_lock(&lockres->l_lock);
	list_for_each_entry(oh, &lockres->l_holders, oh_list) {
		if (oh->oh_owner_pid == pid) {
			spin_unlock(&lockres->l_lock);
			return oh;
		}
	}
	spin_unlock(&lockres->l_lock);
	return NULL;
}

static inline void ocfs2_remove_holder(struct ocfs2_lock_res *lockres,
				       struct ocfs2_lock_holder *oh)
{
	spin_lock(&lockres->l_lock);
	list_del(&oh->oh_list);
	spin_unlock(&lockres->l_lock);

	put_pid(oh->oh_owner_pid);
}


static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	BUG_ON(!lockres);

	switch(level) {
	case DLM_LOCK_EX:
		lockres->l_ex_holders++;
		break;
	case DLM_LOCK_PR:
		lockres->l_ro_holders++;
		break;
	default:
		BUG();
	}
}

static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	BUG_ON(!lockres);

	switch(level) {
	case DLM_LOCK_EX:
		BUG_ON(!lockres->l_ex_holders);
		lockres->l_ex_holders--;
		break;
	case DLM_LOCK_PR:
		BUG_ON(!lockres->l_ro_holders);
		lockres->l_ro_holders--;
		break;
	default:
		BUG();
	}
}

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added.
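 *
 * For reference, the mapping implemented below is:
 *
 *	level passed in		highest compatible level returned
 *	DLM_LOCK_EX		DLM_LOCK_NL
 *	DLM_LOCK_PR		DLM_LOCK_PR
 *	anything else		DLM_LOCK_EX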
 */
static inline int ocfs2_highest_compat_lock_level(int level)
{
	int new_level = DLM_LOCK_EX;

	if (level == DLM_LOCK_EX)
		new_level = DLM_LOCK_NL;
	else if (level == DLM_LOCK_PR)
		new_level = DLM_LOCK_PR;
	return new_level;
}

static void lockres_set_flags(struct ocfs2_lock_res *lockres,
			      unsigned long newflags)
{
	struct ocfs2_mask_waiter *mw, *tmp;

	assert_spin_locked(&lockres->l_lock);

	lockres->l_flags = newflags;

	list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			continue;

		list_del_init(&mw->mw_item);
		mw->mw_status = 0;
		complete(&mw->mw_complete);
	}
}
static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}
static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);

	lockres->l_level = lockres->l_requested;
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = DLM_LOCK_NL;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* Convert from RO to EX doesn't really need anything as our
	 * information is already up to date. Convert from NL to
	 * *anything* however should mark ourselves as needing an
	 * update */
	if (lockres->l_level == DLM_LOCK_NL &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;

	/*
	 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
	 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from
	 * downconverting the lock before the upconvert has fully completed.
	 * Do not prevent the dc thread from downconverting if NONBLOCK lock
	 * had already returned.
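 * (The flag set here is cleared again in __ocfs2_cluster_lock() once the
 * upconverting thread has finished with it, and in
 * ocfs2_recover_from_dlm_error() if the convert fails.)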
936 */ 937 if (!(lockres->l_flags & OCFS2_LOCK_NONBLOCK_FINISHED)) 938 lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 939 else 940 lockres_clear_flags(lockres, OCFS2_LOCK_NONBLOCK_FINISHED); 941 942 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 943 } 944 945 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres) 946 { 947 BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY))); 948 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 949 950 if (lockres->l_requested > DLM_LOCK_NL && 951 !(lockres->l_flags & OCFS2_LOCK_LOCAL) && 952 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 953 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 954 955 lockres->l_level = lockres->l_requested; 956 lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED); 957 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 958 } 959 960 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, 961 int level) 962 { 963 int needs_downconvert = 0; 964 965 assert_spin_locked(&lockres->l_lock); 966 967 if (level > lockres->l_blocking) { 968 /* only schedule a downconvert if we haven't already scheduled 969 * one that goes low enough to satisfy the level we're 970 * blocking. this also catches the case where we get 971 * duplicate BASTs */ 972 if (ocfs2_highest_compat_lock_level(level) < 973 ocfs2_highest_compat_lock_level(lockres->l_blocking)) 974 needs_downconvert = 1; 975 976 lockres->l_blocking = level; 977 } 978 979 mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n", 980 lockres->l_name, level, lockres->l_level, lockres->l_blocking, 981 needs_downconvert); 982 983 if (needs_downconvert) 984 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 985 mlog(0, "needs_downconvert = %d\n", needs_downconvert); 986 return needs_downconvert; 987 } 988 989 /* 990 * OCFS2_LOCK_PENDING and l_pending_gen. 991 * 992 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting 993 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock() 994 * for more details on the race. 995 * 996 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces 997 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock() 998 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear 999 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns, 1000 * the caller is going to try to clear PENDING again. If nothing else is 1001 * happening, __lockres_clear_pending() sees PENDING is unset and does 1002 * nothing. 1003 * 1004 * But what if another path (eg downconvert thread) has just started a 1005 * new locking action? The other path has re-set PENDING. Our path 1006 * cannot clear PENDING, because that will re-open the original race 1007 * window. 1008 * 1009 * [Example] 1010 * 1011 * ocfs2_meta_lock() 1012 * ocfs2_cluster_lock() 1013 * set BUSY 1014 * set PENDING 1015 * drop l_lock 1016 * ocfs2_dlm_lock() 1017 * ocfs2_locking_ast() ocfs2_downconvert_thread() 1018 * clear PENDING ocfs2_unblock_lock() 1019 * take_l_lock 1020 * !BUSY 1021 * ocfs2_prepare_downconvert() 1022 * set BUSY 1023 * set PENDING 1024 * drop l_lock 1025 * take l_lock 1026 * clear PENDING 1027 * drop l_lock 1028 * <window> 1029 * ocfs2_dlm_lock() 1030 * 1031 * So as you can see, we now have a window where l_lock is not held, 1032 * PENDING is not set, and ocfs2_dlm_lock() has not been called. 1033 * 1034 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING 1035 * set by ocfs2_prepare_downconvert(). That wasn't nice. 
1036 * 1037 * To solve this we introduce l_pending_gen. A call to 1038 * lockres_clear_pending() will only do so when it is passed a generation 1039 * number that matches the lockres. lockres_set_pending() will return the 1040 * current generation number. When ocfs2_cluster_lock() goes to clear 1041 * PENDING, it passes the generation it got from set_pending(). In our 1042 * example above, the generation numbers will *not* match. Thus, 1043 * ocfs2_cluster_lock() will not clear the PENDING set by 1044 * ocfs2_prepare_downconvert(). 1045 */ 1046 1047 /* Unlocked version for ocfs2_locking_ast() */ 1048 static void __lockres_clear_pending(struct ocfs2_lock_res *lockres, 1049 unsigned int generation, 1050 struct ocfs2_super *osb) 1051 { 1052 assert_spin_locked(&lockres->l_lock); 1053 1054 /* 1055 * The ast and locking functions can race us here. The winner 1056 * will clear pending, the loser will not. 1057 */ 1058 if (!(lockres->l_flags & OCFS2_LOCK_PENDING) || 1059 (lockres->l_pending_gen != generation)) 1060 return; 1061 1062 lockres_clear_flags(lockres, OCFS2_LOCK_PENDING); 1063 lockres->l_pending_gen++; 1064 1065 /* 1066 * The downconvert thread may have skipped us because we 1067 * were PENDING. Wake it up. 1068 */ 1069 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 1070 ocfs2_wake_downconvert_thread(osb); 1071 } 1072 1073 /* Locked version for callers of ocfs2_dlm_lock() */ 1074 static void lockres_clear_pending(struct ocfs2_lock_res *lockres, 1075 unsigned int generation, 1076 struct ocfs2_super *osb) 1077 { 1078 unsigned long flags; 1079 1080 spin_lock_irqsave(&lockres->l_lock, flags); 1081 __lockres_clear_pending(lockres, generation, osb); 1082 spin_unlock_irqrestore(&lockres->l_lock, flags); 1083 } 1084 1085 static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres) 1086 { 1087 assert_spin_locked(&lockres->l_lock); 1088 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 1089 1090 lockres_or_flags(lockres, OCFS2_LOCK_PENDING); 1091 1092 return lockres->l_pending_gen; 1093 } 1094 1095 static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level) 1096 { 1097 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1098 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1099 int needs_downconvert; 1100 unsigned long flags; 1101 1102 BUG_ON(level <= DLM_LOCK_NL); 1103 1104 mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, " 1105 "type %s\n", lockres->l_name, level, lockres->l_level, 1106 ocfs2_lock_type_string(lockres->l_type)); 1107 1108 /* 1109 * We can skip the bast for locks which don't enable caching - 1110 * they'll be dropped at the earliest possible time anyway. 
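 * (The flock lockres is set up that way - ocfs2_file_lock_res_init()
 * above sets OCFS2_LOCK_NOCACHE on it.)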
1111 */ 1112 if (lockres->l_flags & OCFS2_LOCK_NOCACHE) 1113 return; 1114 1115 spin_lock_irqsave(&lockres->l_lock, flags); 1116 needs_downconvert = ocfs2_generic_handle_bast(lockres, level); 1117 if (needs_downconvert) 1118 ocfs2_schedule_blocked_lock(osb, lockres); 1119 spin_unlock_irqrestore(&lockres->l_lock, flags); 1120 1121 wake_up(&lockres->l_event); 1122 1123 ocfs2_wake_downconvert_thread(osb); 1124 } 1125 1126 static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb) 1127 { 1128 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1129 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1130 unsigned long flags; 1131 int status; 1132 1133 spin_lock_irqsave(&lockres->l_lock, flags); 1134 1135 status = ocfs2_dlm_lock_status(&lockres->l_lksb); 1136 1137 if (status == -EAGAIN) { 1138 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1139 goto out; 1140 } 1141 1142 if (status) { 1143 mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n", 1144 lockres->l_name, status); 1145 spin_unlock_irqrestore(&lockres->l_lock, flags); 1146 return; 1147 } 1148 1149 mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, " 1150 "level %d => %d\n", lockres->l_name, lockres->l_action, 1151 lockres->l_unlock_action, lockres->l_level, lockres->l_requested); 1152 1153 switch(lockres->l_action) { 1154 case OCFS2_AST_ATTACH: 1155 ocfs2_generic_handle_attach_action(lockres); 1156 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL); 1157 break; 1158 case OCFS2_AST_CONVERT: 1159 ocfs2_generic_handle_convert_action(lockres); 1160 break; 1161 case OCFS2_AST_DOWNCONVERT: 1162 ocfs2_generic_handle_downconvert_action(lockres); 1163 break; 1164 default: 1165 mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, " 1166 "flags 0x%lx, unlock: %u\n", 1167 lockres->l_name, lockres->l_action, lockres->l_flags, 1168 lockres->l_unlock_action); 1169 BUG(); 1170 } 1171 out: 1172 /* set it to something invalid so if we get called again we 1173 * can catch it. */ 1174 lockres->l_action = OCFS2_AST_INVALID; 1175 1176 /* Did we try to cancel this lock? Clear that state */ 1177 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) 1178 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1179 1180 /* 1181 * We may have beaten the locking functions here. We certainly 1182 * know that dlm_lock() has been called :-) 1183 * Because we can't have two lock calls in flight at once, we 1184 * can use lockres->l_pending_gen. 1185 */ 1186 __lockres_clear_pending(lockres, lockres->l_pending_gen, osb); 1187 1188 wake_up(&lockres->l_event); 1189 spin_unlock_irqrestore(&lockres->l_lock, flags); 1190 } 1191 1192 static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error) 1193 { 1194 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1195 unsigned long flags; 1196 1197 mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n", 1198 lockres->l_name, lockres->l_unlock_action); 1199 1200 spin_lock_irqsave(&lockres->l_lock, flags); 1201 if (error) { 1202 mlog(ML_ERROR, "Dlm passes error %d for lock %s, " 1203 "unlock_action %d\n", error, lockres->l_name, 1204 lockres->l_unlock_action); 1205 spin_unlock_irqrestore(&lockres->l_lock, flags); 1206 return; 1207 } 1208 1209 switch(lockres->l_unlock_action) { 1210 case OCFS2_UNLOCK_CANCEL_CONVERT: 1211 mlog(0, "Cancel convert success for %s\n", lockres->l_name); 1212 lockres->l_action = OCFS2_AST_INVALID; 1213 /* Downconvert thread may have requeued this lock, we 1214 * need to wake it. 
*/ 1215 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 1216 ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres)); 1217 break; 1218 case OCFS2_UNLOCK_DROP_LOCK: 1219 lockres->l_level = DLM_LOCK_IV; 1220 break; 1221 default: 1222 BUG(); 1223 } 1224 1225 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1226 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1227 wake_up(&lockres->l_event); 1228 spin_unlock_irqrestore(&lockres->l_lock, flags); 1229 } 1230 1231 /* 1232 * This is the filesystem locking protocol. It provides the lock handling 1233 * hooks for the underlying DLM. It has a maximum version number. 1234 * The version number allows interoperability with systems running at 1235 * the same major number and an equal or smaller minor number. 1236 * 1237 * Whenever the filesystem does new things with locks (adds or removes a 1238 * lock, orders them differently, does different things underneath a lock), 1239 * the version must be changed. The protocol is negotiated when joining 1240 * the dlm domain. A node may join the domain if its major version is 1241 * identical to all other nodes and its minor version is greater than 1242 * or equal to all other nodes. When its minor version is greater than 1243 * the other nodes, it will run at the minor version specified by the 1244 * other nodes. 1245 * 1246 * If a locking change is made that will not be compatible with older 1247 * versions, the major number must be increased and the minor version set 1248 * to zero. If a change merely adds a behavior that can be disabled when 1249 * speaking to older versions, the minor version must be increased. If a 1250 * change adds a fully backwards compatible change (eg, LVB changes that 1251 * are just ignored by older versions), the version does not need to be 1252 * updated. 1253 */ 1254 static struct ocfs2_locking_protocol lproto = { 1255 .lp_max_version = { 1256 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, 1257 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, 1258 }, 1259 .lp_lock_ast = ocfs2_locking_ast, 1260 .lp_blocking_ast = ocfs2_blocking_ast, 1261 .lp_unlock_ast = ocfs2_unlock_ast, 1262 }; 1263 1264 void ocfs2_set_locking_protocol(void) 1265 { 1266 ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version); 1267 } 1268 1269 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 1270 int convert) 1271 { 1272 unsigned long flags; 1273 1274 spin_lock_irqsave(&lockres->l_lock, flags); 1275 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1276 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 1277 if (convert) 1278 lockres->l_action = OCFS2_AST_INVALID; 1279 else 1280 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1281 spin_unlock_irqrestore(&lockres->l_lock, flags); 1282 1283 wake_up(&lockres->l_event); 1284 } 1285 1286 /* Note: If we detect another process working on the lock (i.e., 1287 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller 1288 * to do the right thing in that case. 
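 *
 * For illustration, the OCFS2_LOCK_PENDING handshake described above comes
 * down to the following pattern, used both here and in
 * __ocfs2_cluster_lock():
 *
 *	spin_lock_irqsave(&lockres->l_lock, flags);
 *	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
 *	gen = lockres_set_pending(lockres);
 *	spin_unlock_irqrestore(&lockres->l_lock, flags);
 *
 *	ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, dlm_flags,
 *			     lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
 *	lockres_clear_pending(lockres, gen, osb);
 *
 * i.e. PENDING is set under l_lock before calling into the dlm, and is
 * only cleared with the generation handed back by lockres_set_pending().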
1289 */ 1290 static int ocfs2_lock_create(struct ocfs2_super *osb, 1291 struct ocfs2_lock_res *lockres, 1292 int level, 1293 u32 dlm_flags) 1294 { 1295 int ret = 0; 1296 unsigned long flags; 1297 unsigned int gen; 1298 1299 mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level, 1300 dlm_flags); 1301 1302 spin_lock_irqsave(&lockres->l_lock, flags); 1303 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 1304 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 1305 spin_unlock_irqrestore(&lockres->l_lock, flags); 1306 goto bail; 1307 } 1308 1309 lockres->l_action = OCFS2_AST_ATTACH; 1310 lockres->l_requested = level; 1311 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1312 gen = lockres_set_pending(lockres); 1313 spin_unlock_irqrestore(&lockres->l_lock, flags); 1314 1315 ret = ocfs2_dlm_lock(osb->cconn, 1316 level, 1317 &lockres->l_lksb, 1318 dlm_flags, 1319 lockres->l_name, 1320 OCFS2_LOCK_ID_MAX_LEN - 1); 1321 lockres_clear_pending(lockres, gen, osb); 1322 if (ret) { 1323 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1324 ocfs2_recover_from_dlm_error(lockres, 1); 1325 } 1326 1327 mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name); 1328 1329 bail: 1330 return ret; 1331 } 1332 1333 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, 1334 int flag) 1335 { 1336 unsigned long flags; 1337 int ret; 1338 1339 spin_lock_irqsave(&lockres->l_lock, flags); 1340 ret = lockres->l_flags & flag; 1341 spin_unlock_irqrestore(&lockres->l_lock, flags); 1342 1343 return ret; 1344 } 1345 1346 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 1347 1348 { 1349 wait_event(lockres->l_event, 1350 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 1351 } 1352 1353 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 1354 1355 { 1356 wait_event(lockres->l_event, 1357 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 1358 } 1359 1360 /* predict what lock level we'll be dropping down to on behalf 1361 * of another node, and return true if the currently wanted 1362 * level will be compatible with it. 
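 *
 * For example: if another node is asking for EX (l_blocking == DLM_LOCK_EX)
 * we will eventually drop to NL, so no PR or EX request may continue; if it
 * is asking for PR we will drop to PR, so a local PR request may proceed
 * while an EX request must wait.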
*/ 1363 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 1364 int wanted) 1365 { 1366 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 1367 1368 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking); 1369 } 1370 1371 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw) 1372 { 1373 INIT_LIST_HEAD(&mw->mw_item); 1374 init_completion(&mw->mw_complete); 1375 ocfs2_init_start_time(mw); 1376 } 1377 1378 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) 1379 { 1380 wait_for_completion(&mw->mw_complete); 1381 /* Re-arm the completion in case we want to wait on it again */ 1382 reinit_completion(&mw->mw_complete); 1383 return mw->mw_status; 1384 } 1385 1386 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres, 1387 struct ocfs2_mask_waiter *mw, 1388 unsigned long mask, 1389 unsigned long goal) 1390 { 1391 BUG_ON(!list_empty(&mw->mw_item)); 1392 1393 assert_spin_locked(&lockres->l_lock); 1394 1395 list_add_tail(&mw->mw_item, &lockres->l_mask_waiters); 1396 mw->mw_mask = mask; 1397 mw->mw_goal = goal; 1398 } 1399 1400 /* returns 0 if the mw that was removed was already satisfied, -EBUSY 1401 * if the mask still hadn't reached its goal */ 1402 static int __lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1403 struct ocfs2_mask_waiter *mw) 1404 { 1405 int ret = 0; 1406 1407 assert_spin_locked(&lockres->l_lock); 1408 if (!list_empty(&mw->mw_item)) { 1409 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 1410 ret = -EBUSY; 1411 1412 list_del_init(&mw->mw_item); 1413 init_completion(&mw->mw_complete); 1414 } 1415 1416 return ret; 1417 } 1418 1419 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1420 struct ocfs2_mask_waiter *mw) 1421 { 1422 unsigned long flags; 1423 int ret = 0; 1424 1425 spin_lock_irqsave(&lockres->l_lock, flags); 1426 ret = __lockres_remove_mask_waiter(lockres, mw); 1427 spin_unlock_irqrestore(&lockres->l_lock, flags); 1428 1429 return ret; 1430 1431 } 1432 1433 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw, 1434 struct ocfs2_lock_res *lockres) 1435 { 1436 int ret; 1437 1438 ret = wait_for_completion_interruptible(&mw->mw_complete); 1439 if (ret) 1440 lockres_remove_mask_waiter(lockres, mw); 1441 else 1442 ret = mw->mw_status; 1443 /* Re-arm the completion in case we want to wait on it again */ 1444 reinit_completion(&mw->mw_complete); 1445 return ret; 1446 } 1447 1448 static int __ocfs2_cluster_lock(struct ocfs2_super *osb, 1449 struct ocfs2_lock_res *lockres, 1450 int level, 1451 u32 lkm_flags, 1452 int arg_flags, 1453 int l_subclass, 1454 unsigned long caller_ip) 1455 { 1456 struct ocfs2_mask_waiter mw; 1457 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); 1458 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 1459 unsigned long flags; 1460 unsigned int gen; 1461 int noqueue_attempted = 0; 1462 int dlm_locked = 0; 1463 int kick_dc = 0; 1464 1465 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) { 1466 mlog_errno(-EINVAL); 1467 return -EINVAL; 1468 } 1469 1470 ocfs2_init_mask_waiter(&mw); 1471 1472 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 1473 lkm_flags |= DLM_LKF_VALBLK; 1474 1475 again: 1476 wait = 0; 1477 1478 spin_lock_irqsave(&lockres->l_lock, flags); 1479 1480 if (catch_signals && signal_pending(current)) { 1481 ret = -ERESTARTSYS; 1482 goto unlock; 1483 } 1484 1485 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, 1486 "Cluster lock called on freeing lockres %s! 
flags " 1487 "0x%lx\n", lockres->l_name, lockres->l_flags); 1488 1489 /* We only compare against the currently granted level 1490 * here. If the lock is blocked waiting on a downconvert, 1491 * we'll get caught below. */ 1492 if (lockres->l_flags & OCFS2_LOCK_BUSY && 1493 level > lockres->l_level) { 1494 /* is someone sitting in dlm_lock? If so, wait on 1495 * them. */ 1496 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1497 wait = 1; 1498 goto unlock; 1499 } 1500 1501 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) { 1502 /* 1503 * We've upconverted. If the lock now has a level we can 1504 * work with, we take it. If, however, the lock is not at the 1505 * required level, we go thru the full cycle. One way this could 1506 * happen is if a process requesting an upconvert to PR is 1507 * closely followed by another requesting upconvert to an EX. 1508 * If the process requesting EX lands here, we want it to 1509 * continue attempting to upconvert and let the process 1510 * requesting PR take the lock. 1511 * If multiple processes request upconvert to PR, the first one 1512 * here will take the lock. The others will have to go thru the 1513 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending 1514 * downconvert request. 1515 */ 1516 if (level <= lockres->l_level) 1517 goto update_holders; 1518 } 1519 1520 if (lockres->l_flags & OCFS2_LOCK_BLOCKED && 1521 !ocfs2_may_continue_on_blocked_lock(lockres, level)) { 1522 /* is the lock is currently blocked on behalf of 1523 * another node */ 1524 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0); 1525 wait = 1; 1526 goto unlock; 1527 } 1528 1529 if (level > lockres->l_level) { 1530 if (noqueue_attempted > 0) { 1531 ret = -EAGAIN; 1532 goto unlock; 1533 } 1534 if (lkm_flags & DLM_LKF_NOQUEUE) 1535 noqueue_attempted = 1; 1536 1537 if (lockres->l_action != OCFS2_AST_INVALID) 1538 mlog(ML_ERROR, "lockres %s has action %u pending\n", 1539 lockres->l_name, lockres->l_action); 1540 1541 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1542 lockres->l_action = OCFS2_AST_ATTACH; 1543 lkm_flags &= ~DLM_LKF_CONVERT; 1544 } else { 1545 lockres->l_action = OCFS2_AST_CONVERT; 1546 lkm_flags |= DLM_LKF_CONVERT; 1547 } 1548 1549 lockres->l_requested = level; 1550 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1551 gen = lockres_set_pending(lockres); 1552 spin_unlock_irqrestore(&lockres->l_lock, flags); 1553 1554 BUG_ON(level == DLM_LOCK_IV); 1555 BUG_ON(level == DLM_LOCK_NL); 1556 1557 mlog(ML_BASTS, "lockres %s, convert from %d to %d\n", 1558 lockres->l_name, lockres->l_level, level); 1559 1560 /* call dlm_lock to upgrade lock now */ 1561 ret = ocfs2_dlm_lock(osb->cconn, 1562 level, 1563 &lockres->l_lksb, 1564 lkm_flags, 1565 lockres->l_name, 1566 OCFS2_LOCK_ID_MAX_LEN - 1); 1567 lockres_clear_pending(lockres, gen, osb); 1568 if (ret) { 1569 if (!(lkm_flags & DLM_LKF_NOQUEUE) || 1570 (ret != -EAGAIN)) { 1571 ocfs2_log_dlm_error("ocfs2_dlm_lock", 1572 ret, lockres); 1573 } 1574 ocfs2_recover_from_dlm_error(lockres, 1); 1575 goto out; 1576 } 1577 dlm_locked = 1; 1578 1579 mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n", 1580 lockres->l_name); 1581 1582 /* At this point we've gone inside the dlm and need to 1583 * complete our work regardless. */ 1584 catch_signals = 0; 1585 1586 /* wait for busy to clear and carry on */ 1587 goto again; 1588 } 1589 1590 update_holders: 1591 /* Ok, if we get here then we're good to go. 
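 * All that remains is to bump the holder count for the level we were
 * granted; the matching ocfs2_dec_holders() happens in
 * __ocfs2_cluster_unlock().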
 */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);

	/* ocfs2_unblock_lock requeues on seeing OCFS2_LOCK_UPCONVERT_FINISHING */
	kick_dc = (lockres->l_flags & OCFS2_LOCK_BLOCKED);

	spin_unlock_irqrestore(&lockres->l_lock, flags);
	if (kick_dc)
		ocfs2_wake_downconvert_thread(osb);
out:
	/*
	 * This is helping work around a lock inversion between the page lock
	 * and dlm locks. One path holds the page lock while calling aops
	 * which block acquiring dlm locks. The voting thread holds dlm
	 * locks while acquiring page locks while down converting data locks.
	 * This block is helping an aop path notice the inversion and back
	 * off to unlock its page lock before trying the dlm lock again.
	 */
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		spin_lock_irqsave(&lockres->l_lock, flags);
		if (__lockres_remove_mask_waiter(lockres, &mw)) {
			if (dlm_locked)
				lockres_or_flags(lockres,
					OCFS2_LOCK_NONBLOCK_FINISHED);
			spin_unlock_irqrestore(&lockres->l_lock, flags);
			ret = -EAGAIN;
		} else {
			spin_unlock_irqrestore(&lockres->l_lock, flags);
			goto again;
		}
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}
	ocfs2_update_lock_stats(lockres, level, &mw, ret);

#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (!ret && lockres->l_lockdep_map.key != NULL) {
		if (level == DLM_LOCK_PR)
			rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
				caller_ip);
		else
			rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
				caller_ip);
	}
#endif
	return ret;
}

static inline int ocfs2_cluster_lock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres,
				     int level,
				     u32 lkm_flags,
				     int arg_flags)
{
	return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags,
				    0, _RET_IP_);
}


static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
				   struct ocfs2_lock_res *lockres,
				   int level,
				   unsigned long caller_ip)
{
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);
	ocfs2_dec_holders(lockres, level);
	ocfs2_downconvert_on_unlock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (lockres->l_lockdep_map.key != NULL)
		rwsem_release(&lockres->l_lockdep_map, 1, caller_ip);
#endif
}

static int ocfs2_create_new_lock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int ex,
				 int local)
{
	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	unsigned long flags;
	u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
}

/* Grants us an EX lock on the data and metadata resources, skipping
 * the normal cluster directory lookup.
Use this ONLY on newly created 1698 * inodes which other nodes can't possibly see, and which haven't been 1699 * hashed in the inode hash yet. This can give us a good performance 1700 * increase as it'll skip the network broadcast normally associated 1701 * with creating a new lock resource. */ 1702 int ocfs2_create_new_inode_locks(struct inode *inode) 1703 { 1704 int ret; 1705 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1706 1707 BUG_ON(!ocfs2_inode_is_new(inode)); 1708 1709 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); 1710 1711 /* NOTE: That we don't increment any of the holder counts, nor 1712 * do we add anything to a journal handle. Since this is 1713 * supposed to be a new inode which the cluster doesn't know 1714 * about yet, there is no need to. As far as the LVB handling 1715 * is concerned, this is basically like acquiring an EX lock 1716 * on a resource which has an invalid one -- we'll set it 1717 * valid when we release the EX. */ 1718 1719 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1); 1720 if (ret) { 1721 mlog_errno(ret); 1722 goto bail; 1723 } 1724 1725 /* 1726 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they 1727 * don't use a generation in their lock names. 1728 */ 1729 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); 1730 if (ret) { 1731 mlog_errno(ret); 1732 goto bail; 1733 } 1734 1735 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0); 1736 if (ret) 1737 mlog_errno(ret); 1738 1739 bail: 1740 return ret; 1741 } 1742 1743 int ocfs2_rw_lock(struct inode *inode, int write) 1744 { 1745 int status, level; 1746 struct ocfs2_lock_res *lockres; 1747 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1748 1749 mlog(0, "inode %llu take %s RW lock\n", 1750 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1751 write ? "EXMODE" : "PRMODE"); 1752 1753 if (ocfs2_mount_local(osb)) 1754 return 0; 1755 1756 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1757 1758 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1759 1760 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 1761 if (status < 0) 1762 mlog_errno(status); 1763 1764 return status; 1765 } 1766 1767 int ocfs2_try_rw_lock(struct inode *inode, int write) 1768 { 1769 int status, level; 1770 struct ocfs2_lock_res *lockres; 1771 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1772 1773 mlog(0, "inode %llu try to take %s RW lock\n", 1774 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1775 write ? "EXMODE" : "PRMODE"); 1776 1777 if (ocfs2_mount_local(osb)) 1778 return 0; 1779 1780 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1781 1782 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1783 1784 status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0); 1785 return status; 1786 } 1787 1788 void ocfs2_rw_unlock(struct inode *inode, int write) 1789 { 1790 int level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1791 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1792 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1793 1794 mlog(0, "inode %llu drop %s RW lock\n", 1795 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1796 write ? "EXMODE" : "PRMODE"); 1797 1798 if (!ocfs2_mount_local(osb)) 1799 ocfs2_cluster_unlock(osb, lockres, level); 1800 } 1801 1802 /* 1803 * ocfs2_open_lock always get PR mode lock. 
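 * EX open locks are requested through ocfs2_try_open_lock() below; since
 * that path passes DLM_LKF_NOQUEUE, a conflicting holder on another node
 * shows up as -EAGAIN instead of blocking.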
1804 */ 1805 int ocfs2_open_lock(struct inode *inode) 1806 { 1807 int status = 0; 1808 struct ocfs2_lock_res *lockres; 1809 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1810 1811 mlog(0, "inode %llu take PRMODE open lock\n", 1812 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1813 1814 if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb)) 1815 goto out; 1816 1817 lockres = &OCFS2_I(inode)->ip_open_lockres; 1818 1819 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_PR, 0, 0); 1820 if (status < 0) 1821 mlog_errno(status); 1822 1823 out: 1824 return status; 1825 } 1826 1827 int ocfs2_try_open_lock(struct inode *inode, int write) 1828 { 1829 int status = 0, level; 1830 struct ocfs2_lock_res *lockres; 1831 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1832 1833 mlog(0, "inode %llu try to take %s open lock\n", 1834 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1835 write ? "EXMODE" : "PRMODE"); 1836 1837 if (ocfs2_is_hard_readonly(osb)) { 1838 if (write) 1839 status = -EROFS; 1840 goto out; 1841 } 1842 1843 if (ocfs2_mount_local(osb)) 1844 goto out; 1845 1846 lockres = &OCFS2_I(inode)->ip_open_lockres; 1847 1848 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1849 1850 /* 1851 * The file system may already holding a PRMODE/EXMODE open lock. 1852 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on 1853 * other nodes and the -EAGAIN will indicate to the caller that 1854 * this inode is still in use. 1855 */ 1856 status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0); 1857 1858 out: 1859 return status; 1860 } 1861 1862 /* 1863 * ocfs2_open_unlock unlock PR and EX mode open locks. 1864 */ 1865 void ocfs2_open_unlock(struct inode *inode) 1866 { 1867 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres; 1868 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1869 1870 mlog(0, "inode %llu drop open lock\n", 1871 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1872 1873 if (ocfs2_mount_local(osb)) 1874 goto out; 1875 1876 if(lockres->l_ro_holders) 1877 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_PR); 1878 if(lockres->l_ex_holders) 1879 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 1880 1881 out: 1882 return; 1883 } 1884 1885 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres, 1886 int level) 1887 { 1888 int ret; 1889 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1890 unsigned long flags; 1891 struct ocfs2_mask_waiter mw; 1892 1893 ocfs2_init_mask_waiter(&mw); 1894 1895 retry_cancel: 1896 spin_lock_irqsave(&lockres->l_lock, flags); 1897 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 1898 ret = ocfs2_prepare_cancel_convert(osb, lockres); 1899 if (ret) { 1900 spin_unlock_irqrestore(&lockres->l_lock, flags); 1901 ret = ocfs2_cancel_convert(osb, lockres); 1902 if (ret < 0) { 1903 mlog_errno(ret); 1904 goto out; 1905 } 1906 goto retry_cancel; 1907 } 1908 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1909 spin_unlock_irqrestore(&lockres->l_lock, flags); 1910 1911 ocfs2_wait_for_mask(&mw); 1912 goto retry_cancel; 1913 } 1914 1915 ret = -ERESTARTSYS; 1916 /* 1917 * We may still have gotten the lock, in which case there's no 1918 * point to restarting the syscall. 1919 */ 1920 if (lockres->l_level == level) 1921 ret = 0; 1922 1923 mlog(0, "Cancel returning %d. 
flags: 0x%lx, level: %d, act: %d\n", ret, 1924 lockres->l_flags, lockres->l_level, lockres->l_action); 1925 1926 spin_unlock_irqrestore(&lockres->l_lock, flags); 1927 1928 out: 1929 return ret; 1930 } 1931 1932 /* 1933 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of 1934 * flock() calls. The locking approach this requires is sufficiently 1935 * different from all other cluster lock types that we implement a 1936 * separate path to the "low-level" dlm calls. In particular: 1937 * 1938 * - No optimization of lock levels is done - we take exactly 1939 * what's been requested. 1940 * 1941 * - No lock caching is employed. We immediately downconvert to 1942 * no-lock at unlock time. This also means flock locks never go on 1943 * the blocking list. 1944 * 1945 * - Since userspace can trivially deadlock itself with flock, we make 1946 * sure to allow cancellation of a misbehaving application's flock() 1947 * request. 1948 * 1949 * - Access to any flock lockres doesn't require concurrency, so we 1950 * can simplify the code by requiring the caller to guarantee 1951 * serialization of dlmglue flock calls. 1952 */ 1953 int ocfs2_file_lock(struct file *file, int ex, int trylock) 1954 { 1955 int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1956 unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0; 1957 unsigned long flags; 1958 struct ocfs2_file_private *fp = file->private_data; 1959 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1960 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1961 struct ocfs2_mask_waiter mw; 1962 1963 ocfs2_init_mask_waiter(&mw); 1964 1965 if ((lockres->l_flags & OCFS2_LOCK_BUSY) || 1966 (lockres->l_level > DLM_LOCK_NL)) { 1967 mlog(ML_ERROR, 1968 "File lock \"%s\" has busy or locked state: flags: 0x%lx, " 1969 "level: %u\n", lockres->l_name, lockres->l_flags, 1970 lockres->l_level); 1971 return -EINVAL; 1972 } 1973 1974 spin_lock_irqsave(&lockres->l_lock, flags); 1975 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1976 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1977 spin_unlock_irqrestore(&lockres->l_lock, flags); 1978 1979 /* 1980 * Get the lock at NLMODE to start - that way we 1981 * can cancel the upconvert request if need be. 1982 */ 1983 ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0); 1984 if (ret < 0) { 1985 mlog_errno(ret); 1986 goto out; 1987 } 1988 1989 ret = ocfs2_wait_for_mask(&mw); 1990 if (ret) { 1991 mlog_errno(ret); 1992 goto out; 1993 } 1994 spin_lock_irqsave(&lockres->l_lock, flags); 1995 } 1996 1997 lockres->l_action = OCFS2_AST_CONVERT; 1998 lkm_flags |= DLM_LKF_CONVERT; 1999 lockres->l_requested = level; 2000 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2001 2002 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 2003 spin_unlock_irqrestore(&lockres->l_lock, flags); 2004 2005 ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags, 2006 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1); 2007 if (ret) { 2008 if (!trylock || (ret != -EAGAIN)) { 2009 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 2010 ret = -EINVAL; 2011 } 2012 2013 ocfs2_recover_from_dlm_error(lockres, 1); 2014 lockres_remove_mask_waiter(lockres, &mw); 2015 goto out; 2016 } 2017 2018 ret = ocfs2_wait_for_mask_interruptible(&mw, lockres); 2019 if (ret == -ERESTARTSYS) { 2020 /* 2021 * Userspace can deadlock itself with 2022 * flock(). Current behavior locally is to allow the 2023 * deadlock, but abort the system call if a signal is 2024 * received.
We follow this example, otherwise a 2025 * poorly written program could sit in kernel until 2026 * reboot. 2027 * 2028 * Handling this is a bit more complicated for Ocfs2 2029 * though. We can't exit this function with an 2030 * outstanding lock request, so a cancel convert is 2031 * required. We intentionally overwrite 'ret' - if the 2032 * cancel fails and the lock was granted, it's easier 2033 * to just bubble success back up to the user. 2034 */ 2035 ret = ocfs2_flock_handle_signal(lockres, level); 2036 } else if (!ret && (level > lockres->l_level)) { 2037 /* Trylock failed asynchronously */ 2038 BUG_ON(!trylock); 2039 ret = -EAGAIN; 2040 } 2041 2042 out: 2043 2044 mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n", 2045 lockres->l_name, ex, trylock, ret); 2046 return ret; 2047 } 2048 2049 void ocfs2_file_unlock(struct file *file) 2050 { 2051 int ret; 2052 unsigned int gen; 2053 unsigned long flags; 2054 struct ocfs2_file_private *fp = file->private_data; 2055 struct ocfs2_lock_res *lockres = &fp->fp_flock; 2056 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 2057 struct ocfs2_mask_waiter mw; 2058 2059 ocfs2_init_mask_waiter(&mw); 2060 2061 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) 2062 return; 2063 2064 if (lockres->l_level == DLM_LOCK_NL) 2065 return; 2066 2067 mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n", 2068 lockres->l_name, lockres->l_flags, lockres->l_level, 2069 lockres->l_action); 2070 2071 spin_lock_irqsave(&lockres->l_lock, flags); 2072 /* 2073 * Fake a blocking ast for the downconvert code. 2074 */ 2075 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 2076 lockres->l_blocking = DLM_LOCK_EX; 2077 2078 gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL); 2079 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 2080 spin_unlock_irqrestore(&lockres->l_lock, flags); 2081 2082 ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen); 2083 if (ret) { 2084 mlog_errno(ret); 2085 return; 2086 } 2087 2088 ret = ocfs2_wait_for_mask(&mw); 2089 if (ret) 2090 mlog_errno(ret); 2091 } 2092 2093 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 2094 struct ocfs2_lock_res *lockres) 2095 { 2096 int kick = 0; 2097 2098 /* If we know that another node is waiting on our lock, kick 2099 * the downconvert thread * pre-emptively when we reach a release 2100 * condition. */ 2101 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 2102 switch(lockres->l_blocking) { 2103 case DLM_LOCK_EX: 2104 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 2105 kick = 1; 2106 break; 2107 case DLM_LOCK_PR: 2108 if (!lockres->l_ex_holders) 2109 kick = 1; 2110 break; 2111 default: 2112 BUG(); 2113 } 2114 } 2115 2116 if (kick) 2117 ocfs2_wake_downconvert_thread(osb); 2118 } 2119 2120 #define OCFS2_SEC_BITS 34 2121 #define OCFS2_SEC_SHIFT (64 - 34) 2122 #define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1) 2123 2124 /* LVB only has room for 64 bits of time here so we pack it for 2125 * now. */ 2126 static u64 ocfs2_pack_timespec(struct timespec64 *spec) 2127 { 2128 u64 res; 2129 u64 sec = clamp_t(time64_t, spec->tv_sec, 0, 0x3ffffffffull); 2130 u32 nsec = spec->tv_nsec; 2131 2132 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK); 2133 2134 return res; 2135 } 2136 2137 /* Call this with the lockres locked. I am reasonably sure we don't 2138 * need ip_lock in this function as anyone who would be changing those 2139 * values is supposed to be blocked in ocfs2_inode_lock right now. 
*/ 2140 static void __ocfs2_stuff_meta_lvb(struct inode *inode) 2141 { 2142 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2143 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2144 struct ocfs2_meta_lvb *lvb; 2145 2146 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2147 2148 /* 2149 * Invalidate the LVB of a deleted inode - this way other 2150 * nodes are forced to go to disk and discover the new inode 2151 * status. 2152 */ 2153 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2154 lvb->lvb_version = 0; 2155 goto out; 2156 } 2157 2158 lvb->lvb_version = OCFS2_LVB_VERSION; 2159 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 2160 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 2161 lvb->lvb_iuid = cpu_to_be32(i_uid_read(inode)); 2162 lvb->lvb_igid = cpu_to_be32(i_gid_read(inode)); 2163 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 2164 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 2165 lvb->lvb_iatime_packed = 2166 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime)); 2167 lvb->lvb_ictime_packed = 2168 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime)); 2169 lvb->lvb_imtime_packed = 2170 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime)); 2171 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 2172 lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features); 2173 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 2174 2175 out: 2176 mlog_meta_lvb(0, lockres); 2177 } 2178 2179 static void ocfs2_unpack_timespec(struct timespec64 *spec, 2180 u64 packed_time) 2181 { 2182 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 2183 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 2184 } 2185 2186 static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 2187 { 2188 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2189 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2190 struct ocfs2_meta_lvb *lvb; 2191 2192 mlog_meta_lvb(0, lockres); 2193 2194 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2195 2196 /* We're safe here without the lockres lock... */ 2197 spin_lock(&oi->ip_lock); 2198 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 2199 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 2200 2201 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 2202 oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures); 2203 ocfs2_set_inode_flags(inode); 2204 2205 /* fast-symlinks are a special case */ 2206 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 2207 inode->i_blocks = 0; 2208 else 2209 inode->i_blocks = ocfs2_inode_sector_count(inode); 2210 2211 i_uid_write(inode, be32_to_cpu(lvb->lvb_iuid)); 2212 i_gid_write(inode, be32_to_cpu(lvb->lvb_igid)); 2213 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 2214 set_nlink(inode, be16_to_cpu(lvb->lvb_inlink)); 2215 ocfs2_unpack_timespec(&inode->i_atime, 2216 be64_to_cpu(lvb->lvb_iatime_packed)); 2217 ocfs2_unpack_timespec(&inode->i_mtime, 2218 be64_to_cpu(lvb->lvb_imtime_packed)); 2219 ocfs2_unpack_timespec(&inode->i_ctime, 2220 be64_to_cpu(lvb->lvb_ictime_packed)); 2221 spin_unlock(&oi->ip_lock); 2222 } 2223 2224 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 2225 struct ocfs2_lock_res *lockres) 2226 { 2227 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2228 2229 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) 2230 && lvb->lvb_version == OCFS2_LVB_VERSION 2231 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 2232 return 1; 2233 return 0; 2234 } 2235 2236 /* Determine whether a lock resource needs to be refreshed, and 2237 * arbitrate who gets to refresh it. 2238 * 2239 * 0 means no refresh needed. 
2240 * 2241 * > 0 means you need to refresh this and you MUST call 2242 * ocfs2_complete_lock_res_refresh afterwards. */ 2243 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres) 2244 { 2245 unsigned long flags; 2246 int status = 0; 2247 2248 refresh_check: 2249 spin_lock_irqsave(&lockres->l_lock, flags); 2250 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 2251 spin_unlock_irqrestore(&lockres->l_lock, flags); 2252 goto bail; 2253 } 2254 2255 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 2256 spin_unlock_irqrestore(&lockres->l_lock, flags); 2257 2258 ocfs2_wait_on_refreshing_lock(lockres); 2259 goto refresh_check; 2260 } 2261 2262 /* Ok, I'll be the one to refresh this lock. */ 2263 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING); 2264 spin_unlock_irqrestore(&lockres->l_lock, flags); 2265 2266 status = 1; 2267 bail: 2268 mlog(0, "status %d\n", status); 2269 return status; 2270 } 2271 2272 /* If status is non-zero, I'll mark it as not being in refresh 2273 * anymore, but I won't clear the needs-refresh flag. */ 2274 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres, 2275 int status) 2276 { 2277 unsigned long flags; 2278 2279 spin_lock_irqsave(&lockres->l_lock, flags); 2280 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING); 2281 if (!status) 2282 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 2283 spin_unlock_irqrestore(&lockres->l_lock, flags); 2284 2285 wake_up(&lockres->l_event); 2286 } 2287 2288 /* may or may not return a bh if it went to disk. */ 2289 static int ocfs2_inode_lock_update(struct inode *inode, 2290 struct buffer_head **bh) 2291 { 2292 int status = 0; 2293 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2294 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2295 struct ocfs2_dinode *fe; 2296 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2297 2298 if (ocfs2_mount_local(osb)) 2299 goto bail; 2300 2301 spin_lock(&oi->ip_lock); 2302 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2303 mlog(0, "Orphaned inode %llu was deleted while we " 2304 "were waiting on a lock. ip_flags = 0x%x\n", 2305 (unsigned long long)oi->ip_blkno, oi->ip_flags); 2306 spin_unlock(&oi->ip_lock); 2307 status = -ENOENT; 2308 goto bail; 2309 } 2310 spin_unlock(&oi->ip_lock); 2311 2312 if (!ocfs2_should_refresh_lock_res(lockres)) 2313 goto bail; 2314 2315 /* This will discard any caching information we might have had 2316 * for the inode metadata. */ 2317 ocfs2_metadata_cache_purge(INODE_CACHE(inode)); 2318 2319 ocfs2_extent_map_trunc(inode, 0); 2320 2321 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) { 2322 mlog(0, "Trusting LVB on inode %llu\n", 2323 (unsigned long long)oi->ip_blkno); 2324 ocfs2_refresh_inode_from_lvb(inode); 2325 } else { 2326 /* Boo, we have to go to disk. */ 2327 /* read bh, cast, ocfs2_refresh_inode */ 2328 status = ocfs2_read_inode_block(inode, bh); 2329 if (status < 0) { 2330 mlog_errno(status); 2331 goto bail_refresh; 2332 } 2333 fe = (struct ocfs2_dinode *) (*bh)->b_data; 2334 2335 /* This is a good chance to make sure we're not 2336 * locking an invalid object. ocfs2_read_inode_block() 2337 * already checked that the inode block is sane. 2338 * 2339 * We bug on a stale inode here because we checked 2340 * above whether it was wiped from disk. The wiping 2341 * node provides a guarantee that we receive that 2342 * message and can mark the inode before dropping any 2343 * locks associated with it.
*/ 2344 mlog_bug_on_msg(inode->i_generation != 2345 le32_to_cpu(fe->i_generation), 2346 "Invalid dinode %llu disk generation: %u " 2347 "inode->i_generation: %u\n", 2348 (unsigned long long)oi->ip_blkno, 2349 le32_to_cpu(fe->i_generation), 2350 inode->i_generation); 2351 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 2352 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 2353 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 2354 (unsigned long long)oi->ip_blkno, 2355 (unsigned long long)le64_to_cpu(fe->i_dtime), 2356 le32_to_cpu(fe->i_flags)); 2357 2358 ocfs2_refresh_inode(inode, fe); 2359 ocfs2_track_lock_refresh(lockres); 2360 } 2361 2362 status = 0; 2363 bail_refresh: 2364 ocfs2_complete_lock_res_refresh(lockres, status); 2365 bail: 2366 return status; 2367 } 2368 2369 static int ocfs2_assign_bh(struct inode *inode, 2370 struct buffer_head **ret_bh, 2371 struct buffer_head *passed_bh) 2372 { 2373 int status; 2374 2375 if (passed_bh) { 2376 /* Ok, the update went to disk for us, use the 2377 * returned bh. */ 2378 *ret_bh = passed_bh; 2379 get_bh(*ret_bh); 2380 2381 return 0; 2382 } 2383 2384 status = ocfs2_read_inode_block(inode, ret_bh); 2385 if (status < 0) 2386 mlog_errno(status); 2387 2388 return status; 2389 } 2390 2391 /* 2392 * returns < 0 error if the callback will never be called, otherwise 2393 * the result of the lock will be communicated via the callback. 2394 */ 2395 int ocfs2_inode_lock_full_nested(struct inode *inode, 2396 struct buffer_head **ret_bh, 2397 int ex, 2398 int arg_flags, 2399 int subclass) 2400 { 2401 int status, level, acquired; 2402 u32 dlm_flags; 2403 struct ocfs2_lock_res *lockres = NULL; 2404 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2405 struct buffer_head *local_bh = NULL; 2406 2407 mlog(0, "inode %llu, take %s META lock\n", 2408 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2409 ex ? "EXMODE" : "PRMODE"); 2410 2411 status = 0; 2412 acquired = 0; 2413 /* We'll allow faking a readonly metadata lock for 2414 * rodevices. */ 2415 if (ocfs2_is_hard_readonly(osb)) { 2416 if (ex) 2417 status = -EROFS; 2418 goto getbh; 2419 } 2420 2421 if ((arg_flags & OCFS2_META_LOCK_GETBH) || 2422 ocfs2_mount_local(osb)) 2423 goto update; 2424 2425 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2426 ocfs2_wait_for_recovery(osb); 2427 2428 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2429 level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2430 dlm_flags = 0; 2431 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 2432 dlm_flags |= DLM_LKF_NOQUEUE; 2433 2434 status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags, 2435 arg_flags, subclass, _RET_IP_); 2436 if (status < 0) { 2437 if (status != -EAGAIN) 2438 mlog_errno(status); 2439 goto bail; 2440 } 2441 2442 /* Notify the error cleanup path to drop the cluster lock. */ 2443 acquired = 1; 2444 2445 /* We wait twice because a node may have died while we were in 2446 * the lower dlm layers. The second time though, we've 2447 * committed to owning this lock so we don't allow signals to 2448 * abort the operation. */ 2449 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2450 ocfs2_wait_for_recovery(osb); 2451 2452 update: 2453 /* 2454 * We only see this flag if we're being called from 2455 * ocfs2_read_locked_inode(). It means we're locking an inode 2456 * which hasn't been populated yet, so clear the refresh flag 2457 * and let the caller handle it. 2458 */ 2459 if (inode->i_state & I_NEW) { 2460 status = 0; 2461 if (lockres) 2462 ocfs2_complete_lock_res_refresh(lockres, 0); 2463 goto bail; 2464 } 2465 2466 /* This is fun. 
The caller may want a bh back, or it may 2467 * not. ocfs2_inode_lock_update definitely wants one in, but 2468 * may or may not read one, depending on what's in the 2469 * LVB. The result of all of this is that we've *only* gone to 2470 * disk if we have to, so the complexity is worthwhile. */ 2471 status = ocfs2_inode_lock_update(inode, &local_bh); 2472 if (status < 0) { 2473 if (status != -ENOENT) 2474 mlog_errno(status); 2475 goto bail; 2476 } 2477 getbh: 2478 if (ret_bh) { 2479 status = ocfs2_assign_bh(inode, ret_bh, local_bh); 2480 if (status < 0) { 2481 mlog_errno(status); 2482 goto bail; 2483 } 2484 } 2485 2486 bail: 2487 if (status < 0) { 2488 if (ret_bh && (*ret_bh)) { 2489 brelse(*ret_bh); 2490 *ret_bh = NULL; 2491 } 2492 if (acquired) 2493 ocfs2_inode_unlock(inode, ex); 2494 } 2495 2496 if (local_bh) 2497 brelse(local_bh); 2498 2499 return status; 2500 } 2501 2502 /* 2503 * This is working around a lock inversion between tasks acquiring DLM 2504 * locks while holding a page lock and the downconvert thread which 2505 * blocks dlm lock acquisition while acquiring page locks. 2506 * 2507 * ** These _with_page variants are only intended to be called from aop 2508 * methods that hold page locks and return a very specific *positive* error 2509 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 2510 * 2511 * The DLM is called such that it returns -EAGAIN if it would have 2512 * blocked waiting for the downconvert thread. In that case we unlock 2513 * our page so the downconvert thread can make progress. Once we've 2514 * done this we have to return AOP_TRUNCATED_PAGE so the aop method 2515 * that called us can bubble that back up into the VFS who will then 2516 * immediately retry the aop call. 2517 */ 2518 int ocfs2_inode_lock_with_page(struct inode *inode, 2519 struct buffer_head **ret_bh, 2520 int ex, 2521 struct page *page) 2522 { 2523 int ret; 2524 2525 ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK); 2526 if (ret == -EAGAIN) { 2527 unlock_page(page); 2528 /* 2529 * If we can't get the inode lock immediately, we should not return 2530 * directly here, since that would lead to a softlockup problem. 2531 * Instead, take a blocking lock and immediately unlock it 2532 * before returning; this avoids wasting CPU on lots of 2533 * retries and improves fairness in acquiring the lock. 2534 */ 2535 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0) 2536 ocfs2_inode_unlock(inode, ex); 2537 ret = AOP_TRUNCATED_PAGE; 2538 } 2539 2540 return ret; 2541 } 2542 2543 int ocfs2_inode_lock_atime(struct inode *inode, 2544 struct vfsmount *vfsmnt, 2545 int *level, int wait) 2546 { 2547 int ret; 2548 2549 if (wait) 2550 ret = ocfs2_inode_lock(inode, NULL, 0); 2551 else 2552 ret = ocfs2_try_inode_lock(inode, NULL, 0); 2553 2554 if (ret < 0) { 2555 if (ret != -EAGAIN) 2556 mlog_errno(ret); 2557 return ret; 2558 } 2559 2560 /* 2561 * If we should update atime, we will get EX lock, 2562 * otherwise we just get PR lock.
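 * The level actually taken is reported back through *level so the caller
 * can later drop the lock at the matching level.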
2563 */ 2564 if (ocfs2_should_update_atime(inode, vfsmnt)) { 2565 struct buffer_head *bh = NULL; 2566 2567 ocfs2_inode_unlock(inode, 0); 2568 if (wait) 2569 ret = ocfs2_inode_lock(inode, &bh, 1); 2570 else 2571 ret = ocfs2_try_inode_lock(inode, &bh, 1); 2572 2573 if (ret < 0) { 2574 if (ret != -EAGAIN) 2575 mlog_errno(ret); 2576 return ret; 2577 } 2578 *level = 1; 2579 if (ocfs2_should_update_atime(inode, vfsmnt)) 2580 ocfs2_update_inode_atime(inode, bh); 2581 if (bh) 2582 brelse(bh); 2583 } else 2584 *level = 0; 2585 2586 return ret; 2587 } 2588 2589 void ocfs2_inode_unlock(struct inode *inode, 2590 int ex) 2591 { 2592 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2593 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2594 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2595 2596 mlog(0, "inode %llu drop %s META lock\n", 2597 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2598 ex ? "EXMODE" : "PRMODE"); 2599 2600 if (!ocfs2_is_hard_readonly(osb) && 2601 !ocfs2_mount_local(osb)) 2602 ocfs2_cluster_unlock(osb, lockres, level); 2603 } 2604 2605 /* 2606 * These _tracker variants are introduced to deal with the recursive cluster 2607 * locking issue. The idea is to keep track of a lock holder on the stack of 2608 * the current process. If there's a lock holder on the stack, we know the 2609 * task context is already protected by cluster locking. Currently, they're 2610 * used in some VFS entry routines. 2611 * 2612 * return < 0 on error, return == 0 if there's no lock holder on the stack 2613 * before this call, return == 1 if this call would be recursive locking. 2614 * return == -1 if this lock attempt will cause an upgrade which is forbidden. 2615 * 2616 * When taking lock levels into account, we face some different situations. 2617 * 2618 * 1. no lock is held 2619 * In this case, just lock the inode as requested and return 0 2620 * 2621 * 2. We are holding a lock 2622 * For this situation, things diverge into several cases 2623 * 2624 * wanted holding what to do 2625 * ex ex see 2.1 below 2626 * ex pr see 2.2 below 2627 * pr ex see 2.1 below 2628 * pr pr see 2.1 below 2629 * 2630 * 2.1 The lock level that is being held is compatible 2631 * with the wanted level, so no lock action will be taken. 2632 * 2633 * 2.2 Otherwise, an upgrade is needed, but it is forbidden. 2634 * 2635 * The reason an upgrade within a process is forbidden is that 2636 * a lock upgrade may cause deadlock. The following illustrates 2637 * how it happens. 2638 * 2639 * thread on node1 thread on node2 2640 * ocfs2_inode_lock_tracker(ex=0) 2641 * 2642 * <====== ocfs2_inode_lock_tracker(ex=1) 2643 * 2644 * ocfs2_inode_lock_tracker(ex=1) 2645 */ 2646 int ocfs2_inode_lock_tracker(struct inode *inode, 2647 struct buffer_head **ret_bh, 2648 int ex, 2649 struct ocfs2_lock_holder *oh) 2650 { 2651 int status = 0; 2652 struct ocfs2_lock_res *lockres; 2653 struct ocfs2_lock_holder *tmp_oh; 2654 struct pid *pid = task_pid(current); 2655 2656 2657 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2658 tmp_oh = ocfs2_pid_holder(lockres, pid); 2659 2660 if (!tmp_oh) { 2661 /* 2662 * This corresponds to case 1. 2663 * We haven't taken any lock before. 2664 */ 2665 status = ocfs2_inode_lock_full(inode, ret_bh, ex, 0); 2666 if (status < 0) { 2667 if (status != -ENOENT) 2668 mlog_errno(status); 2669 return status; 2670 } 2671 2672 oh->oh_ex = ex; 2673 ocfs2_add_holder(lockres, oh); 2674 return 0; 2675 } 2676 2677 if (unlikely(ex && !tmp_oh->oh_ex)) { 2678 /* 2679 * case 2.2: an upgrade may cause deadlock, so forbid it.
2680 */ 2681 mlog(ML_ERROR, "Recursive locking is not permitted to " 2682 "upgrade to EX level from PR level.\n"); 2683 dump_stack(); 2684 return -EINVAL; 2685 } 2686 2687 /* 2688 * case 2.1: the OCFS2_META_LOCK_GETBH flag makes ocfs2_inode_lock_full() 2689 * ignore the lock level and just update the buffer head. 2690 */ 2691 if (ret_bh) { 2692 status = ocfs2_inode_lock_full(inode, ret_bh, ex, 2693 OCFS2_META_LOCK_GETBH); 2694 if (status < 0) { 2695 if (status != -ENOENT) 2696 mlog_errno(status); 2697 return status; 2698 } 2699 } 2700 return tmp_oh ? 1 : 0; 2701 } 2702 2703 void ocfs2_inode_unlock_tracker(struct inode *inode, 2704 int ex, 2705 struct ocfs2_lock_holder *oh, 2706 int had_lock) 2707 { 2708 struct ocfs2_lock_res *lockres; 2709 2710 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2711 /* had_lock means that the current process already took the cluster 2712 * lock previously. 2713 * If had_lock is 1, we have nothing to do here. 2714 * If had_lock is 0, we will release the lock. 2715 */ 2716 if (!had_lock) { 2717 ocfs2_inode_unlock(inode, oh->oh_ex); 2718 ocfs2_remove_holder(lockres, oh); 2719 } 2720 } 2721 2722 int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno) 2723 { 2724 struct ocfs2_lock_res *lockres; 2725 struct ocfs2_orphan_scan_lvb *lvb; 2726 int status = 0; 2727 2728 if (ocfs2_is_hard_readonly(osb)) 2729 return -EROFS; 2730 2731 if (ocfs2_mount_local(osb)) 2732 return 0; 2733 2734 lockres = &osb->osb_orphan_scan.os_lockres; 2735 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2736 if (status < 0) 2737 return status; 2738 2739 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2740 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2741 lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION) 2742 *seqno = be32_to_cpu(lvb->lvb_os_seqno); 2743 else 2744 *seqno = osb->osb_orphan_scan.os_seqno + 1; 2745 2746 return status; 2747 } 2748 2749 void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno) 2750 { 2751 struct ocfs2_lock_res *lockres; 2752 struct ocfs2_orphan_scan_lvb *lvb; 2753 2754 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) { 2755 lockres = &osb->osb_orphan_scan.os_lockres; 2756 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2757 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION; 2758 lvb->lvb_os_seqno = cpu_to_be32(seqno); 2759 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2760 } 2761 } 2762 2763 int ocfs2_super_lock(struct ocfs2_super *osb, 2764 int ex) 2765 { 2766 int status = 0; 2767 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2768 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2769 2770 if (ocfs2_is_hard_readonly(osb)) 2771 return -EROFS; 2772 2773 if (ocfs2_mount_local(osb)) 2774 goto bail; 2775 2776 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 2777 if (status < 0) { 2778 mlog_errno(status); 2779 goto bail; 2780 } 2781 2782 /* The super block lock path is really in the best position to 2783 * know when resources covered by the lock need to be 2784 * refreshed, so we do it here. Of course, making sense of 2785 * everything is up to the caller :) */ 2786 status = ocfs2_should_refresh_lock_res(lockres); 2787 if (status) { 2788 status = ocfs2_refresh_slot_info(osb); 2789 2790 ocfs2_complete_lock_res_refresh(lockres, status); 2791 2792 if (status < 0) { 2793 ocfs2_cluster_unlock(osb, lockres, level); 2794 mlog_errno(status); 2795 } 2796 ocfs2_track_lock_refresh(lockres); 2797 } 2798 bail: 2799 return status; 2800 } 2801 2802 void ocfs2_super_unlock(struct ocfs2_super *osb, 2803 int ex) 2804 { 2805 int level = ex ?
DLM_LOCK_EX : DLM_LOCK_PR; 2806 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2807 2808 if (!ocfs2_mount_local(osb)) 2809 ocfs2_cluster_unlock(osb, lockres, level); 2810 } 2811 2812 int ocfs2_rename_lock(struct ocfs2_super *osb) 2813 { 2814 int status; 2815 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2816 2817 if (ocfs2_is_hard_readonly(osb)) 2818 return -EROFS; 2819 2820 if (ocfs2_mount_local(osb)) 2821 return 0; 2822 2823 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2824 if (status < 0) 2825 mlog_errno(status); 2826 2827 return status; 2828 } 2829 2830 void ocfs2_rename_unlock(struct ocfs2_super *osb) 2831 { 2832 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2833 2834 if (!ocfs2_mount_local(osb)) 2835 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2836 } 2837 2838 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex) 2839 { 2840 int status; 2841 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2842 2843 if (ocfs2_is_hard_readonly(osb)) 2844 return -EROFS; 2845 2846 if (ocfs2_mount_local(osb)) 2847 return 0; 2848 2849 status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE, 2850 0, 0); 2851 if (status < 0) 2852 mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status); 2853 2854 return status; 2855 } 2856 2857 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex) 2858 { 2859 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2860 2861 if (!ocfs2_mount_local(osb)) 2862 ocfs2_cluster_unlock(osb, lockres, 2863 ex ? LKM_EXMODE : LKM_PRMODE); 2864 } 2865 2866 int ocfs2_trim_fs_lock(struct ocfs2_super *osb, 2867 struct ocfs2_trim_fs_info *info, int trylock) 2868 { 2869 int status; 2870 struct ocfs2_trim_fs_lvb *lvb; 2871 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 2872 2873 if (info) 2874 info->tf_valid = 0; 2875 2876 if (ocfs2_is_hard_readonly(osb)) 2877 return -EROFS; 2878 2879 if (ocfs2_mount_local(osb)) 2880 return 0; 2881 2882 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 2883 trylock ? 
DLM_LKF_NOQUEUE : 0, 0); 2884 if (status < 0) { 2885 if (status != -EAGAIN) 2886 mlog_errno(status); 2887 return status; 2888 } 2889 2890 if (info) { 2891 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2892 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2893 lvb->lvb_version == OCFS2_TRIMFS_LVB_VERSION) { 2894 info->tf_valid = 1; 2895 info->tf_success = lvb->lvb_success; 2896 info->tf_nodenum = be32_to_cpu(lvb->lvb_nodenum); 2897 info->tf_start = be64_to_cpu(lvb->lvb_start); 2898 info->tf_len = be64_to_cpu(lvb->lvb_len); 2899 info->tf_minlen = be64_to_cpu(lvb->lvb_minlen); 2900 info->tf_trimlen = be64_to_cpu(lvb->lvb_trimlen); 2901 } 2902 } 2903 2904 return status; 2905 } 2906 2907 void ocfs2_trim_fs_unlock(struct ocfs2_super *osb, 2908 struct ocfs2_trim_fs_info *info) 2909 { 2910 struct ocfs2_trim_fs_lvb *lvb; 2911 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 2912 2913 if (ocfs2_mount_local(osb)) 2914 return; 2915 2916 if (info) { 2917 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2918 lvb->lvb_version = OCFS2_TRIMFS_LVB_VERSION; 2919 lvb->lvb_success = info->tf_success; 2920 lvb->lvb_nodenum = cpu_to_be32(info->tf_nodenum); 2921 lvb->lvb_start = cpu_to_be64(info->tf_start); 2922 lvb->lvb_len = cpu_to_be64(info->tf_len); 2923 lvb->lvb_minlen = cpu_to_be64(info->tf_minlen); 2924 lvb->lvb_trimlen = cpu_to_be64(info->tf_trimlen); 2925 } 2926 2927 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2928 } 2929 2930 int ocfs2_dentry_lock(struct dentry *dentry, int ex) 2931 { 2932 int ret; 2933 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2934 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2935 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2936 2937 BUG_ON(!dl); 2938 2939 if (ocfs2_is_hard_readonly(osb)) { 2940 if (ex) 2941 return -EROFS; 2942 return 0; 2943 } 2944 2945 if (ocfs2_mount_local(osb)) 2946 return 0; 2947 2948 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 2949 if (ret < 0) 2950 mlog_errno(ret); 2951 2952 return ret; 2953 } 2954 2955 void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 2956 { 2957 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2958 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2959 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2960 2961 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 2962 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 2963 } 2964 2965 /* Reference counting of the dlm debug structure. We want this because 2966 * open references on the debug inodes can live on after a mount, so 2967 * we can't rely on the ocfs2_super to always exist. */ 2968 static void ocfs2_dlm_debug_free(struct kref *kref) 2969 { 2970 struct ocfs2_dlm_debug *dlm_debug; 2971 2972 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2973 2974 kfree(dlm_debug); 2975 } 2976 2977 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2978 { 2979 if (dlm_debug) 2980 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2981 } 2982 2983 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2984 { 2985 kref_get(&debug->d_refcnt); 2986 } 2987 2988 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2989 { 2990 struct ocfs2_dlm_debug *dlm_debug; 2991 2992 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 2993 if (!dlm_debug) { 2994 mlog_errno(-ENOMEM); 2995 goto out; 2996 } 2997 2998 kref_init(&dlm_debug->d_refcnt); 2999 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 3000 dlm_debug->d_locking_state = NULL; 3001 out: 3002 return dlm_debug; 3003 } 3004 3005 /* Access to this is arbitrated for us via seq_file->sem. 
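 * p_iter_res is a dummy lockres used only as a cursor in the lockres
 * tracking list; p_tmp_res holds a snapshot of the entry being printed.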
*/ 3006 struct ocfs2_dlm_seq_priv { 3007 struct ocfs2_dlm_debug *p_dlm_debug; 3008 struct ocfs2_lock_res p_iter_res; 3009 struct ocfs2_lock_res p_tmp_res; 3010 }; 3011 3012 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 3013 struct ocfs2_dlm_seq_priv *priv) 3014 { 3015 struct ocfs2_lock_res *iter, *ret = NULL; 3016 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 3017 3018 assert_spin_locked(&ocfs2_dlm_tracking_lock); 3019 3020 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 3021 /* discover the head of the list */ 3022 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 3023 mlog(0, "End of list found, %p\n", ret); 3024 break; 3025 } 3026 3027 /* We track our "dummy" iteration lockres' by a NULL 3028 * l_ops field. */ 3029 if (iter->l_ops != NULL) { 3030 ret = iter; 3031 break; 3032 } 3033 } 3034 3035 return ret; 3036 } 3037 3038 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 3039 { 3040 struct ocfs2_dlm_seq_priv *priv = m->private; 3041 struct ocfs2_lock_res *iter; 3042 3043 spin_lock(&ocfs2_dlm_tracking_lock); 3044 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 3045 if (iter) { 3046 /* Since lockres' have the lifetime of their container 3047 * (which can be inodes, ocfs2_supers, etc) we want to 3048 * copy this out to a temporary lockres while still 3049 * under the spinlock. Obviously after this we can't 3050 * trust any pointers on the copy returned, but that's 3051 * ok as the information we want isn't typically held 3052 * in them. */ 3053 priv->p_tmp_res = *iter; 3054 iter = &priv->p_tmp_res; 3055 } 3056 spin_unlock(&ocfs2_dlm_tracking_lock); 3057 3058 return iter; 3059 } 3060 3061 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 3062 { 3063 } 3064 3065 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 3066 { 3067 struct ocfs2_dlm_seq_priv *priv = m->private; 3068 struct ocfs2_lock_res *iter = v; 3069 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 3070 3071 spin_lock(&ocfs2_dlm_tracking_lock); 3072 iter = ocfs2_dlm_next_res(iter, priv); 3073 list_del_init(&dummy->l_debug_list); 3074 if (iter) { 3075 list_add(&dummy->l_debug_list, &iter->l_debug_list); 3076 priv->p_tmp_res = *iter; 3077 iter = &priv->p_tmp_res; 3078 } 3079 spin_unlock(&ocfs2_dlm_tracking_lock); 3080 3081 return iter; 3082 } 3083 3084 /* 3085 * Version is used by debugfs.ocfs2 to determine the format being used 3086 * 3087 * New in version 2 3088 * - Lock stats printed 3089 * New in version 3 3090 * - Max time in lock stats is in usecs (instead of nsecs) 3091 */ 3092 #define OCFS2_DLM_DEBUG_STR_VERSION 3 3093 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 3094 { 3095 int i; 3096 char *lvb; 3097 struct ocfs2_lock_res *lockres = v; 3098 3099 if (!lockres) 3100 return -EINVAL; 3101 3102 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 3103 3104 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 3105 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 3106 lockres->l_name, 3107 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 3108 else 3109 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 3110 3111 seq_printf(m, "%d\t" 3112 "0x%lx\t" 3113 "0x%x\t" 3114 "0x%x\t" 3115 "%u\t" 3116 "%u\t" 3117 "%d\t" 3118 "%d\t", 3119 lockres->l_level, 3120 lockres->l_flags, 3121 lockres->l_action, 3122 lockres->l_unlock_action, 3123 lockres->l_ro_holders, 3124 lockres->l_ex_holders, 3125 lockres->l_requested, 3126 lockres->l_blocking); 3127 3128 /* Dump the raw LVB */ 3129 lvb = 
ocfs2_dlm_lvb(&lockres->l_lksb); 3130 for(i = 0; i < DLM_LVB_LEN; i++) 3131 seq_printf(m, "0x%x\t", lvb[i]); 3132 3133 #ifdef CONFIG_OCFS2_FS_STATS 3134 # define lock_num_prmode(_l) ((_l)->l_lock_prmode.ls_gets) 3135 # define lock_num_exmode(_l) ((_l)->l_lock_exmode.ls_gets) 3136 # define lock_num_prmode_failed(_l) ((_l)->l_lock_prmode.ls_fail) 3137 # define lock_num_exmode_failed(_l) ((_l)->l_lock_exmode.ls_fail) 3138 # define lock_total_prmode(_l) ((_l)->l_lock_prmode.ls_total) 3139 # define lock_total_exmode(_l) ((_l)->l_lock_exmode.ls_total) 3140 # define lock_max_prmode(_l) ((_l)->l_lock_prmode.ls_max) 3141 # define lock_max_exmode(_l) ((_l)->l_lock_exmode.ls_max) 3142 # define lock_refresh(_l) ((_l)->l_lock_refresh) 3143 #else 3144 # define lock_num_prmode(_l) (0) 3145 # define lock_num_exmode(_l) (0) 3146 # define lock_num_prmode_failed(_l) (0) 3147 # define lock_num_exmode_failed(_l) (0) 3148 # define lock_total_prmode(_l) (0ULL) 3149 # define lock_total_exmode(_l) (0ULL) 3150 # define lock_max_prmode(_l) (0) 3151 # define lock_max_exmode(_l) (0) 3152 # define lock_refresh(_l) (0) 3153 #endif 3154 /* The following seq_print was added in version 2 of this output */ 3155 seq_printf(m, "%u\t" 3156 "%u\t" 3157 "%u\t" 3158 "%u\t" 3159 "%llu\t" 3160 "%llu\t" 3161 "%u\t" 3162 "%u\t" 3163 "%u\t", 3164 lock_num_prmode(lockres), 3165 lock_num_exmode(lockres), 3166 lock_num_prmode_failed(lockres), 3167 lock_num_exmode_failed(lockres), 3168 lock_total_prmode(lockres), 3169 lock_total_exmode(lockres), 3170 lock_max_prmode(lockres), 3171 lock_max_exmode(lockres), 3172 lock_refresh(lockres)); 3173 3174 /* End the line */ 3175 seq_printf(m, "\n"); 3176 return 0; 3177 } 3178 3179 static const struct seq_operations ocfs2_dlm_seq_ops = { 3180 .start = ocfs2_dlm_seq_start, 3181 .stop = ocfs2_dlm_seq_stop, 3182 .next = ocfs2_dlm_seq_next, 3183 .show = ocfs2_dlm_seq_show, 3184 }; 3185 3186 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 3187 { 3188 struct seq_file *seq = file->private_data; 3189 struct ocfs2_dlm_seq_priv *priv = seq->private; 3190 struct ocfs2_lock_res *res = &priv->p_iter_res; 3191 3192 ocfs2_remove_lockres_tracking(res); 3193 ocfs2_put_dlm_debug(priv->p_dlm_debug); 3194 return seq_release_private(inode, file); 3195 } 3196 3197 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 3198 { 3199 struct ocfs2_dlm_seq_priv *priv; 3200 struct ocfs2_super *osb; 3201 3202 priv = __seq_open_private(file, &ocfs2_dlm_seq_ops, sizeof(*priv)); 3203 if (!priv) { 3204 mlog_errno(-ENOMEM); 3205 return -ENOMEM; 3206 } 3207 3208 osb = inode->i_private; 3209 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 3210 priv->p_dlm_debug = osb->osb_dlm_debug; 3211 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 3212 3213 ocfs2_add_lockres_tracking(&priv->p_iter_res, 3214 priv->p_dlm_debug); 3215 3216 return 0; 3217 } 3218 3219 static const struct file_operations ocfs2_dlm_debug_fops = { 3220 .open = ocfs2_dlm_debug_open, 3221 .release = ocfs2_dlm_debug_release, 3222 .read = seq_read, 3223 .llseek = seq_lseek, 3224 }; 3225 3226 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 3227 { 3228 int ret = 0; 3229 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 3230 3231 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 3232 S_IFREG|S_IRUSR, 3233 osb->osb_debug_root, 3234 osb, 3235 &ocfs2_dlm_debug_fops); 3236 if (!dlm_debug->d_locking_state) { 3237 ret = -EINVAL; 3238 mlog(ML_ERROR, 3239 "Unable to create locking state debugfs file.\n"); 3240 goto out; 
3241 } 3242 3243 ocfs2_get_dlm_debug(dlm_debug); 3244 out: 3245 return ret; 3246 } 3247 3248 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 3249 { 3250 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 3251 3252 if (dlm_debug) { 3253 debugfs_remove(dlm_debug->d_locking_state); 3254 ocfs2_put_dlm_debug(dlm_debug); 3255 } 3256 } 3257 3258 int ocfs2_dlm_init(struct ocfs2_super *osb) 3259 { 3260 int status = 0; 3261 struct ocfs2_cluster_connection *conn = NULL; 3262 3263 if (ocfs2_mount_local(osb)) { 3264 osb->node_num = 0; 3265 goto local; 3266 } 3267 3268 status = ocfs2_dlm_init_debug(osb); 3269 if (status < 0) { 3270 mlog_errno(status); 3271 goto bail; 3272 } 3273 3274 /* launch downconvert thread */ 3275 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc-%s", 3276 osb->uuid_str); 3277 if (IS_ERR(osb->dc_task)) { 3278 status = PTR_ERR(osb->dc_task); 3279 osb->dc_task = NULL; 3280 mlog_errno(status); 3281 goto bail; 3282 } 3283 3284 /* for now, uuid == domain */ 3285 status = ocfs2_cluster_connect(osb->osb_cluster_stack, 3286 osb->osb_cluster_name, 3287 strlen(osb->osb_cluster_name), 3288 osb->uuid_str, 3289 strlen(osb->uuid_str), 3290 &lproto, ocfs2_do_node_down, osb, 3291 &conn); 3292 if (status) { 3293 mlog_errno(status); 3294 goto bail; 3295 } 3296 3297 status = ocfs2_cluster_this_node(conn, &osb->node_num); 3298 if (status < 0) { 3299 mlog_errno(status); 3300 mlog(ML_ERROR, 3301 "could not find this host's node number\n"); 3302 ocfs2_cluster_disconnect(conn, 0); 3303 goto bail; 3304 } 3305 3306 local: 3307 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 3308 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 3309 ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb); 3310 ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb); 3311 3312 osb->cconn = conn; 3313 bail: 3314 if (status < 0) { 3315 ocfs2_dlm_shutdown_debug(osb); 3316 if (osb->dc_task) 3317 kthread_stop(osb->dc_task); 3318 } 3319 3320 return status; 3321 } 3322 3323 void ocfs2_dlm_shutdown(struct ocfs2_super *osb, 3324 int hangup_pending) 3325 { 3326 ocfs2_drop_osb_locks(osb); 3327 3328 /* 3329 * Now that we have dropped all locks and ocfs2_dismount_volume() 3330 * has disabled recovery, the DLM won't be talking to us. It's 3331 * safe to tear things down before disconnecting the cluster. 3332 */ 3333 3334 if (osb->dc_task) { 3335 kthread_stop(osb->dc_task); 3336 osb->dc_task = NULL; 3337 } 3338 3339 ocfs2_lock_res_free(&osb->osb_super_lockres); 3340 ocfs2_lock_res_free(&osb->osb_rename_lockres); 3341 ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres); 3342 ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres); 3343 3344 ocfs2_cluster_disconnect(osb->cconn, hangup_pending); 3345 osb->cconn = NULL; 3346 3347 ocfs2_dlm_shutdown_debug(osb); 3348 } 3349 3350 static int ocfs2_drop_lock(struct ocfs2_super *osb, 3351 struct ocfs2_lock_res *lockres) 3352 { 3353 int ret; 3354 unsigned long flags; 3355 u32 lkm_flags = 0; 3356 3357 /* We didn't get anywhere near actually using this lockres. 
*/ 3358 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 3359 goto out; 3360 3361 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 3362 lkm_flags |= DLM_LKF_VALBLK; 3363 3364 spin_lock_irqsave(&lockres->l_lock, flags); 3365 3366 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 3367 "lockres %s, flags 0x%lx\n", 3368 lockres->l_name, lockres->l_flags); 3369 3370 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 3371 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 3372 "%u, unlock_action = %u\n", 3373 lockres->l_name, lockres->l_flags, lockres->l_action, 3374 lockres->l_unlock_action); 3375 3376 spin_unlock_irqrestore(&lockres->l_lock, flags); 3377 3378 /* XXX: Today we just wait on any busy 3379 * locks... Perhaps we need to cancel converts in the 3380 * future? */ 3381 ocfs2_wait_on_busy_lock(lockres); 3382 3383 spin_lock_irqsave(&lockres->l_lock, flags); 3384 } 3385 3386 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3387 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 3388 lockres->l_level == DLM_LOCK_EX && 3389 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3390 lockres->l_ops->set_lvb(lockres); 3391 } 3392 3393 if (lockres->l_flags & OCFS2_LOCK_BUSY) 3394 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 3395 lockres->l_name); 3396 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 3397 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 3398 3399 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 3400 spin_unlock_irqrestore(&lockres->l_lock, flags); 3401 goto out; 3402 } 3403 3404 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 3405 3406 /* make sure we never get here while waiting for an ast to 3407 * fire. */ 3408 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 3409 3410 /* is this necessary? */ 3411 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3412 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 3413 spin_unlock_irqrestore(&lockres->l_lock, flags); 3414 3415 mlog(0, "lock %s\n", lockres->l_name); 3416 3417 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags); 3418 if (ret) { 3419 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3420 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 3421 ocfs2_dlm_dump_lksb(&lockres->l_lksb); 3422 BUG(); 3423 } 3424 mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n", 3425 lockres->l_name); 3426 3427 ocfs2_wait_on_busy_lock(lockres); 3428 out: 3429 return 0; 3430 } 3431 3432 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3433 struct ocfs2_lock_res *lockres); 3434 3435 /* Mark the lockres as being dropped. It will no longer be 3436 * queued if blocking, but we still may have to wait on it 3437 * being dequeued from the downconvert thread before we can consider 3438 * it safe to drop. 3439 * 3440 * You can *not* attempt to call cluster_lock on this lockres anymore. */ 3441 void ocfs2_mark_lockres_freeing(struct ocfs2_super *osb, 3442 struct ocfs2_lock_res *lockres) 3443 { 3444 int status; 3445 struct ocfs2_mask_waiter mw; 3446 unsigned long flags, flags2; 3447 3448 ocfs2_init_mask_waiter(&mw); 3449 3450 spin_lock_irqsave(&lockres->l_lock, flags); 3451 lockres->l_flags |= OCFS2_LOCK_FREEING; 3452 if (lockres->l_flags & OCFS2_LOCK_QUEUED && current == osb->dc_task) { 3453 /* 3454 * We know the downconvert is queued but not in progress 3455 * because we are the downconvert thread and processing 3456 * different lock. So we can just remove the lock from the 3457 * queue. 
This is not only an optimization but also a way 3458 * to avoid the following deadlock: 3459 * ocfs2_dentry_post_unlock() 3460 * ocfs2_dentry_lock_put() 3461 * ocfs2_drop_dentry_lock() 3462 * iput() 3463 * ocfs2_evict_inode() 3464 * ocfs2_clear_inode() 3465 * ocfs2_mark_lockres_freeing() 3466 * ... blocks waiting for OCFS2_LOCK_QUEUED 3467 * since we are the downconvert thread which 3468 * should clear the flag. 3469 */ 3470 spin_unlock_irqrestore(&lockres->l_lock, flags); 3471 spin_lock_irqsave(&osb->dc_task_lock, flags2); 3472 list_del_init(&lockres->l_blocked_list); 3473 osb->blocked_lock_count--; 3474 spin_unlock_irqrestore(&osb->dc_task_lock, flags2); 3475 /* 3476 * Warn if we recurse into another post_unlock call. Strictly 3477 * speaking it isn't a problem but we need to be careful if 3478 * that happens (stack overflow, deadlocks, ...) so warn if 3479 * ocfs2 grows a path for which this can happen. 3480 */ 3481 WARN_ON_ONCE(lockres->l_ops->post_unlock); 3482 /* Since the lock is freeing we don't do much in the fn below */ 3483 ocfs2_process_blocked_lock(osb, lockres); 3484 return; 3485 } 3486 while (lockres->l_flags & OCFS2_LOCK_QUEUED) { 3487 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); 3488 spin_unlock_irqrestore(&lockres->l_lock, flags); 3489 3490 mlog(0, "Waiting on lockres %s\n", lockres->l_name); 3491 3492 status = ocfs2_wait_for_mask(&mw); 3493 if (status) 3494 mlog_errno(status); 3495 3496 spin_lock_irqsave(&lockres->l_lock, flags); 3497 } 3498 spin_unlock_irqrestore(&lockres->l_lock, flags); 3499 } 3500 3501 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, 3502 struct ocfs2_lock_res *lockres) 3503 { 3504 int ret; 3505 3506 ocfs2_mark_lockres_freeing(osb, lockres); 3507 ret = ocfs2_drop_lock(osb, lockres); 3508 if (ret) 3509 mlog_errno(ret); 3510 } 3511 3512 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 3513 { 3514 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 3515 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 3516 ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres); 3517 ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres); 3518 } 3519 3520 int ocfs2_drop_inode_locks(struct inode *inode) 3521 { 3522 int status, err; 3523 3524 /* No need to call ocfs2_mark_lockres_freeing here - 3525 * ocfs2_clear_inode has done it for us. 
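 * All three per-inode lock resources (open, inode and rw) are dropped
 * below; the first error encountered, if any, is what gets returned.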
*/ 3526 3527 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3528 &OCFS2_I(inode)->ip_open_lockres); 3529 if (err < 0) 3530 mlog_errno(err); 3531 3532 status = err; 3533 3534 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3535 &OCFS2_I(inode)->ip_inode_lockres); 3536 if (err < 0) 3537 mlog_errno(err); 3538 if (err < 0 && !status) 3539 status = err; 3540 3541 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3542 &OCFS2_I(inode)->ip_rw_lockres); 3543 if (err < 0) 3544 mlog_errno(err); 3545 if (err < 0 && !status) 3546 status = err; 3547 3548 return status; 3549 } 3550 3551 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 3552 int new_level) 3553 { 3554 assert_spin_locked(&lockres->l_lock); 3555 3556 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 3557 3558 if (lockres->l_level <= new_level) { 3559 mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, " 3560 "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, " 3561 "block %d, pgen %d\n", lockres->l_name, lockres->l_level, 3562 new_level, list_empty(&lockres->l_blocked_list), 3563 list_empty(&lockres->l_mask_waiters), lockres->l_type, 3564 lockres->l_flags, lockres->l_ro_holders, 3565 lockres->l_ex_holders, lockres->l_action, 3566 lockres->l_unlock_action, lockres->l_requested, 3567 lockres->l_blocking, lockres->l_pending_gen); 3568 BUG(); 3569 } 3570 3571 mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n", 3572 lockres->l_name, lockres->l_level, new_level, lockres->l_blocking); 3573 3574 lockres->l_action = OCFS2_AST_DOWNCONVERT; 3575 lockres->l_requested = new_level; 3576 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3577 return lockres_set_pending(lockres); 3578 } 3579 3580 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 3581 struct ocfs2_lock_res *lockres, 3582 int new_level, 3583 int lvb, 3584 unsigned int generation) 3585 { 3586 int ret; 3587 u32 dlm_flags = DLM_LKF_CONVERT; 3588 3589 mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name, 3590 lockres->l_level, new_level); 3591 3592 /* 3593 * On DLM_LKF_VALBLK, fsdlm behaves differently with o2cb. It always 3594 * expects DLM_LKF_VALBLK being set if the LKB has LVB, so that 3595 * we can recover correctly from node failure. Otherwise, we may get 3596 * invalid LVB in LKB, but without DLM_SBF_VALNOTVALID being set. 3597 */ 3598 if (ocfs2_userspace_stack(osb) && 3599 lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 3600 lvb = 1; 3601 3602 if (lvb) 3603 dlm_flags |= DLM_LKF_VALBLK; 3604 3605 ret = ocfs2_dlm_lock(osb->cconn, 3606 new_level, 3607 &lockres->l_lksb, 3608 dlm_flags, 3609 lockres->l_name, 3610 OCFS2_LOCK_ID_MAX_LEN - 1); 3611 lockres_clear_pending(lockres, generation, osb); 3612 if (ret) { 3613 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 3614 ocfs2_recover_from_dlm_error(lockres, 1); 3615 goto bail; 3616 } 3617 3618 ret = 0; 3619 bail: 3620 return ret; 3621 } 3622 3623 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */ 3624 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 3625 struct ocfs2_lock_res *lockres) 3626 { 3627 assert_spin_locked(&lockres->l_lock); 3628 3629 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 3630 /* If we're already trying to cancel a lock conversion 3631 * then just drop the spinlock and allow the caller to 3632 * requeue this lock. */ 3633 mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name); 3634 return 0; 3635 } 3636 3637 /* were we in a convert when we got the bast fire? 
*/ 3638 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT && 3639 lockres->l_action != OCFS2_AST_DOWNCONVERT); 3640 /* set things up for the unlockast to know to just 3641 * clear out the ast_action and unset busy, etc. */ 3642 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT; 3643 3644 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY), 3645 "lock %s, invalid flags: 0x%lx\n", 3646 lockres->l_name, lockres->l_flags); 3647 3648 mlog(ML_BASTS, "lockres %s\n", lockres->l_name); 3649 3650 return 1; 3651 } 3652 3653 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 3654 struct ocfs2_lock_res *lockres) 3655 { 3656 int ret; 3657 3658 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, 3659 DLM_LKF_CANCEL); 3660 if (ret) { 3661 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3662 ocfs2_recover_from_dlm_error(lockres, 0); 3663 } 3664 3665 mlog(ML_BASTS, "lockres %s\n", lockres->l_name); 3666 3667 return ret; 3668 } 3669 3670 static int ocfs2_unblock_lock(struct ocfs2_super *osb, 3671 struct ocfs2_lock_res *lockres, 3672 struct ocfs2_unblock_ctl *ctl) 3673 { 3674 unsigned long flags; 3675 int blocking; 3676 int new_level; 3677 int level; 3678 int ret = 0; 3679 int set_lvb = 0; 3680 unsigned int gen; 3681 3682 spin_lock_irqsave(&lockres->l_lock, flags); 3683 3684 recheck: 3685 /* 3686 * Is it still blocking? If not, we have no more work to do. 3687 */ 3688 if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) { 3689 BUG_ON(lockres->l_blocking != DLM_LOCK_NL); 3690 spin_unlock_irqrestore(&lockres->l_lock, flags); 3691 ret = 0; 3692 goto leave; 3693 } 3694 3695 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 3696 /* XXX 3697 * This is a *big* race. The OCFS2_LOCK_PENDING flag 3698 * exists entirely for one reason - another thread has set 3699 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock(). 3700 * 3701 * If we do ocfs2_cancel_convert() before the other thread 3702 * calls dlm_lock(), our cancel will do nothing. We will 3703 * get no ast, and we will have no way of knowing the 3704 * cancel failed. Meanwhile, the other thread will call 3705 * into dlm_lock() and wait...forever. 3706 * 3707 * Why forever? Because another node has asked for the 3708 * lock first; that's why we're here in unblock_lock(). 3709 * 3710 * The solution is OCFS2_LOCK_PENDING. When PENDING is 3711 * set, we just requeue the unblock. Only when the other 3712 * thread has called dlm_lock() and cleared PENDING will 3713 * we then cancel their request. 3714 * 3715 * All callers of dlm_lock() must set OCFS2_DLM_PENDING 3716 * at the same time they set OCFS2_DLM_BUSY. They must 3717 * clear OCFS2_DLM_PENDING after dlm_lock() returns. 3718 */ 3719 if (lockres->l_flags & OCFS2_LOCK_PENDING) { 3720 mlog(ML_BASTS, "lockres %s, ReQ: Pending\n", 3721 lockres->l_name); 3722 goto leave_requeue; 3723 } 3724 3725 ctl->requeue = 1; 3726 ret = ocfs2_prepare_cancel_convert(osb, lockres); 3727 spin_unlock_irqrestore(&lockres->l_lock, flags); 3728 if (ret) { 3729 ret = ocfs2_cancel_convert(osb, lockres); 3730 if (ret < 0) 3731 mlog_errno(ret); 3732 } 3733 goto leave; 3734 } 3735 3736 /* 3737 * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is 3738 * set when the ast is received for an upconvert just before the 3739 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast 3740 * on the heels of the ast, we want to delay the downconvert just 3741 * enough to allow the up requestor to do its task. 
Because this 3742 * lock is in the blocked queue, the lock will be downconverted 3743 * as soon as the requestor is done with the lock. 3744 */ 3745 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) 3746 goto leave_requeue; 3747 3748 /* 3749 * How can we block and yet be at NL? We were trying to upconvert 3750 * from NL and got canceled. The code comes back here, and now 3751 * we notice and clear BLOCKING. 3752 */ 3753 if (lockres->l_level == DLM_LOCK_NL) { 3754 BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders); 3755 mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name); 3756 lockres->l_blocking = DLM_LOCK_NL; 3757 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 3758 spin_unlock_irqrestore(&lockres->l_lock, flags); 3759 goto leave; 3760 } 3761 3762 /* if we're blocking an exclusive and we have *any* holders, 3763 * then requeue. */ 3764 if ((lockres->l_blocking == DLM_LOCK_EX) 3765 && (lockres->l_ex_holders || lockres->l_ro_holders)) { 3766 mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n", 3767 lockres->l_name, lockres->l_ex_holders, 3768 lockres->l_ro_holders); 3769 goto leave_requeue; 3770 } 3771 3772 /* If it's a PR we're blocking, then only 3773 * requeue if we've got any EX holders */ 3774 if (lockres->l_blocking == DLM_LOCK_PR && 3775 lockres->l_ex_holders) { 3776 mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n", 3777 lockres->l_name, lockres->l_ex_holders); 3778 goto leave_requeue; 3779 } 3780 3781 /* 3782 * Can we get a lock in this state if the holder counts are 3783 * zero? The meta data unblock code used to check this. 3784 */ 3785 if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 3786 && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) { 3787 mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n", 3788 lockres->l_name); 3789 goto leave_requeue; 3790 } 3791 3792 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 3793 3794 if (lockres->l_ops->check_downconvert 3795 && !lockres->l_ops->check_downconvert(lockres, new_level)) { 3796 mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n", 3797 lockres->l_name); 3798 goto leave_requeue; 3799 } 3800 3801 /* If we get here, then we know that there are no more 3802 * incompatible holders (and anyone asking for an incompatible 3803 * lock is blocked). We can now downconvert the lock */ 3804 if (!lockres->l_ops->downconvert_worker) 3805 goto downconvert; 3806 3807 /* Some lockres types want to do a bit of work before 3808 * downconverting a lock. Allow that here. The worker function 3809 * may sleep, so we save off a copy of what we're blocking as 3810 * it may change while we're not holding the spin lock. */ 3811 blocking = lockres->l_blocking; 3812 level = lockres->l_level; 3813 spin_unlock_irqrestore(&lockres->l_lock, flags); 3814 3815 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); 3816 3817 if (ctl->unblock_action == UNBLOCK_STOP_POST) { 3818 mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n", 3819 lockres->l_name); 3820 goto leave; 3821 } 3822 3823 spin_lock_irqsave(&lockres->l_lock, flags); 3824 if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) { 3825 /* If this changed underneath us, then we can't drop 3826 * it just yet. 
*/
3827 		mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, "
3828 		     "Recheck\n", lockres->l_name, blocking,
3829 		     lockres->l_blocking, level, lockres->l_level);
3830 		goto recheck;
3831 	}
3832 
3833 downconvert:
3834 	ctl->requeue = 0;
3835 
3836 	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3837 		if (lockres->l_level == DLM_LOCK_EX)
3838 			set_lvb = 1;
3839 
3840 		/*
3841 		 * We only set the lvb if the lock has been fully
3842 		 * refreshed - otherwise we risk setting stale
3843 		 * data. If we are not setting it, there's no need to
3844 		 * actually clear out the lvb here as its value is still valid.
3845 		 */
3846 		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3847 			lockres->l_ops->set_lvb(lockres);
3848 	}
3849 
3850 	gen = ocfs2_prepare_downconvert(lockres, new_level);
3851 	spin_unlock_irqrestore(&lockres->l_lock, flags);
3852 	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
3853 				     gen);
3854 
3855 leave:
3856 	if (ret)
3857 		mlog_errno(ret);
3858 	return ret;
3859 
3860 leave_requeue:
3861 	spin_unlock_irqrestore(&lockres->l_lock, flags);
3862 	ctl->requeue = 1;
3863 
3864 	return 0;
3865 }
3866 
3867 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3868 				     int blocking)
3869 {
3870 	struct inode *inode;
3871 	struct address_space *mapping;
3872 	struct ocfs2_inode_info *oi;
3873 
3874 	inode = ocfs2_lock_res_inode(lockres);
3875 	mapping = inode->i_mapping;
3876 
3877 	if (S_ISDIR(inode->i_mode)) {
3878 		oi = OCFS2_I(inode);
3879 		oi->ip_dir_lock_gen++;
3880 		mlog(0, "generation: %u\n", oi->ip_dir_lock_gen);
3881 		goto out;
3882 	}
3883 
3884 	if (!S_ISREG(inode->i_mode))
3885 		goto out;
3886 
3887 	/*
3888 	 * We need this before the filemap_fdatawrite() so that it can
3889 	 * transfer the dirty bit from the PTE to the
3890 	 * page. Unfortunately this means that even for EX->PR
3891 	 * downconverts, we'll lose our mappings and have to build
3892 	 * them up again.
3893 	 */
3894 	unmap_mapping_range(mapping, 0, 0, 0);
3895 
3896 	if (filemap_fdatawrite(mapping)) {
3897 		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
3898 		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
3899 	}
3900 	sync_mapping_buffers(mapping);
3901 	if (blocking == DLM_LOCK_EX) {
3902 		truncate_inode_pages(mapping, 0);
3903 	} else {
3904 		/* We only need to wait on the I/O if we're not also
3905 		 * truncating pages because truncate_inode_pages waits
3906 		 * for us above. We don't truncate pages if we're
3907 		 * blocking anything < EXMODE because we want to keep
3908 		 * them around in that case.
*/ 3909 filemap_fdatawait(mapping); 3910 } 3911 3912 forget_all_cached_acls(inode); 3913 3914 out: 3915 return UNBLOCK_CONTINUE; 3916 } 3917 3918 static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci, 3919 struct ocfs2_lock_res *lockres, 3920 int new_level) 3921 { 3922 int checkpointed = ocfs2_ci_fully_checkpointed(ci); 3923 3924 BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR); 3925 BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed); 3926 3927 if (checkpointed) 3928 return 1; 3929 3930 ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci))); 3931 return 0; 3932 } 3933 3934 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 3935 int new_level) 3936 { 3937 struct inode *inode = ocfs2_lock_res_inode(lockres); 3938 3939 return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level); 3940 } 3941 3942 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres) 3943 { 3944 struct inode *inode = ocfs2_lock_res_inode(lockres); 3945 3946 __ocfs2_stuff_meta_lvb(inode); 3947 } 3948 3949 /* 3950 * Does the final reference drop on our dentry lock. Right now this 3951 * happens in the downconvert thread, but we could choose to simplify the 3952 * dlmglue API and push these off to the ocfs2_wq in the future. 3953 */ 3954 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 3955 struct ocfs2_lock_res *lockres) 3956 { 3957 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3958 ocfs2_dentry_lock_put(osb, dl); 3959 } 3960 3961 /* 3962 * d_delete() matching dentries before the lock downconvert. 3963 * 3964 * At this point, any process waiting to destroy the 3965 * dentry_lock due to last ref count is stopped by the 3966 * OCFS2_LOCK_QUEUED flag. 3967 * 3968 * We have two potential problems 3969 * 3970 * 1) If we do the last reference drop on our dentry_lock (via dput) 3971 * we'll wind up in ocfs2_release_dentry_lock(), waiting on 3972 * the downconvert to finish. Instead we take an elevated 3973 * reference and push the drop until after we've completed our 3974 * unblock processing. 3975 * 3976 * 2) There might be another process with a final reference, 3977 * waiting on us to finish processing. If this is the case, we 3978 * detect it and exit out - there's no more dentries anyway. 3979 */ 3980 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 3981 int blocking) 3982 { 3983 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3984 struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode); 3985 struct dentry *dentry; 3986 unsigned long flags; 3987 int extra_ref = 0; 3988 3989 /* 3990 * This node is blocking another node from getting a read 3991 * lock. This happens when we've renamed within a 3992 * directory. We've forced the other nodes to d_delete(), but 3993 * we never actually dropped our lock because it's still 3994 * valid. The downconvert code will retain a PR for this node, 3995 * so there's no further work to do. 3996 */ 3997 if (blocking == DLM_LOCK_PR) 3998 return UNBLOCK_CONTINUE; 3999 4000 /* 4001 * Mark this inode as potentially orphaned. The code in 4002 * ocfs2_delete_inode() will figure out whether it actually 4003 * needs to be freed or not. 4004 */ 4005 spin_lock(&oi->ip_lock); 4006 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; 4007 spin_unlock(&oi->ip_lock); 4008 4009 /* 4010 * Yuck. We need to make sure however that the check of 4011 * OCFS2_LOCK_FREEING and the extra reference are atomic with 4012 * respect to a reference decrement or the setting of that 4013 * flag. 
4014 */ 4015 spin_lock_irqsave(&lockres->l_lock, flags); 4016 spin_lock(&dentry_attach_lock); 4017 if (!(lockres->l_flags & OCFS2_LOCK_FREEING) 4018 && dl->dl_count) { 4019 dl->dl_count++; 4020 extra_ref = 1; 4021 } 4022 spin_unlock(&dentry_attach_lock); 4023 spin_unlock_irqrestore(&lockres->l_lock, flags); 4024 4025 mlog(0, "extra_ref = %d\n", extra_ref); 4026 4027 /* 4028 * We have a process waiting on us in ocfs2_dentry_iput(), 4029 * which means we can't have any more outstanding 4030 * aliases. There's no need to do any more work. 4031 */ 4032 if (!extra_ref) 4033 return UNBLOCK_CONTINUE; 4034 4035 spin_lock(&dentry_attach_lock); 4036 while (1) { 4037 dentry = ocfs2_find_local_alias(dl->dl_inode, 4038 dl->dl_parent_blkno, 1); 4039 if (!dentry) 4040 break; 4041 spin_unlock(&dentry_attach_lock); 4042 4043 if (S_ISDIR(dl->dl_inode->i_mode)) 4044 shrink_dcache_parent(dentry); 4045 4046 mlog(0, "d_delete(%pd);\n", dentry); 4047 4048 /* 4049 * The following dcache calls may do an 4050 * iput(). Normally we don't want that from the 4051 * downconverting thread, but in this case it's ok 4052 * because the requesting node already has an 4053 * exclusive lock on the inode, so it can't be queued 4054 * for a downconvert. 4055 */ 4056 d_delete(dentry); 4057 dput(dentry); 4058 4059 spin_lock(&dentry_attach_lock); 4060 } 4061 spin_unlock(&dentry_attach_lock); 4062 4063 /* 4064 * If we are the last holder of this dentry lock, there is no 4065 * reason to downconvert so skip straight to the unlock. 4066 */ 4067 if (dl->dl_count == 1) 4068 return UNBLOCK_STOP_POST; 4069 4070 return UNBLOCK_CONTINUE_POST; 4071 } 4072 4073 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres, 4074 int new_level) 4075 { 4076 struct ocfs2_refcount_tree *tree = 4077 ocfs2_lock_res_refcount_tree(lockres); 4078 4079 return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level); 4080 } 4081 4082 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres, 4083 int blocking) 4084 { 4085 struct ocfs2_refcount_tree *tree = 4086 ocfs2_lock_res_refcount_tree(lockres); 4087 4088 ocfs2_metadata_cache_purge(&tree->rf_ci); 4089 4090 return UNBLOCK_CONTINUE; 4091 } 4092 4093 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres) 4094 { 4095 struct ocfs2_qinfo_lvb *lvb; 4096 struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres); 4097 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 4098 oinfo->dqi_gi.dqi_type); 4099 4100 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 4101 lvb->lvb_version = OCFS2_QINFO_LVB_VERSION; 4102 lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace); 4103 lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace); 4104 lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms); 4105 lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks); 4106 lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk); 4107 lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry); 4108 } 4109 4110 void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex) 4111 { 4112 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 4113 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 4114 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 4115 4116 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 4117 ocfs2_cluster_unlock(osb, lockres, level); 4118 } 4119 4120 static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo) 4121 { 4122 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 4123 oinfo->dqi_gi.dqi_type); 4124 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 4125 struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 4126 struct buffer_head *bh = NULL; 4127 struct ocfs2_global_disk_dqinfo *gdinfo; 4128 int status = 0; 4129 4130 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 4131 lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) { 4132 info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace); 4133 info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace); 4134 oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms); 4135 oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks); 4136 oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk); 4137 oinfo->dqi_gi.dqi_free_entry = 4138 be32_to_cpu(lvb->lvb_free_entry); 4139 } else { 4140 status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode, 4141 oinfo->dqi_giblk, &bh); 4142 if (status) { 4143 mlog_errno(status); 4144 goto bail; 4145 } 4146 gdinfo = (struct ocfs2_global_disk_dqinfo *) 4147 (bh->b_data + OCFS2_GLOBAL_INFO_OFF); 4148 info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace); 4149 info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace); 4150 oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms); 4151 oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks); 4152 oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk); 4153 oinfo->dqi_gi.dqi_free_entry = 4154 le32_to_cpu(gdinfo->dqi_free_entry); 4155 brelse(bh); 4156 ocfs2_track_lock_refresh(lockres); 4157 } 4158 4159 bail: 4160 return status; 4161 } 4162 4163 /* Lock quota info, this function expects at least shared lock on the quota file 4164 * so that we can safely refresh quota info from disk. */ 4165 int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex) 4166 { 4167 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 4168 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 4169 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 4170 int status = 0; 4171 4172 /* On RO devices, locking really isn't needed... */ 4173 if (ocfs2_is_hard_readonly(osb)) { 4174 if (ex) 4175 status = -EROFS; 4176 goto bail; 4177 } 4178 if (ocfs2_mount_local(osb)) 4179 goto bail; 4180 4181 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 4182 if (status < 0) { 4183 mlog_errno(status); 4184 goto bail; 4185 } 4186 if (!ocfs2_should_refresh_lock_res(lockres)) 4187 goto bail; 4188 /* OK, we have the lock but we need to refresh the quota info */ 4189 status = ocfs2_refresh_qinfo(oinfo); 4190 if (status) 4191 ocfs2_qinfo_unlock(oinfo, ex); 4192 ocfs2_complete_lock_res_refresh(lockres, status); 4193 bail: 4194 return status; 4195 } 4196 4197 int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex) 4198 { 4199 int status; 4200 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 4201 struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres; 4202 struct ocfs2_super *osb = lockres->l_priv; 4203 4204 4205 if (ocfs2_is_hard_readonly(osb)) 4206 return -EROFS; 4207 4208 if (ocfs2_mount_local(osb)) 4209 return 0; 4210 4211 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 4212 if (status < 0) 4213 mlog_errno(status); 4214 4215 return status; 4216 } 4217 4218 void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex) 4219 { 4220 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 4221 struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres; 4222 struct ocfs2_super *osb = lockres->l_priv; 4223 4224 if (!ocfs2_mount_local(osb)) 4225 ocfs2_cluster_unlock(osb, lockres, level); 4226 } 4227 4228 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 4229 struct ocfs2_lock_res *lockres) 4230 { 4231 int status; 4232 struct ocfs2_unblock_ctl ctl = {0, 0,}; 4233 unsigned long flags; 4234 4235 /* Our reference to the lockres in this function can be 4236 * considered valid until we remove the OCFS2_LOCK_QUEUED 4237 * flag. */ 4238 4239 BUG_ON(!lockres); 4240 BUG_ON(!lockres->l_ops); 4241 4242 mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name); 4243 4244 /* Detect whether a lock has been marked as going away while 4245 * the downconvert thread was processing other things. A lock can 4246 * still be marked with OCFS2_LOCK_FREEING after this check, 4247 * but short circuiting here will still save us some 4248 * performance. */ 4249 spin_lock_irqsave(&lockres->l_lock, flags); 4250 if (lockres->l_flags & OCFS2_LOCK_FREEING) 4251 goto unqueue; 4252 spin_unlock_irqrestore(&lockres->l_lock, flags); 4253 4254 status = ocfs2_unblock_lock(osb, lockres, &ctl); 4255 if (status < 0) 4256 mlog_errno(status); 4257 4258 spin_lock_irqsave(&lockres->l_lock, flags); 4259 unqueue: 4260 if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) { 4261 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED); 4262 } else 4263 ocfs2_schedule_blocked_lock(osb, lockres); 4264 4265 mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name, 4266 ctl.requeue ? "yes" : "no"); 4267 spin_unlock_irqrestore(&lockres->l_lock, flags); 4268 4269 if (ctl.unblock_action != UNBLOCK_CONTINUE 4270 && lockres->l_ops->post_unlock) 4271 lockres->l_ops->post_unlock(osb, lockres); 4272 } 4273 4274 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 4275 struct ocfs2_lock_res *lockres) 4276 { 4277 unsigned long flags; 4278 4279 assert_spin_locked(&lockres->l_lock); 4280 4281 if (lockres->l_flags & OCFS2_LOCK_FREEING) { 4282 /* Do not schedule a lock for downconvert when it's on 4283 * the way to destruction - any nodes wanting access 4284 * to the resource will get it soon. */ 4285 mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n", 4286 lockres->l_name, lockres->l_flags); 4287 return; 4288 } 4289 4290 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); 4291 4292 spin_lock_irqsave(&osb->dc_task_lock, flags); 4293 if (list_empty(&lockres->l_blocked_list)) { 4294 list_add_tail(&lockres->l_blocked_list, 4295 &osb->blocked_lock_list); 4296 osb->blocked_lock_count++; 4297 } 4298 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4299 } 4300 4301 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb) 4302 { 4303 unsigned long processed; 4304 unsigned long flags; 4305 struct ocfs2_lock_res *lockres; 4306 4307 spin_lock_irqsave(&osb->dc_task_lock, flags); 4308 /* grab this early so we know to try again if a state change and 4309 * wake happens part-way through our work */ 4310 osb->dc_work_sequence = osb->dc_wake_sequence; 4311 4312 processed = osb->blocked_lock_count; 4313 /* 4314 * blocked lock processing in this loop might call iput which can 4315 * remove items off osb->blocked_lock_list. Downconvert up to 4316 * 'processed' number of locks, but stop short if we had some 4317 * removed in ocfs2_mark_lockres_freeing when downconverting. 
4318 	 */
4319 	while (processed && !list_empty(&osb->blocked_lock_list)) {
4320 		lockres = list_entry(osb->blocked_lock_list.next,
4321 				     struct ocfs2_lock_res, l_blocked_list);
4322 		list_del_init(&lockres->l_blocked_list);
4323 		osb->blocked_lock_count--;
4324 		spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4325 
4326 		BUG_ON(!processed);
4327 		processed--;
4328 
4329 		ocfs2_process_blocked_lock(osb, lockres);
4330 
4331 		spin_lock_irqsave(&osb->dc_task_lock, flags);
4332 	}
4333 	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4334 }
4335 
4336 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
4337 {
4338 	int empty = 0;
4339 	unsigned long flags;
4340 
4341 	spin_lock_irqsave(&osb->dc_task_lock, flags);
4342 	if (list_empty(&osb->blocked_lock_list))
4343 		empty = 1;
4344 
4345 	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4346 	return empty;
4347 }
4348 
4349 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
4350 {
4351 	int should_wake = 0;
4352 	unsigned long flags;
4353 
4354 	spin_lock_irqsave(&osb->dc_task_lock, flags);
4355 	if (osb->dc_work_sequence != osb->dc_wake_sequence)
4356 		should_wake = 1;
4357 	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4358 
4359 	return should_wake;
4360 }
4361 
4362 static int ocfs2_downconvert_thread(void *arg)
4363 {
4364 	int status = 0;
4365 	struct ocfs2_super *osb = arg;
4366 
4367 	/* only quit once we've been asked to stop and there is no more
4368 	 * work available */
4369 	while (!(kthread_should_stop() &&
4370 		ocfs2_downconvert_thread_lists_empty(osb))) {
4371 
4372 		wait_event_interruptible(osb->dc_event,
4373 					 ocfs2_downconvert_thread_should_wake(osb) ||
4374 					 kthread_should_stop());
4375 
4376 		mlog(0, "downconvert_thread: awoken\n");
4377 
4378 		ocfs2_downconvert_thread_do_work(osb);
4379 	}
4380 
4381 	osb->dc_task = NULL;
4382 	return status;
4383 }
4384 
4385 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
4386 {
4387 	unsigned long flags;
4388 
4389 	spin_lock_irqsave(&osb->dc_task_lock, flags);
4390 	/* make sure the downconvert thread gets a swipe at whatever changes
4391 	 * the caller may have made to the blocked lock state */
4392 	osb->dc_wake_sequence++;
4393 	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4394 	wake_up(&osb->dc_event);
4395 }
4396 
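/*
 * Illustrative sketch (not OCFS2 code): a minimal userspace model of the
 * BUSY/PENDING ordering that ocfs2_unblock_lock() relies on before issuing
 * a cancel.  The names fake_lockres, fake_dlm_convert and try_cancel are
 * invented for this example and do not exist in OCFS2 or fs/dlm; the point
 * is only that a cancel is deferred (requeued) until the converting thread
 * has actually called into the DLM and cleared its "pending" mark.
 */
#include <stdio.h>

#define FAKE_BUSY	0x1	/* a convert request is outstanding	*/
#define FAKE_PENDING	0x2	/* ...but the DLM call has not been made */

struct fake_lockres {
	unsigned long flags;
};

/* Converting thread: PENDING is set together with BUSY and cleared only
 * after the (stubbed-out) DLM call has really been issued. */
static void fake_dlm_convert(struct fake_lockres *res)
{
	res->flags |= FAKE_BUSY | FAKE_PENDING;
	/* ... the real code calls ocfs2_dlm_lock() here ... */
	res->flags &= ~FAKE_PENDING;
}

/* Unblock path: cancelling is only legal once PENDING is clear, otherwise
 * the cancel could race ahead of the convert it is meant to interrupt. */
static int try_cancel(struct fake_lockres *res)
{
	if (!(res->flags & FAKE_BUSY))
		return 0;	/* nothing outstanding to cancel	*/
	if (res->flags & FAKE_PENDING)
		return -1;	/* requeue and try again later		*/
	printf("cancelling outstanding convert\n");
	return 1;
}

int main(void)
{
	struct fake_lockres res = { .flags = 0 };

	fake_dlm_convert(&res);
	return try_cancel(&res) == 1 ? 0 : 1;
}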
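/*
 * Illustrative sketch (not OCFS2 code): the holder checks that
 * ocfs2_unblock_lock() performs before it downconverts, reduced to a pure
 * function over made-up types.  LVL_* and struct holders stand in for the
 * DLM_LOCK_* levels and the l_ro_holders/l_ex_holders counters; the real
 * decision additionally involves BUSY/PENDING, refresh state and the
 * per-type check_downconvert() hook shown above.
 */
#include <stdio.h>

enum lvl { LVL_NL, LVL_PR, LVL_EX };

struct holders {
	unsigned int ro;	/* local readers holding the lock	*/
	unsigned int ex;	/* local writers holding the lock	*/
};

/* Highest level we can drop to that is still compatible with what the
 * other node wants: an EX request forces NL, a PR request allows PR. */
static enum lvl highest_compat(enum lvl blocking)
{
	return blocking == LVL_EX ? LVL_NL : LVL_PR;
}

/* Returns 1 and sets *new_level when a downconvert may proceed,
 * 0 when the lock must be requeued until local holders drain. */
static int may_downconvert(enum lvl blocking, struct holders h,
			   enum lvl *new_level)
{
	if (blocking == LVL_EX && (h.ro || h.ex))
		return 0;	/* any holder blocks an EX request	*/
	if (blocking == LVL_PR && h.ex)
		return 0;	/* only writers block a PR request	*/
	*new_level = highest_compat(blocking);
	return 1;
}

int main(void)
{
	struct holders h = { .ro = 2, .ex = 0 };
	enum lvl target;

	/* Readers do not block a PR request, so EX -> PR is allowed. */
	if (may_downconvert(LVL_PR, h, &target))
		printf("downconvert to %s\n",
		       target == LVL_PR ? "PR" : "NL");
	return 0;
}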
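/*
 * Illustrative sketch (not OCFS2 code): the page-cache policy applied by
 * ocfs2_data_convert_worker() above, reduced to a decision table.  In the
 * kernel the two branches are truncate_inode_pages() versus
 * filemap_fdatawait(); here they are just strings, and BLK_* is an
 * invented stand-in for the blocking DLM level.
 */
#include <stdio.h>

enum blk_lvl { BLK_PR, BLK_EX };

/* Another node wants EX: no one may keep possibly-stale pages, so drop
 * them all.  Another node wants PR: our clean pages stay valid, so only
 * wait for the writeback started by filemap_fdatawrite(). */
static const char *page_policy(enum blk_lvl blocking)
{
	return blocking == BLK_EX ? "truncate all cached pages"
				  : "wait for writeback, keep pages";
}

int main(void)
{
	printf("EX requested elsewhere: %s\n", page_policy(BLK_EX));
	printf("PR requested elsewhere: %s\n", page_policy(BLK_PR));
	return 0;
}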
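/*
 * Illustrative sketch (not OCFS2 code): the pattern behind
 * ocfs2_set_qinfo_lvb()/ocfs2_refresh_qinfo() above - publish a few
 * frequently-needed fields in a big-endian lock value block so other nodes
 * can skip a disk read, and fall back to the on-disk copy whenever the LVB
 * is invalid or carries an unknown version.  The struct layout,
 * FAKE_LVB_VERSION and read_info_from_disk() are invented; the kernel uses
 * cpu_to_be32()/be32_to_cpu() rather than the htonl()/ntohl() stand-ins.
 */
#include <stdio.h>
#include <stdint.h>
#include <arpa/inet.h>

#define FAKE_LVB_VERSION 1

struct fake_qinfo_lvb {		/* what travels inside the DLM LVB	*/
	uint8_t  version;
	uint32_t bgrace_be;	/* big-endian on the wire		*/
	uint32_t igrace_be;
};

struct fake_qinfo {		/* the in-memory, CPU-endian copy	*/
	uint32_t bgrace;
	uint32_t igrace;
};

static void pack_lvb(struct fake_qinfo_lvb *lvb, const struct fake_qinfo *qi)
{
	lvb->version = FAKE_LVB_VERSION;
	lvb->bgrace_be = htonl(qi->bgrace);
	lvb->igrace_be = htonl(qi->igrace);
}

static void read_info_from_disk(struct fake_qinfo *qi)
{
	/* stand-in for ocfs2_read_quota_phys_block() + le32_to_cpu() */
	qi->bgrace = 7 * 24 * 3600;
	qi->igrace = 7 * 24 * 3600;
}

static void refresh_info(struct fake_qinfo *qi,
			 const struct fake_qinfo_lvb *lvb, int lvb_valid)
{
	if (lvb_valid && lvb->version == FAKE_LVB_VERSION) {
		qi->bgrace = ntohl(lvb->bgrace_be);	/* fast path: LVB */
		qi->igrace = ntohl(lvb->igrace_be);
	} else {
		read_info_from_disk(qi);		/* slow path: disk */
	}
}

int main(void)
{
	struct fake_qinfo src = { .bgrace = 3600, .igrace = 1800 }, dst;
	struct fake_qinfo_lvb lvb;

	pack_lvb(&lvb, &src);
	refresh_info(&dst, &lvb, 1);
	printf("bgrace=%u igrace=%u\n", dst.bgrace, dst.igrace);
	return 0;
}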
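/*
 * Illustrative sketch (not OCFS2 code): the lost-wakeup guard built from
 * dc_wake_sequence/dc_work_sequence in the downconvert thread functions
 * above.  The worker snapshots the wake sequence *before* draining the
 * queue; any waker that bumps the sequence afterwards leaves the two
 * counters unequal, so should_wake() stays true and the worker loops again
 * instead of sleeping through the new request.  The fake_osb type is
 * invented for the example and there is no real locking or thread here.
 */
#include <stdio.h>

struct fake_osb {
	unsigned long wake_sequence;	/* bumped by each waker		*/
	unsigned long work_sequence;	/* snapshotted by the worker	*/
};

static void wake_worker(struct fake_osb *osb)
{
	osb->wake_sequence++;		/* then wake_up(&osb->dc_event)	*/
}

static int should_wake(const struct fake_osb *osb)
{
	return osb->work_sequence != osb->wake_sequence;
}

static void do_work(struct fake_osb *osb)
{
	osb->work_sequence = osb->wake_sequence;	/* grab this early */
	/* ... process the blocked lock list here ... */
}

int main(void)
{
	struct fake_osb osb = { 0, 0 };

	wake_worker(&osb);
	do_work(&osb);
	wake_worker(&osb);		/* arrives after the snapshot	*/
	printf("worker should run again: %s\n",
	       should_wake(&osb) ? "yes" : "no");
	return 0;
}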