1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmglue.c 5 * 6 * Code which implements an OCFS2 specific interface to our DLM. 7 * 8 * Copyright (C) 2003, 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 */ 25 26 #include <linux/types.h> 27 #include <linux/slab.h> 28 #include <linux/highmem.h> 29 #include <linux/mm.h> 30 #include <linux/kthread.h> 31 #include <linux/pagemap.h> 32 #include <linux/debugfs.h> 33 #include <linux/seq_file.h> 34 #include <linux/time.h> 35 #include <linux/quotaops.h> 36 #include <linux/sched/signal.h> 37 38 #define MLOG_MASK_PREFIX ML_DLM_GLUE 39 #include <cluster/masklog.h> 40 41 #include "ocfs2.h" 42 #include "ocfs2_lockingver.h" 43 44 #include "alloc.h" 45 #include "dcache.h" 46 #include "dlmglue.h" 47 #include "extent_map.h" 48 #include "file.h" 49 #include "heartbeat.h" 50 #include "inode.h" 51 #include "journal.h" 52 #include "stackglue.h" 53 #include "slot_map.h" 54 #include "super.h" 55 #include "uptodate.h" 56 #include "quota.h" 57 #include "refcounttree.h" 58 #include "acl.h" 59 60 #include "buffer_head_io.h" 61 62 struct ocfs2_mask_waiter { 63 struct list_head mw_item; 64 int mw_status; 65 struct completion mw_complete; 66 unsigned long mw_mask; 67 unsigned long mw_goal; 68 #ifdef CONFIG_OCFS2_FS_STATS 69 ktime_t mw_lock_start; 70 #endif 71 }; 72 73 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres); 74 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres); 75 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres); 76 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres); 77 78 /* 79 * Return value from ->downconvert_worker functions. 80 * 81 * These control the precise actions of ocfs2_unblock_lock() 82 * and ocfs2_process_blocked_lock() 83 * 84 */ 85 enum ocfs2_unblock_action { 86 UNBLOCK_CONTINUE = 0, /* Continue downconvert */ 87 UNBLOCK_CONTINUE_POST = 1, /* Continue downconvert, fire 88 * ->post_unlock callback */ 89 UNBLOCK_STOP_POST = 2, /* Do not downconvert, fire 90 * ->post_unlock() callback. */ 91 }; 92 93 struct ocfs2_unblock_ctl { 94 int requeue; 95 enum ocfs2_unblock_action unblock_action; 96 }; 97 98 /* Lockdep class keys */ 99 struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES]; 100 101 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 102 int new_level); 103 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres); 104 105 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 106 int blocking); 107 108 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 109 int blocking); 110 111 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 112 struct ocfs2_lock_res *lockres); 113 114 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres); 115 116 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres, 117 int new_level); 118 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres, 119 int blocking); 120 121 #define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres) 122 123 /* This aids in debugging situations where a bad LVB might be involved. */ 124 static void ocfs2_dump_meta_lvb_info(u64 level, 125 const char *function, 126 unsigned int line, 127 struct ocfs2_lock_res *lockres) 128 { 129 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 130 131 mlog(level, "LVB information for %s (called from %s:%u):\n", 132 lockres->l_name, function, line); 133 mlog(level, "version: %u, clusters: %u, generation: 0x%x\n", 134 lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters), 135 be32_to_cpu(lvb->lvb_igeneration)); 136 mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n", 137 (unsigned long long)be64_to_cpu(lvb->lvb_isize), 138 be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid), 139 be16_to_cpu(lvb->lvb_imode)); 140 mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, " 141 "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink), 142 (long long)be64_to_cpu(lvb->lvb_iatime_packed), 143 (long long)be64_to_cpu(lvb->lvb_ictime_packed), 144 (long long)be64_to_cpu(lvb->lvb_imtime_packed), 145 be32_to_cpu(lvb->lvb_iattr)); 146 } 147 148 149 /* 150 * OCFS2 Lock Resource Operations 151 * 152 * These fine tune the behavior of the generic dlmglue locking infrastructure. 153 * 154 * The most basic of lock types can point ->l_priv to their respective 155 * struct ocfs2_super and allow the default actions to manage things. 156 * 157 * Right now, each lock type also needs to implement an init function, 158 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres() 159 * should be called when the lock is no longer needed (i.e., object 160 * destruction time). 161 */ 162 struct ocfs2_lock_res_ops { 163 /* 164 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define 165 * this callback if ->l_priv is not an ocfs2_super pointer 166 */ 167 struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *); 168 169 /* 170 * Optionally called in the downconvert thread after a 171 * successful downconvert. The lockres will not be referenced 172 * after this callback is called, so it is safe to free 173 * memory, etc. 174 * 175 * The exact semantics of when this is called are controlled 176 * by ->downconvert_worker() 177 */ 178 void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *); 179 180 /* 181 * Allow a lock type to add checks to determine whether it is 182 * safe to downconvert a lock. Return 0 to re-queue the 183 * downconvert at a later time, nonzero to continue. 184 * 185 * For most locks, the default checks that there are no 186 * incompatible holders are sufficient. 187 * 188 * Called with the lockres spinlock held. 189 */ 190 int (*check_downconvert)(struct ocfs2_lock_res *, int); 191 192 /* 193 * Allows a lock type to populate the lock value block. This 194 * is called on downconvert, and when we drop a lock. 195 * 196 * Locks that want to use this should set LOCK_TYPE_USES_LVB 197 * in the flags field. 198 * 199 * Called with the lockres spinlock held. 200 */ 201 void (*set_lvb)(struct ocfs2_lock_res *); 202 203 /* 204 * Called from the downconvert thread when it is determined 205 * that a lock will be downconverted. This is called without 206 * any locks held so the function can do work that might 207 * schedule (syncing out data, etc). 208 * 209 * This should return any one of the ocfs2_unblock_action 210 * values, depending on what it wants the thread to do. 211 */ 212 int (*downconvert_worker)(struct ocfs2_lock_res *, int); 213 214 /* 215 * LOCK_TYPE_* flags which describe the specific requirements 216 * of a lock type. Descriptions of each individual flag follow. 217 */ 218 int flags; 219 }; 220 221 /* 222 * Some locks want to "refresh" potentially stale data when a 223 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this 224 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the 225 * individual lockres l_flags member from the ast function. It is 226 * expected that the locking wrapper will clear the 227 * OCFS2_LOCK_NEEDS_REFRESH flag when done. 228 */ 229 #define LOCK_TYPE_REQUIRES_REFRESH 0x1 230 231 /* 232 * Indicate that a lock type makes use of the lock value block. The 233 * ->set_lvb lock type callback must be defined. 234 */ 235 #define LOCK_TYPE_USES_LVB 0x2 236 237 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { 238 .get_osb = ocfs2_get_inode_osb, 239 .flags = 0, 240 }; 241 242 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = { 243 .get_osb = ocfs2_get_inode_osb, 244 .check_downconvert = ocfs2_check_meta_downconvert, 245 .set_lvb = ocfs2_set_meta_lvb, 246 .downconvert_worker = ocfs2_data_convert_worker, 247 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 248 }; 249 250 static struct ocfs2_lock_res_ops ocfs2_super_lops = { 251 .flags = LOCK_TYPE_REQUIRES_REFRESH, 252 }; 253 254 static struct ocfs2_lock_res_ops ocfs2_rename_lops = { 255 .flags = 0, 256 }; 257 258 static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = { 259 .flags = 0, 260 }; 261 262 static struct ocfs2_lock_res_ops ocfs2_trim_fs_lops = { 263 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 264 }; 265 266 static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = { 267 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 268 }; 269 270 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { 271 .get_osb = ocfs2_get_dentry_osb, 272 .post_unlock = ocfs2_dentry_post_unlock, 273 .downconvert_worker = ocfs2_dentry_convert_worker, 274 .flags = 0, 275 }; 276 277 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = { 278 .get_osb = ocfs2_get_inode_osb, 279 .flags = 0, 280 }; 281 282 static struct ocfs2_lock_res_ops ocfs2_flock_lops = { 283 .get_osb = ocfs2_get_file_osb, 284 .flags = 0, 285 }; 286 287 static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = { 288 .set_lvb = ocfs2_set_qinfo_lvb, 289 .get_osb = ocfs2_get_qinfo_osb, 290 .flags = LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB, 291 }; 292 293 static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = { 294 .check_downconvert = ocfs2_check_refcount_downconvert, 295 .downconvert_worker = ocfs2_refcount_convert_worker, 296 .flags = 0, 297 }; 298 299 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 300 { 301 return lockres->l_type == OCFS2_LOCK_TYPE_META || 302 lockres->l_type == OCFS2_LOCK_TYPE_RW || 303 lockres->l_type == OCFS2_LOCK_TYPE_OPEN; 304 } 305 306 static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb) 307 { 308 return container_of(lksb, struct ocfs2_lock_res, l_lksb); 309 } 310 311 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) 312 { 313 BUG_ON(!ocfs2_is_inode_lock(lockres)); 314 315 return (struct inode *) lockres->l_priv; 316 } 317 318 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres) 319 { 320 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY); 321 322 return (struct ocfs2_dentry_lock *)lockres->l_priv; 323 } 324 325 static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres) 326 { 327 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO); 328 329 return (struct ocfs2_mem_dqinfo *)lockres->l_priv; 330 } 331 332 static inline struct ocfs2_refcount_tree * 333 ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res) 334 { 335 return container_of(res, struct ocfs2_refcount_tree, rf_lockres); 336 } 337 338 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres) 339 { 340 if (lockres->l_ops->get_osb) 341 return lockres->l_ops->get_osb(lockres); 342 343 return (struct ocfs2_super *)lockres->l_priv; 344 } 345 346 static int ocfs2_lock_create(struct ocfs2_super *osb, 347 struct ocfs2_lock_res *lockres, 348 int level, 349 u32 dlm_flags); 350 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 351 int wanted); 352 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb, 353 struct ocfs2_lock_res *lockres, 354 int level, unsigned long caller_ip); 355 static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb, 356 struct ocfs2_lock_res *lockres, 357 int level) 358 { 359 __ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_); 360 } 361 362 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres); 363 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres); 364 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres); 365 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level); 366 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 367 struct ocfs2_lock_res *lockres); 368 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 369 int convert); 370 #define ocfs2_log_dlm_error(_func, _err, _lockres) do { \ 371 if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY) \ 372 mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \ 373 _err, _func, _lockres->l_name); \ 374 else \ 375 mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n", \ 376 _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name, \ 377 (unsigned int)ocfs2_get_dentry_lock_ino(_lockres)); \ 378 } while (0) 379 static int ocfs2_downconvert_thread(void *arg); 380 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 381 struct ocfs2_lock_res *lockres); 382 static int ocfs2_inode_lock_update(struct inode *inode, 383 struct buffer_head **bh); 384 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 385 static inline int ocfs2_highest_compat_lock_level(int level); 386 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 387 int new_level); 388 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 389 struct ocfs2_lock_res *lockres, 390 int new_level, 391 int lvb, 392 unsigned int generation); 393 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 394 struct ocfs2_lock_res *lockres); 395 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 396 struct ocfs2_lock_res *lockres); 397 398 399 static void ocfs2_build_lock_name(enum ocfs2_lock_type type, 400 u64 blkno, 401 u32 generation, 402 char *name) 403 { 404 int len; 405 406 BUG_ON(type >= OCFS2_NUM_LOCK_TYPES); 407 408 len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", 409 ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD, 410 (long long)blkno, generation); 411 412 BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1)); 413 414 mlog(0, "built lock resource with name: %s\n", name); 415 } 416 417 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock); 418 419 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res, 420 struct ocfs2_dlm_debug *dlm_debug) 421 { 422 mlog(0, "Add tracking for lockres %s\n", res->l_name); 423 424 spin_lock(&ocfs2_dlm_tracking_lock); 425 list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking); 426 spin_unlock(&ocfs2_dlm_tracking_lock); 427 } 428 429 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) 430 { 431 spin_lock(&ocfs2_dlm_tracking_lock); 432 if (!list_empty(&res->l_debug_list)) 433 list_del_init(&res->l_debug_list); 434 spin_unlock(&ocfs2_dlm_tracking_lock); 435 } 436 437 #ifdef CONFIG_OCFS2_FS_STATS 438 static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 439 { 440 res->l_lock_refresh = 0; 441 memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats)); 442 memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats)); 443 } 444 445 static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level, 446 struct ocfs2_mask_waiter *mw, int ret) 447 { 448 u32 usec; 449 ktime_t kt; 450 struct ocfs2_lock_stats *stats; 451 452 if (level == LKM_PRMODE) 453 stats = &res->l_lock_prmode; 454 else if (level == LKM_EXMODE) 455 stats = &res->l_lock_exmode; 456 else 457 return; 458 459 kt = ktime_sub(ktime_get(), mw->mw_lock_start); 460 usec = ktime_to_us(kt); 461 462 stats->ls_gets++; 463 stats->ls_total += ktime_to_ns(kt); 464 /* overflow */ 465 if (unlikely(stats->ls_gets == 0)) { 466 stats->ls_gets++; 467 stats->ls_total = ktime_to_ns(kt); 468 } 469 470 if (stats->ls_max < usec) 471 stats->ls_max = usec; 472 473 if (ret) 474 stats->ls_fail++; 475 } 476 477 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 478 { 479 lockres->l_lock_refresh++; 480 } 481 482 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 483 { 484 mw->mw_lock_start = ktime_get(); 485 } 486 #else 487 static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 488 { 489 } 490 static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, 491 int level, struct ocfs2_mask_waiter *mw, int ret) 492 { 493 } 494 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 495 { 496 } 497 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 498 { 499 } 500 #endif 501 502 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, 503 struct ocfs2_lock_res *res, 504 enum ocfs2_lock_type type, 505 struct ocfs2_lock_res_ops *ops, 506 void *priv) 507 { 508 res->l_type = type; 509 res->l_ops = ops; 510 res->l_priv = priv; 511 512 res->l_level = DLM_LOCK_IV; 513 res->l_requested = DLM_LOCK_IV; 514 res->l_blocking = DLM_LOCK_IV; 515 res->l_action = OCFS2_AST_INVALID; 516 res->l_unlock_action = OCFS2_UNLOCK_INVALID; 517 518 res->l_flags = OCFS2_LOCK_INITIALIZED; 519 520 ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug); 521 522 ocfs2_init_lock_stats(res); 523 #ifdef CONFIG_DEBUG_LOCK_ALLOC 524 if (type != OCFS2_LOCK_TYPE_OPEN) 525 lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type], 526 &lockdep_keys[type], 0); 527 else 528 res->l_lockdep_map.key = NULL; 529 #endif 530 } 531 532 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res) 533 { 534 /* This also clears out the lock status block */ 535 memset(res, 0, sizeof(struct ocfs2_lock_res)); 536 spin_lock_init(&res->l_lock); 537 init_waitqueue_head(&res->l_event); 538 INIT_LIST_HEAD(&res->l_blocked_list); 539 INIT_LIST_HEAD(&res->l_mask_waiters); 540 INIT_LIST_HEAD(&res->l_holders); 541 } 542 543 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, 544 enum ocfs2_lock_type type, 545 unsigned int generation, 546 struct inode *inode) 547 { 548 struct ocfs2_lock_res_ops *ops; 549 550 switch(type) { 551 case OCFS2_LOCK_TYPE_RW: 552 ops = &ocfs2_inode_rw_lops; 553 break; 554 case OCFS2_LOCK_TYPE_META: 555 ops = &ocfs2_inode_inode_lops; 556 break; 557 case OCFS2_LOCK_TYPE_OPEN: 558 ops = &ocfs2_inode_open_lops; 559 break; 560 default: 561 mlog_bug_on_msg(1, "type: %d\n", type); 562 ops = NULL; /* thanks, gcc */ 563 break; 564 }; 565 566 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, 567 generation, res->l_name); 568 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); 569 } 570 571 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres) 572 { 573 struct inode *inode = ocfs2_lock_res_inode(lockres); 574 575 return OCFS2_SB(inode->i_sb); 576 } 577 578 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres) 579 { 580 struct ocfs2_mem_dqinfo *info = lockres->l_priv; 581 582 return OCFS2_SB(info->dqi_gi.dqi_sb); 583 } 584 585 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres) 586 { 587 struct ocfs2_file_private *fp = lockres->l_priv; 588 589 return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb); 590 } 591 592 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) 593 { 594 __be64 inode_blkno_be; 595 596 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], 597 sizeof(__be64)); 598 599 return be64_to_cpu(inode_blkno_be); 600 } 601 602 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres) 603 { 604 struct ocfs2_dentry_lock *dl = lockres->l_priv; 605 606 return OCFS2_SB(dl->dl_inode->i_sb); 607 } 608 609 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, 610 u64 parent, struct inode *inode) 611 { 612 int len; 613 u64 inode_blkno = OCFS2_I(inode)->ip_blkno; 614 __be64 inode_blkno_be = cpu_to_be64(inode_blkno); 615 struct ocfs2_lock_res *lockres = &dl->dl_lockres; 616 617 ocfs2_lock_res_init_once(lockres); 618 619 /* 620 * Unfortunately, the standard lock naming scheme won't work 621 * here because we have two 16 byte values to use. Instead, 622 * we'll stuff the inode number as a binary value. We still 623 * want error prints to show something without garbling the 624 * display, so drop a null byte in there before the inode 625 * number. A future version of OCFS2 will likely use all 626 * binary lock names. The stringified names have been a 627 * tremendous aid in debugging, but now that the debugfs 628 * interface exists, we can mangle things there if need be. 629 * 630 * NOTE: We also drop the standard "pad" value (the total lock 631 * name size stays the same though - the last part is all 632 * zeros due to the memset in ocfs2_lock_res_init_once() 633 */ 634 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, 635 "%c%016llx", 636 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), 637 (long long)parent); 638 639 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); 640 641 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, 642 sizeof(__be64)); 643 644 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 645 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, 646 dl); 647 } 648 649 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 650 struct ocfs2_super *osb) 651 { 652 /* Superblock lockres doesn't come from a slab so we call init 653 * once on it manually. */ 654 ocfs2_lock_res_init_once(res); 655 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, 656 0, res->l_name); 657 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 658 &ocfs2_super_lops, osb); 659 } 660 661 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, 662 struct ocfs2_super *osb) 663 { 664 /* Rename lockres doesn't come from a slab so we call init 665 * once on it manually. */ 666 ocfs2_lock_res_init_once(res); 667 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); 668 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 669 &ocfs2_rename_lops, osb); 670 } 671 672 static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res, 673 struct ocfs2_super *osb) 674 { 675 /* nfs_sync lockres doesn't come from a slab so we call init 676 * once on it manually. */ 677 ocfs2_lock_res_init_once(res); 678 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name); 679 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC, 680 &ocfs2_nfs_sync_lops, osb); 681 } 682 683 void ocfs2_trim_fs_lock_res_init(struct ocfs2_super *osb) 684 { 685 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 686 687 ocfs2_lock_res_init_once(lockres); 688 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_TRIM_FS, 0, 0, lockres->l_name); 689 ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_TRIM_FS, 690 &ocfs2_trim_fs_lops, osb); 691 } 692 693 void ocfs2_trim_fs_lock_res_uninit(struct ocfs2_super *osb) 694 { 695 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 696 697 ocfs2_simple_drop_lockres(osb, lockres); 698 ocfs2_lock_res_free(lockres); 699 } 700 701 static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res, 702 struct ocfs2_super *osb) 703 { 704 ocfs2_lock_res_init_once(res); 705 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name); 706 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN, 707 &ocfs2_orphan_scan_lops, osb); 708 } 709 710 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, 711 struct ocfs2_file_private *fp) 712 { 713 struct inode *inode = fp->fp_file->f_mapping->host; 714 struct ocfs2_inode_info *oi = OCFS2_I(inode); 715 716 ocfs2_lock_res_init_once(lockres); 717 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno, 718 inode->i_generation, lockres->l_name); 719 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 720 OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops, 721 fp); 722 lockres->l_flags |= OCFS2_LOCK_NOCACHE; 723 } 724 725 void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres, 726 struct ocfs2_mem_dqinfo *info) 727 { 728 ocfs2_lock_res_init_once(lockres); 729 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type, 730 0, lockres->l_name); 731 ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres, 732 OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops, 733 info); 734 } 735 736 void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres, 737 struct ocfs2_super *osb, u64 ref_blkno, 738 unsigned int generation) 739 { 740 ocfs2_lock_res_init_once(lockres); 741 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno, 742 generation, lockres->l_name); 743 ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT, 744 &ocfs2_refcount_block_lops, osb); 745 } 746 747 void ocfs2_lock_res_free(struct ocfs2_lock_res *res) 748 { 749 if (!(res->l_flags & OCFS2_LOCK_INITIALIZED)) 750 return; 751 752 ocfs2_remove_lockres_tracking(res); 753 754 mlog_bug_on_msg(!list_empty(&res->l_blocked_list), 755 "Lockres %s is on the blocked list\n", 756 res->l_name); 757 mlog_bug_on_msg(!list_empty(&res->l_mask_waiters), 758 "Lockres %s has mask waiters pending\n", 759 res->l_name); 760 mlog_bug_on_msg(spin_is_locked(&res->l_lock), 761 "Lockres %s is locked\n", 762 res->l_name); 763 mlog_bug_on_msg(res->l_ro_holders, 764 "Lockres %s has %u ro holders\n", 765 res->l_name, res->l_ro_holders); 766 mlog_bug_on_msg(res->l_ex_holders, 767 "Lockres %s has %u ex holders\n", 768 res->l_name, res->l_ex_holders); 769 770 /* Need to clear out the lock status block for the dlm */ 771 memset(&res->l_lksb, 0, sizeof(res->l_lksb)); 772 773 res->l_flags = 0UL; 774 } 775 776 /* 777 * Keep a list of processes who have interest in a lockres. 778 * Note: this is now only uesed for check recursive cluster locking. 779 */ 780 static inline void ocfs2_add_holder(struct ocfs2_lock_res *lockres, 781 struct ocfs2_lock_holder *oh) 782 { 783 INIT_LIST_HEAD(&oh->oh_list); 784 oh->oh_owner_pid = get_pid(task_pid(current)); 785 786 spin_lock(&lockres->l_lock); 787 list_add_tail(&oh->oh_list, &lockres->l_holders); 788 spin_unlock(&lockres->l_lock); 789 } 790 791 static struct ocfs2_lock_holder * 792 ocfs2_pid_holder(struct ocfs2_lock_res *lockres, 793 struct pid *pid) 794 { 795 struct ocfs2_lock_holder *oh; 796 797 spin_lock(&lockres->l_lock); 798 list_for_each_entry(oh, &lockres->l_holders, oh_list) { 799 if (oh->oh_owner_pid == pid) { 800 spin_unlock(&lockres->l_lock); 801 return oh; 802 } 803 } 804 spin_unlock(&lockres->l_lock); 805 return NULL; 806 } 807 808 static inline void ocfs2_remove_holder(struct ocfs2_lock_res *lockres, 809 struct ocfs2_lock_holder *oh) 810 { 811 spin_lock(&lockres->l_lock); 812 list_del(&oh->oh_list); 813 spin_unlock(&lockres->l_lock); 814 815 put_pid(oh->oh_owner_pid); 816 } 817 818 819 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, 820 int level) 821 { 822 BUG_ON(!lockres); 823 824 switch(level) { 825 case DLM_LOCK_EX: 826 lockres->l_ex_holders++; 827 break; 828 case DLM_LOCK_PR: 829 lockres->l_ro_holders++; 830 break; 831 default: 832 BUG(); 833 } 834 } 835 836 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, 837 int level) 838 { 839 BUG_ON(!lockres); 840 841 switch(level) { 842 case DLM_LOCK_EX: 843 BUG_ON(!lockres->l_ex_holders); 844 lockres->l_ex_holders--; 845 break; 846 case DLM_LOCK_PR: 847 BUG_ON(!lockres->l_ro_holders); 848 lockres->l_ro_holders--; 849 break; 850 default: 851 BUG(); 852 } 853 } 854 855 /* WARNING: This function lives in a world where the only three lock 856 * levels are EX, PR, and NL. It *will* have to be adjusted when more 857 * lock types are added. */ 858 static inline int ocfs2_highest_compat_lock_level(int level) 859 { 860 int new_level = DLM_LOCK_EX; 861 862 if (level == DLM_LOCK_EX) 863 new_level = DLM_LOCK_NL; 864 else if (level == DLM_LOCK_PR) 865 new_level = DLM_LOCK_PR; 866 return new_level; 867 } 868 869 static void lockres_set_flags(struct ocfs2_lock_res *lockres, 870 unsigned long newflags) 871 { 872 struct ocfs2_mask_waiter *mw, *tmp; 873 874 assert_spin_locked(&lockres->l_lock); 875 876 lockres->l_flags = newflags; 877 878 list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) { 879 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 880 continue; 881 882 list_del_init(&mw->mw_item); 883 mw->mw_status = 0; 884 complete(&mw->mw_complete); 885 } 886 } 887 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or) 888 { 889 lockres_set_flags(lockres, lockres->l_flags | or); 890 } 891 static void lockres_clear_flags(struct ocfs2_lock_res *lockres, 892 unsigned long clear) 893 { 894 lockres_set_flags(lockres, lockres->l_flags & ~clear); 895 } 896 897 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres) 898 { 899 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 900 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 901 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 902 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 903 904 lockres->l_level = lockres->l_requested; 905 if (lockres->l_level <= 906 ocfs2_highest_compat_lock_level(lockres->l_blocking)) { 907 lockres->l_blocking = DLM_LOCK_NL; 908 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 909 } 910 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 911 } 912 913 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres) 914 { 915 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 916 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 917 918 /* Convert from RO to EX doesn't really need anything as our 919 * information is already up to data. Convert from NL to 920 * *anything* however should mark ourselves as needing an 921 * update */ 922 if (lockres->l_level == DLM_LOCK_NL && 923 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 924 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 925 926 lockres->l_level = lockres->l_requested; 927 928 /* 929 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing 930 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from 931 * downconverting the lock before the upconvert has fully completed. 932 * Do not prevent the dc thread from downconverting if NONBLOCK lock 933 * had already returned. 934 */ 935 if (!(lockres->l_flags & OCFS2_LOCK_NONBLOCK_FINISHED)) 936 lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 937 else 938 lockres_clear_flags(lockres, OCFS2_LOCK_NONBLOCK_FINISHED); 939 940 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 941 } 942 943 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres) 944 { 945 BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY))); 946 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 947 948 if (lockres->l_requested > DLM_LOCK_NL && 949 !(lockres->l_flags & OCFS2_LOCK_LOCAL) && 950 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 951 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 952 953 lockres->l_level = lockres->l_requested; 954 lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED); 955 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 956 } 957 958 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, 959 int level) 960 { 961 int needs_downconvert = 0; 962 963 assert_spin_locked(&lockres->l_lock); 964 965 if (level > lockres->l_blocking) { 966 /* only schedule a downconvert if we haven't already scheduled 967 * one that goes low enough to satisfy the level we're 968 * blocking. this also catches the case where we get 969 * duplicate BASTs */ 970 if (ocfs2_highest_compat_lock_level(level) < 971 ocfs2_highest_compat_lock_level(lockres->l_blocking)) 972 needs_downconvert = 1; 973 974 lockres->l_blocking = level; 975 } 976 977 mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n", 978 lockres->l_name, level, lockres->l_level, lockres->l_blocking, 979 needs_downconvert); 980 981 if (needs_downconvert) 982 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 983 mlog(0, "needs_downconvert = %d\n", needs_downconvert); 984 return needs_downconvert; 985 } 986 987 /* 988 * OCFS2_LOCK_PENDING and l_pending_gen. 989 * 990 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting 991 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock() 992 * for more details on the race. 993 * 994 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces 995 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock() 996 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear 997 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns, 998 * the caller is going to try to clear PENDING again. If nothing else is 999 * happening, __lockres_clear_pending() sees PENDING is unset and does 1000 * nothing. 1001 * 1002 * But what if another path (eg downconvert thread) has just started a 1003 * new locking action? The other path has re-set PENDING. Our path 1004 * cannot clear PENDING, because that will re-open the original race 1005 * window. 1006 * 1007 * [Example] 1008 * 1009 * ocfs2_meta_lock() 1010 * ocfs2_cluster_lock() 1011 * set BUSY 1012 * set PENDING 1013 * drop l_lock 1014 * ocfs2_dlm_lock() 1015 * ocfs2_locking_ast() ocfs2_downconvert_thread() 1016 * clear PENDING ocfs2_unblock_lock() 1017 * take_l_lock 1018 * !BUSY 1019 * ocfs2_prepare_downconvert() 1020 * set BUSY 1021 * set PENDING 1022 * drop l_lock 1023 * take l_lock 1024 * clear PENDING 1025 * drop l_lock 1026 * <window> 1027 * ocfs2_dlm_lock() 1028 * 1029 * So as you can see, we now have a window where l_lock is not held, 1030 * PENDING is not set, and ocfs2_dlm_lock() has not been called. 1031 * 1032 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING 1033 * set by ocfs2_prepare_downconvert(). That wasn't nice. 1034 * 1035 * To solve this we introduce l_pending_gen. A call to 1036 * lockres_clear_pending() will only do so when it is passed a generation 1037 * number that matches the lockres. lockres_set_pending() will return the 1038 * current generation number. When ocfs2_cluster_lock() goes to clear 1039 * PENDING, it passes the generation it got from set_pending(). In our 1040 * example above, the generation numbers will *not* match. Thus, 1041 * ocfs2_cluster_lock() will not clear the PENDING set by 1042 * ocfs2_prepare_downconvert(). 1043 */ 1044 1045 /* Unlocked version for ocfs2_locking_ast() */ 1046 static void __lockres_clear_pending(struct ocfs2_lock_res *lockres, 1047 unsigned int generation, 1048 struct ocfs2_super *osb) 1049 { 1050 assert_spin_locked(&lockres->l_lock); 1051 1052 /* 1053 * The ast and locking functions can race us here. The winner 1054 * will clear pending, the loser will not. 1055 */ 1056 if (!(lockres->l_flags & OCFS2_LOCK_PENDING) || 1057 (lockres->l_pending_gen != generation)) 1058 return; 1059 1060 lockres_clear_flags(lockres, OCFS2_LOCK_PENDING); 1061 lockres->l_pending_gen++; 1062 1063 /* 1064 * The downconvert thread may have skipped us because we 1065 * were PENDING. Wake it up. 1066 */ 1067 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 1068 ocfs2_wake_downconvert_thread(osb); 1069 } 1070 1071 /* Locked version for callers of ocfs2_dlm_lock() */ 1072 static void lockres_clear_pending(struct ocfs2_lock_res *lockres, 1073 unsigned int generation, 1074 struct ocfs2_super *osb) 1075 { 1076 unsigned long flags; 1077 1078 spin_lock_irqsave(&lockres->l_lock, flags); 1079 __lockres_clear_pending(lockres, generation, osb); 1080 spin_unlock_irqrestore(&lockres->l_lock, flags); 1081 } 1082 1083 static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres) 1084 { 1085 assert_spin_locked(&lockres->l_lock); 1086 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 1087 1088 lockres_or_flags(lockres, OCFS2_LOCK_PENDING); 1089 1090 return lockres->l_pending_gen; 1091 } 1092 1093 static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level) 1094 { 1095 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1096 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1097 int needs_downconvert; 1098 unsigned long flags; 1099 1100 BUG_ON(level <= DLM_LOCK_NL); 1101 1102 mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, " 1103 "type %s\n", lockres->l_name, level, lockres->l_level, 1104 ocfs2_lock_type_string(lockres->l_type)); 1105 1106 /* 1107 * We can skip the bast for locks which don't enable caching - 1108 * they'll be dropped at the earliest possible time anyway. 1109 */ 1110 if (lockres->l_flags & OCFS2_LOCK_NOCACHE) 1111 return; 1112 1113 spin_lock_irqsave(&lockres->l_lock, flags); 1114 needs_downconvert = ocfs2_generic_handle_bast(lockres, level); 1115 if (needs_downconvert) 1116 ocfs2_schedule_blocked_lock(osb, lockres); 1117 spin_unlock_irqrestore(&lockres->l_lock, flags); 1118 1119 wake_up(&lockres->l_event); 1120 1121 ocfs2_wake_downconvert_thread(osb); 1122 } 1123 1124 static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb) 1125 { 1126 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1127 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1128 unsigned long flags; 1129 int status; 1130 1131 spin_lock_irqsave(&lockres->l_lock, flags); 1132 1133 status = ocfs2_dlm_lock_status(&lockres->l_lksb); 1134 1135 if (status == -EAGAIN) { 1136 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1137 goto out; 1138 } 1139 1140 if (status) { 1141 mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n", 1142 lockres->l_name, status); 1143 spin_unlock_irqrestore(&lockres->l_lock, flags); 1144 return; 1145 } 1146 1147 mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, " 1148 "level %d => %d\n", lockres->l_name, lockres->l_action, 1149 lockres->l_unlock_action, lockres->l_level, lockres->l_requested); 1150 1151 switch(lockres->l_action) { 1152 case OCFS2_AST_ATTACH: 1153 ocfs2_generic_handle_attach_action(lockres); 1154 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL); 1155 break; 1156 case OCFS2_AST_CONVERT: 1157 ocfs2_generic_handle_convert_action(lockres); 1158 break; 1159 case OCFS2_AST_DOWNCONVERT: 1160 ocfs2_generic_handle_downconvert_action(lockres); 1161 break; 1162 default: 1163 mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, " 1164 "flags 0x%lx, unlock: %u\n", 1165 lockres->l_name, lockres->l_action, lockres->l_flags, 1166 lockres->l_unlock_action); 1167 BUG(); 1168 } 1169 out: 1170 /* set it to something invalid so if we get called again we 1171 * can catch it. */ 1172 lockres->l_action = OCFS2_AST_INVALID; 1173 1174 /* Did we try to cancel this lock? Clear that state */ 1175 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) 1176 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1177 1178 /* 1179 * We may have beaten the locking functions here. We certainly 1180 * know that dlm_lock() has been called :-) 1181 * Because we can't have two lock calls in flight at once, we 1182 * can use lockres->l_pending_gen. 1183 */ 1184 __lockres_clear_pending(lockres, lockres->l_pending_gen, osb); 1185 1186 wake_up(&lockres->l_event); 1187 spin_unlock_irqrestore(&lockres->l_lock, flags); 1188 } 1189 1190 static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error) 1191 { 1192 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1193 unsigned long flags; 1194 1195 mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n", 1196 lockres->l_name, lockres->l_unlock_action); 1197 1198 spin_lock_irqsave(&lockres->l_lock, flags); 1199 if (error) { 1200 mlog(ML_ERROR, "Dlm passes error %d for lock %s, " 1201 "unlock_action %d\n", error, lockres->l_name, 1202 lockres->l_unlock_action); 1203 spin_unlock_irqrestore(&lockres->l_lock, flags); 1204 return; 1205 } 1206 1207 switch(lockres->l_unlock_action) { 1208 case OCFS2_UNLOCK_CANCEL_CONVERT: 1209 mlog(0, "Cancel convert success for %s\n", lockres->l_name); 1210 lockres->l_action = OCFS2_AST_INVALID; 1211 /* Downconvert thread may have requeued this lock, we 1212 * need to wake it. */ 1213 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 1214 ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres)); 1215 break; 1216 case OCFS2_UNLOCK_DROP_LOCK: 1217 lockres->l_level = DLM_LOCK_IV; 1218 break; 1219 default: 1220 BUG(); 1221 } 1222 1223 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1224 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1225 wake_up(&lockres->l_event); 1226 spin_unlock_irqrestore(&lockres->l_lock, flags); 1227 } 1228 1229 /* 1230 * This is the filesystem locking protocol. It provides the lock handling 1231 * hooks for the underlying DLM. It has a maximum version number. 1232 * The version number allows interoperability with systems running at 1233 * the same major number and an equal or smaller minor number. 1234 * 1235 * Whenever the filesystem does new things with locks (adds or removes a 1236 * lock, orders them differently, does different things underneath a lock), 1237 * the version must be changed. The protocol is negotiated when joining 1238 * the dlm domain. A node may join the domain if its major version is 1239 * identical to all other nodes and its minor version is greater than 1240 * or equal to all other nodes. When its minor version is greater than 1241 * the other nodes, it will run at the minor version specified by the 1242 * other nodes. 1243 * 1244 * If a locking change is made that will not be compatible with older 1245 * versions, the major number must be increased and the minor version set 1246 * to zero. If a change merely adds a behavior that can be disabled when 1247 * speaking to older versions, the minor version must be increased. If a 1248 * change adds a fully backwards compatible change (eg, LVB changes that 1249 * are just ignored by older versions), the version does not need to be 1250 * updated. 1251 */ 1252 static struct ocfs2_locking_protocol lproto = { 1253 .lp_max_version = { 1254 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, 1255 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, 1256 }, 1257 .lp_lock_ast = ocfs2_locking_ast, 1258 .lp_blocking_ast = ocfs2_blocking_ast, 1259 .lp_unlock_ast = ocfs2_unlock_ast, 1260 }; 1261 1262 void ocfs2_set_locking_protocol(void) 1263 { 1264 ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version); 1265 } 1266 1267 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 1268 int convert) 1269 { 1270 unsigned long flags; 1271 1272 spin_lock_irqsave(&lockres->l_lock, flags); 1273 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1274 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 1275 if (convert) 1276 lockres->l_action = OCFS2_AST_INVALID; 1277 else 1278 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1279 spin_unlock_irqrestore(&lockres->l_lock, flags); 1280 1281 wake_up(&lockres->l_event); 1282 } 1283 1284 /* Note: If we detect another process working on the lock (i.e., 1285 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller 1286 * to do the right thing in that case. 1287 */ 1288 static int ocfs2_lock_create(struct ocfs2_super *osb, 1289 struct ocfs2_lock_res *lockres, 1290 int level, 1291 u32 dlm_flags) 1292 { 1293 int ret = 0; 1294 unsigned long flags; 1295 unsigned int gen; 1296 1297 mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level, 1298 dlm_flags); 1299 1300 spin_lock_irqsave(&lockres->l_lock, flags); 1301 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 1302 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 1303 spin_unlock_irqrestore(&lockres->l_lock, flags); 1304 goto bail; 1305 } 1306 1307 lockres->l_action = OCFS2_AST_ATTACH; 1308 lockres->l_requested = level; 1309 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1310 gen = lockres_set_pending(lockres); 1311 spin_unlock_irqrestore(&lockres->l_lock, flags); 1312 1313 ret = ocfs2_dlm_lock(osb->cconn, 1314 level, 1315 &lockres->l_lksb, 1316 dlm_flags, 1317 lockres->l_name, 1318 OCFS2_LOCK_ID_MAX_LEN - 1); 1319 lockres_clear_pending(lockres, gen, osb); 1320 if (ret) { 1321 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1322 ocfs2_recover_from_dlm_error(lockres, 1); 1323 } 1324 1325 mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name); 1326 1327 bail: 1328 return ret; 1329 } 1330 1331 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, 1332 int flag) 1333 { 1334 unsigned long flags; 1335 int ret; 1336 1337 spin_lock_irqsave(&lockres->l_lock, flags); 1338 ret = lockres->l_flags & flag; 1339 spin_unlock_irqrestore(&lockres->l_lock, flags); 1340 1341 return ret; 1342 } 1343 1344 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 1345 1346 { 1347 wait_event(lockres->l_event, 1348 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 1349 } 1350 1351 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 1352 1353 { 1354 wait_event(lockres->l_event, 1355 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 1356 } 1357 1358 /* predict what lock level we'll be dropping down to on behalf 1359 * of another node, and return true if the currently wanted 1360 * level will be compatible with it. */ 1361 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 1362 int wanted) 1363 { 1364 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 1365 1366 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking); 1367 } 1368 1369 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw) 1370 { 1371 INIT_LIST_HEAD(&mw->mw_item); 1372 init_completion(&mw->mw_complete); 1373 ocfs2_init_start_time(mw); 1374 } 1375 1376 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) 1377 { 1378 wait_for_completion(&mw->mw_complete); 1379 /* Re-arm the completion in case we want to wait on it again */ 1380 reinit_completion(&mw->mw_complete); 1381 return mw->mw_status; 1382 } 1383 1384 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres, 1385 struct ocfs2_mask_waiter *mw, 1386 unsigned long mask, 1387 unsigned long goal) 1388 { 1389 BUG_ON(!list_empty(&mw->mw_item)); 1390 1391 assert_spin_locked(&lockres->l_lock); 1392 1393 list_add_tail(&mw->mw_item, &lockres->l_mask_waiters); 1394 mw->mw_mask = mask; 1395 mw->mw_goal = goal; 1396 } 1397 1398 /* returns 0 if the mw that was removed was already satisfied, -EBUSY 1399 * if the mask still hadn't reached its goal */ 1400 static int __lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1401 struct ocfs2_mask_waiter *mw) 1402 { 1403 int ret = 0; 1404 1405 assert_spin_locked(&lockres->l_lock); 1406 if (!list_empty(&mw->mw_item)) { 1407 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 1408 ret = -EBUSY; 1409 1410 list_del_init(&mw->mw_item); 1411 init_completion(&mw->mw_complete); 1412 } 1413 1414 return ret; 1415 } 1416 1417 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1418 struct ocfs2_mask_waiter *mw) 1419 { 1420 unsigned long flags; 1421 int ret = 0; 1422 1423 spin_lock_irqsave(&lockres->l_lock, flags); 1424 ret = __lockres_remove_mask_waiter(lockres, mw); 1425 spin_unlock_irqrestore(&lockres->l_lock, flags); 1426 1427 return ret; 1428 1429 } 1430 1431 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw, 1432 struct ocfs2_lock_res *lockres) 1433 { 1434 int ret; 1435 1436 ret = wait_for_completion_interruptible(&mw->mw_complete); 1437 if (ret) 1438 lockres_remove_mask_waiter(lockres, mw); 1439 else 1440 ret = mw->mw_status; 1441 /* Re-arm the completion in case we want to wait on it again */ 1442 reinit_completion(&mw->mw_complete); 1443 return ret; 1444 } 1445 1446 static int __ocfs2_cluster_lock(struct ocfs2_super *osb, 1447 struct ocfs2_lock_res *lockres, 1448 int level, 1449 u32 lkm_flags, 1450 int arg_flags, 1451 int l_subclass, 1452 unsigned long caller_ip) 1453 { 1454 struct ocfs2_mask_waiter mw; 1455 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); 1456 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 1457 unsigned long flags; 1458 unsigned int gen; 1459 int noqueue_attempted = 0; 1460 int dlm_locked = 0; 1461 int kick_dc = 0; 1462 1463 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) { 1464 mlog_errno(-EINVAL); 1465 return -EINVAL; 1466 } 1467 1468 ocfs2_init_mask_waiter(&mw); 1469 1470 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 1471 lkm_flags |= DLM_LKF_VALBLK; 1472 1473 again: 1474 wait = 0; 1475 1476 spin_lock_irqsave(&lockres->l_lock, flags); 1477 1478 if (catch_signals && signal_pending(current)) { 1479 ret = -ERESTARTSYS; 1480 goto unlock; 1481 } 1482 1483 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, 1484 "Cluster lock called on freeing lockres %s! flags " 1485 "0x%lx\n", lockres->l_name, lockres->l_flags); 1486 1487 /* We only compare against the currently granted level 1488 * here. If the lock is blocked waiting on a downconvert, 1489 * we'll get caught below. */ 1490 if (lockres->l_flags & OCFS2_LOCK_BUSY && 1491 level > lockres->l_level) { 1492 /* is someone sitting in dlm_lock? If so, wait on 1493 * them. */ 1494 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1495 wait = 1; 1496 goto unlock; 1497 } 1498 1499 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) { 1500 /* 1501 * We've upconverted. If the lock now has a level we can 1502 * work with, we take it. If, however, the lock is not at the 1503 * required level, we go thru the full cycle. One way this could 1504 * happen is if a process requesting an upconvert to PR is 1505 * closely followed by another requesting upconvert to an EX. 1506 * If the process requesting EX lands here, we want it to 1507 * continue attempting to upconvert and let the process 1508 * requesting PR take the lock. 1509 * If multiple processes request upconvert to PR, the first one 1510 * here will take the lock. The others will have to go thru the 1511 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending 1512 * downconvert request. 1513 */ 1514 if (level <= lockres->l_level) 1515 goto update_holders; 1516 } 1517 1518 if (lockres->l_flags & OCFS2_LOCK_BLOCKED && 1519 !ocfs2_may_continue_on_blocked_lock(lockres, level)) { 1520 /* is the lock is currently blocked on behalf of 1521 * another node */ 1522 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0); 1523 wait = 1; 1524 goto unlock; 1525 } 1526 1527 if (level > lockres->l_level) { 1528 if (noqueue_attempted > 0) { 1529 ret = -EAGAIN; 1530 goto unlock; 1531 } 1532 if (lkm_flags & DLM_LKF_NOQUEUE) 1533 noqueue_attempted = 1; 1534 1535 if (lockres->l_action != OCFS2_AST_INVALID) 1536 mlog(ML_ERROR, "lockres %s has action %u pending\n", 1537 lockres->l_name, lockres->l_action); 1538 1539 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1540 lockres->l_action = OCFS2_AST_ATTACH; 1541 lkm_flags &= ~DLM_LKF_CONVERT; 1542 } else { 1543 lockres->l_action = OCFS2_AST_CONVERT; 1544 lkm_flags |= DLM_LKF_CONVERT; 1545 } 1546 1547 lockres->l_requested = level; 1548 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1549 gen = lockres_set_pending(lockres); 1550 spin_unlock_irqrestore(&lockres->l_lock, flags); 1551 1552 BUG_ON(level == DLM_LOCK_IV); 1553 BUG_ON(level == DLM_LOCK_NL); 1554 1555 mlog(ML_BASTS, "lockres %s, convert from %d to %d\n", 1556 lockres->l_name, lockres->l_level, level); 1557 1558 /* call dlm_lock to upgrade lock now */ 1559 ret = ocfs2_dlm_lock(osb->cconn, 1560 level, 1561 &lockres->l_lksb, 1562 lkm_flags, 1563 lockres->l_name, 1564 OCFS2_LOCK_ID_MAX_LEN - 1); 1565 lockres_clear_pending(lockres, gen, osb); 1566 if (ret) { 1567 if (!(lkm_flags & DLM_LKF_NOQUEUE) || 1568 (ret != -EAGAIN)) { 1569 ocfs2_log_dlm_error("ocfs2_dlm_lock", 1570 ret, lockres); 1571 } 1572 ocfs2_recover_from_dlm_error(lockres, 1); 1573 goto out; 1574 } 1575 dlm_locked = 1; 1576 1577 mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n", 1578 lockres->l_name); 1579 1580 /* At this point we've gone inside the dlm and need to 1581 * complete our work regardless. */ 1582 catch_signals = 0; 1583 1584 /* wait for busy to clear and carry on */ 1585 goto again; 1586 } 1587 1588 update_holders: 1589 /* Ok, if we get here then we're good to go. */ 1590 ocfs2_inc_holders(lockres, level); 1591 1592 ret = 0; 1593 unlock: 1594 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 1595 1596 /* ocfs2_unblock_lock reques on seeing OCFS2_LOCK_UPCONVERT_FINISHING */ 1597 kick_dc = (lockres->l_flags & OCFS2_LOCK_BLOCKED); 1598 1599 spin_unlock_irqrestore(&lockres->l_lock, flags); 1600 if (kick_dc) 1601 ocfs2_wake_downconvert_thread(osb); 1602 out: 1603 /* 1604 * This is helping work around a lock inversion between the page lock 1605 * and dlm locks. One path holds the page lock while calling aops 1606 * which block acquiring dlm locks. The voting thread holds dlm 1607 * locks while acquiring page locks while down converting data locks. 1608 * This block is helping an aop path notice the inversion and back 1609 * off to unlock its page lock before trying the dlm lock again. 1610 */ 1611 if (wait && arg_flags & OCFS2_LOCK_NONBLOCK && 1612 mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) { 1613 wait = 0; 1614 spin_lock_irqsave(&lockres->l_lock, flags); 1615 if (__lockres_remove_mask_waiter(lockres, &mw)) { 1616 if (dlm_locked) 1617 lockres_or_flags(lockres, 1618 OCFS2_LOCK_NONBLOCK_FINISHED); 1619 spin_unlock_irqrestore(&lockres->l_lock, flags); 1620 ret = -EAGAIN; 1621 } else { 1622 spin_unlock_irqrestore(&lockres->l_lock, flags); 1623 goto again; 1624 } 1625 } 1626 if (wait) { 1627 ret = ocfs2_wait_for_mask(&mw); 1628 if (ret == 0) 1629 goto again; 1630 mlog_errno(ret); 1631 } 1632 ocfs2_update_lock_stats(lockres, level, &mw, ret); 1633 1634 #ifdef CONFIG_DEBUG_LOCK_ALLOC 1635 if (!ret && lockres->l_lockdep_map.key != NULL) { 1636 if (level == DLM_LOCK_PR) 1637 rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass, 1638 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE), 1639 caller_ip); 1640 else 1641 rwsem_acquire(&lockres->l_lockdep_map, l_subclass, 1642 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE), 1643 caller_ip); 1644 } 1645 #endif 1646 return ret; 1647 } 1648 1649 static inline int ocfs2_cluster_lock(struct ocfs2_super *osb, 1650 struct ocfs2_lock_res *lockres, 1651 int level, 1652 u32 lkm_flags, 1653 int arg_flags) 1654 { 1655 return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags, 1656 0, _RET_IP_); 1657 } 1658 1659 1660 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb, 1661 struct ocfs2_lock_res *lockres, 1662 int level, 1663 unsigned long caller_ip) 1664 { 1665 unsigned long flags; 1666 1667 spin_lock_irqsave(&lockres->l_lock, flags); 1668 ocfs2_dec_holders(lockres, level); 1669 ocfs2_downconvert_on_unlock(osb, lockres); 1670 spin_unlock_irqrestore(&lockres->l_lock, flags); 1671 #ifdef CONFIG_DEBUG_LOCK_ALLOC 1672 if (lockres->l_lockdep_map.key != NULL) 1673 rwsem_release(&lockres->l_lockdep_map, 1, caller_ip); 1674 #endif 1675 } 1676 1677 static int ocfs2_create_new_lock(struct ocfs2_super *osb, 1678 struct ocfs2_lock_res *lockres, 1679 int ex, 1680 int local) 1681 { 1682 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1683 unsigned long flags; 1684 u32 lkm_flags = local ? DLM_LKF_LOCAL : 0; 1685 1686 spin_lock_irqsave(&lockres->l_lock, flags); 1687 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 1688 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL); 1689 spin_unlock_irqrestore(&lockres->l_lock, flags); 1690 1691 return ocfs2_lock_create(osb, lockres, level, lkm_flags); 1692 } 1693 1694 /* Grants us an EX lock on the data and metadata resources, skipping 1695 * the normal cluster directory lookup. Use this ONLY on newly created 1696 * inodes which other nodes can't possibly see, and which haven't been 1697 * hashed in the inode hash yet. This can give us a good performance 1698 * increase as it'll skip the network broadcast normally associated 1699 * with creating a new lock resource. */ 1700 int ocfs2_create_new_inode_locks(struct inode *inode) 1701 { 1702 int ret; 1703 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1704 1705 BUG_ON(!ocfs2_inode_is_new(inode)); 1706 1707 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); 1708 1709 /* NOTE: That we don't increment any of the holder counts, nor 1710 * do we add anything to a journal handle. Since this is 1711 * supposed to be a new inode which the cluster doesn't know 1712 * about yet, there is no need to. As far as the LVB handling 1713 * is concerned, this is basically like acquiring an EX lock 1714 * on a resource which has an invalid one -- we'll set it 1715 * valid when we release the EX. */ 1716 1717 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1); 1718 if (ret) { 1719 mlog_errno(ret); 1720 goto bail; 1721 } 1722 1723 /* 1724 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they 1725 * don't use a generation in their lock names. 1726 */ 1727 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); 1728 if (ret) { 1729 mlog_errno(ret); 1730 goto bail; 1731 } 1732 1733 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0); 1734 if (ret) 1735 mlog_errno(ret); 1736 1737 bail: 1738 return ret; 1739 } 1740 1741 int ocfs2_rw_lock(struct inode *inode, int write) 1742 { 1743 int status, level; 1744 struct ocfs2_lock_res *lockres; 1745 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1746 1747 mlog(0, "inode %llu take %s RW lock\n", 1748 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1749 write ? "EXMODE" : "PRMODE"); 1750 1751 if (ocfs2_mount_local(osb)) 1752 return 0; 1753 1754 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1755 1756 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1757 1758 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 1759 if (status < 0) 1760 mlog_errno(status); 1761 1762 return status; 1763 } 1764 1765 int ocfs2_try_rw_lock(struct inode *inode, int write) 1766 { 1767 int status, level; 1768 struct ocfs2_lock_res *lockres; 1769 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1770 1771 mlog(0, "inode %llu try to take %s RW lock\n", 1772 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1773 write ? "EXMODE" : "PRMODE"); 1774 1775 if (ocfs2_mount_local(osb)) 1776 return 0; 1777 1778 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1779 1780 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1781 1782 status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0); 1783 return status; 1784 } 1785 1786 void ocfs2_rw_unlock(struct inode *inode, int write) 1787 { 1788 int level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1789 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1790 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1791 1792 mlog(0, "inode %llu drop %s RW lock\n", 1793 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1794 write ? "EXMODE" : "PRMODE"); 1795 1796 if (!ocfs2_mount_local(osb)) 1797 ocfs2_cluster_unlock(osb, lockres, level); 1798 } 1799 1800 /* 1801 * ocfs2_open_lock always get PR mode lock. 1802 */ 1803 int ocfs2_open_lock(struct inode *inode) 1804 { 1805 int status = 0; 1806 struct ocfs2_lock_res *lockres; 1807 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1808 1809 mlog(0, "inode %llu take PRMODE open lock\n", 1810 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1811 1812 if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb)) 1813 goto out; 1814 1815 lockres = &OCFS2_I(inode)->ip_open_lockres; 1816 1817 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_PR, 0, 0); 1818 if (status < 0) 1819 mlog_errno(status); 1820 1821 out: 1822 return status; 1823 } 1824 1825 int ocfs2_try_open_lock(struct inode *inode, int write) 1826 { 1827 int status = 0, level; 1828 struct ocfs2_lock_res *lockres; 1829 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1830 1831 mlog(0, "inode %llu try to take %s open lock\n", 1832 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1833 write ? "EXMODE" : "PRMODE"); 1834 1835 if (ocfs2_is_hard_readonly(osb)) { 1836 if (write) 1837 status = -EROFS; 1838 goto out; 1839 } 1840 1841 if (ocfs2_mount_local(osb)) 1842 goto out; 1843 1844 lockres = &OCFS2_I(inode)->ip_open_lockres; 1845 1846 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1847 1848 /* 1849 * The file system may already holding a PRMODE/EXMODE open lock. 1850 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on 1851 * other nodes and the -EAGAIN will indicate to the caller that 1852 * this inode is still in use. 1853 */ 1854 status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0); 1855 1856 out: 1857 return status; 1858 } 1859 1860 /* 1861 * ocfs2_open_unlock unlock PR and EX mode open locks. 1862 */ 1863 void ocfs2_open_unlock(struct inode *inode) 1864 { 1865 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres; 1866 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1867 1868 mlog(0, "inode %llu drop open lock\n", 1869 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1870 1871 if (ocfs2_mount_local(osb)) 1872 goto out; 1873 1874 if(lockres->l_ro_holders) 1875 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_PR); 1876 if(lockres->l_ex_holders) 1877 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 1878 1879 out: 1880 return; 1881 } 1882 1883 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres, 1884 int level) 1885 { 1886 int ret; 1887 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1888 unsigned long flags; 1889 struct ocfs2_mask_waiter mw; 1890 1891 ocfs2_init_mask_waiter(&mw); 1892 1893 retry_cancel: 1894 spin_lock_irqsave(&lockres->l_lock, flags); 1895 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 1896 ret = ocfs2_prepare_cancel_convert(osb, lockres); 1897 if (ret) { 1898 spin_unlock_irqrestore(&lockres->l_lock, flags); 1899 ret = ocfs2_cancel_convert(osb, lockres); 1900 if (ret < 0) { 1901 mlog_errno(ret); 1902 goto out; 1903 } 1904 goto retry_cancel; 1905 } 1906 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1907 spin_unlock_irqrestore(&lockres->l_lock, flags); 1908 1909 ocfs2_wait_for_mask(&mw); 1910 goto retry_cancel; 1911 } 1912 1913 ret = -ERESTARTSYS; 1914 /* 1915 * We may still have gotten the lock, in which case there's no 1916 * point to restarting the syscall. 1917 */ 1918 if (lockres->l_level == level) 1919 ret = 0; 1920 1921 mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret, 1922 lockres->l_flags, lockres->l_level, lockres->l_action); 1923 1924 spin_unlock_irqrestore(&lockres->l_lock, flags); 1925 1926 out: 1927 return ret; 1928 } 1929 1930 /* 1931 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of 1932 * flock() calls. The locking approach this requires is sufficiently 1933 * different from all other cluster lock types that we implement a 1934 * separate path to the "low-level" dlm calls. In particular: 1935 * 1936 * - No optimization of lock levels is done - we take at exactly 1937 * what's been requested. 1938 * 1939 * - No lock caching is employed. We immediately downconvert to 1940 * no-lock at unlock time. This also means flock locks never go on 1941 * the blocking list). 1942 * 1943 * - Since userspace can trivially deadlock itself with flock, we make 1944 * sure to allow cancellation of a misbehaving applications flock() 1945 * request. 1946 * 1947 * - Access to any flock lockres doesn't require concurrency, so we 1948 * can simplify the code by requiring the caller to guarantee 1949 * serialization of dlmglue flock calls. 1950 */ 1951 int ocfs2_file_lock(struct file *file, int ex, int trylock) 1952 { 1953 int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1954 unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0; 1955 unsigned long flags; 1956 struct ocfs2_file_private *fp = file->private_data; 1957 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1958 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1959 struct ocfs2_mask_waiter mw; 1960 1961 ocfs2_init_mask_waiter(&mw); 1962 1963 if ((lockres->l_flags & OCFS2_LOCK_BUSY) || 1964 (lockres->l_level > DLM_LOCK_NL)) { 1965 mlog(ML_ERROR, 1966 "File lock \"%s\" has busy or locked state: flags: 0x%lx, " 1967 "level: %u\n", lockres->l_name, lockres->l_flags, 1968 lockres->l_level); 1969 return -EINVAL; 1970 } 1971 1972 spin_lock_irqsave(&lockres->l_lock, flags); 1973 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1974 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1975 spin_unlock_irqrestore(&lockres->l_lock, flags); 1976 1977 /* 1978 * Get the lock at NLMODE to start - that way we 1979 * can cancel the upconvert request if need be. 1980 */ 1981 ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0); 1982 if (ret < 0) { 1983 mlog_errno(ret); 1984 goto out; 1985 } 1986 1987 ret = ocfs2_wait_for_mask(&mw); 1988 if (ret) { 1989 mlog_errno(ret); 1990 goto out; 1991 } 1992 spin_lock_irqsave(&lockres->l_lock, flags); 1993 } 1994 1995 lockres->l_action = OCFS2_AST_CONVERT; 1996 lkm_flags |= DLM_LKF_CONVERT; 1997 lockres->l_requested = level; 1998 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1999 2000 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 2001 spin_unlock_irqrestore(&lockres->l_lock, flags); 2002 2003 ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags, 2004 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1); 2005 if (ret) { 2006 if (!trylock || (ret != -EAGAIN)) { 2007 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 2008 ret = -EINVAL; 2009 } 2010 2011 ocfs2_recover_from_dlm_error(lockres, 1); 2012 lockres_remove_mask_waiter(lockres, &mw); 2013 goto out; 2014 } 2015 2016 ret = ocfs2_wait_for_mask_interruptible(&mw, lockres); 2017 if (ret == -ERESTARTSYS) { 2018 /* 2019 * Userspace can cause deadlock itself with 2020 * flock(). Current behavior locally is to allow the 2021 * deadlock, but abort the system call if a signal is 2022 * received. We follow this example, otherwise a 2023 * poorly written program could sit in kernel until 2024 * reboot. 2025 * 2026 * Handling this is a bit more complicated for Ocfs2 2027 * though. We can't exit this function with an 2028 * outstanding lock request, so a cancel convert is 2029 * required. We intentionally overwrite 'ret' - if the 2030 * cancel fails and the lock was granted, it's easier 2031 * to just bubble success back up to the user. 2032 */ 2033 ret = ocfs2_flock_handle_signal(lockres, level); 2034 } else if (!ret && (level > lockres->l_level)) { 2035 /* Trylock failed asynchronously */ 2036 BUG_ON(!trylock); 2037 ret = -EAGAIN; 2038 } 2039 2040 out: 2041 2042 mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n", 2043 lockres->l_name, ex, trylock, ret); 2044 return ret; 2045 } 2046 2047 void ocfs2_file_unlock(struct file *file) 2048 { 2049 int ret; 2050 unsigned int gen; 2051 unsigned long flags; 2052 struct ocfs2_file_private *fp = file->private_data; 2053 struct ocfs2_lock_res *lockres = &fp->fp_flock; 2054 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 2055 struct ocfs2_mask_waiter mw; 2056 2057 ocfs2_init_mask_waiter(&mw); 2058 2059 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) 2060 return; 2061 2062 if (lockres->l_level == DLM_LOCK_NL) 2063 return; 2064 2065 mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n", 2066 lockres->l_name, lockres->l_flags, lockres->l_level, 2067 lockres->l_action); 2068 2069 spin_lock_irqsave(&lockres->l_lock, flags); 2070 /* 2071 * Fake a blocking ast for the downconvert code. 2072 */ 2073 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 2074 lockres->l_blocking = DLM_LOCK_EX; 2075 2076 gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL); 2077 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 2078 spin_unlock_irqrestore(&lockres->l_lock, flags); 2079 2080 ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen); 2081 if (ret) { 2082 mlog_errno(ret); 2083 return; 2084 } 2085 2086 ret = ocfs2_wait_for_mask(&mw); 2087 if (ret) 2088 mlog_errno(ret); 2089 } 2090 2091 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 2092 struct ocfs2_lock_res *lockres) 2093 { 2094 int kick = 0; 2095 2096 /* If we know that another node is waiting on our lock, kick 2097 * the downconvert thread * pre-emptively when we reach a release 2098 * condition. */ 2099 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 2100 switch(lockres->l_blocking) { 2101 case DLM_LOCK_EX: 2102 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 2103 kick = 1; 2104 break; 2105 case DLM_LOCK_PR: 2106 if (!lockres->l_ex_holders) 2107 kick = 1; 2108 break; 2109 default: 2110 BUG(); 2111 } 2112 } 2113 2114 if (kick) 2115 ocfs2_wake_downconvert_thread(osb); 2116 } 2117 2118 #define OCFS2_SEC_BITS 34 2119 #define OCFS2_SEC_SHIFT (64 - 34) 2120 #define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1) 2121 2122 /* LVB only has room for 64 bits of time here so we pack it for 2123 * now. */ 2124 static u64 ocfs2_pack_timespec(struct timespec *spec) 2125 { 2126 u64 res; 2127 u64 sec = spec->tv_sec; 2128 u32 nsec = spec->tv_nsec; 2129 2130 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK); 2131 2132 return res; 2133 } 2134 2135 /* Call this with the lockres locked. I am reasonably sure we don't 2136 * need ip_lock in this function as anyone who would be changing those 2137 * values is supposed to be blocked in ocfs2_inode_lock right now. */ 2138 static void __ocfs2_stuff_meta_lvb(struct inode *inode) 2139 { 2140 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2141 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2142 struct ocfs2_meta_lvb *lvb; 2143 struct timespec ts; 2144 2145 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2146 2147 /* 2148 * Invalidate the LVB of a deleted inode - this way other 2149 * nodes are forced to go to disk and discover the new inode 2150 * status. 2151 */ 2152 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2153 lvb->lvb_version = 0; 2154 goto out; 2155 } 2156 2157 lvb->lvb_version = OCFS2_LVB_VERSION; 2158 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 2159 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 2160 lvb->lvb_iuid = cpu_to_be32(i_uid_read(inode)); 2161 lvb->lvb_igid = cpu_to_be32(i_gid_read(inode)); 2162 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 2163 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 2164 ts = timespec64_to_timespec(inode->i_atime); 2165 lvb->lvb_iatime_packed = 2166 cpu_to_be64(ocfs2_pack_timespec(&ts)); 2167 ts = timespec64_to_timespec(inode->i_ctime); 2168 lvb->lvb_ictime_packed = 2169 cpu_to_be64(ocfs2_pack_timespec(&ts)); 2170 ts = timespec64_to_timespec(inode->i_mtime); 2171 lvb->lvb_imtime_packed = 2172 cpu_to_be64(ocfs2_pack_timespec(&ts)); 2173 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 2174 lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features); 2175 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 2176 2177 out: 2178 mlog_meta_lvb(0, lockres); 2179 } 2180 2181 static void ocfs2_unpack_timespec(struct timespec *spec, 2182 u64 packed_time) 2183 { 2184 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 2185 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 2186 } 2187 2188 static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 2189 { 2190 struct timespec ts; 2191 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2192 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2193 struct ocfs2_meta_lvb *lvb; 2194 2195 mlog_meta_lvb(0, lockres); 2196 2197 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2198 2199 /* We're safe here without the lockres lock... */ 2200 spin_lock(&oi->ip_lock); 2201 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 2202 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 2203 2204 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 2205 oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures); 2206 ocfs2_set_inode_flags(inode); 2207 2208 /* fast-symlinks are a special case */ 2209 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 2210 inode->i_blocks = 0; 2211 else 2212 inode->i_blocks = ocfs2_inode_sector_count(inode); 2213 2214 i_uid_write(inode, be32_to_cpu(lvb->lvb_iuid)); 2215 i_gid_write(inode, be32_to_cpu(lvb->lvb_igid)); 2216 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 2217 set_nlink(inode, be16_to_cpu(lvb->lvb_inlink)); 2218 ocfs2_unpack_timespec(&ts, 2219 be64_to_cpu(lvb->lvb_iatime_packed)); 2220 inode->i_atime = timespec_to_timespec64(ts); 2221 ocfs2_unpack_timespec(&ts, 2222 be64_to_cpu(lvb->lvb_imtime_packed)); 2223 inode->i_mtime = timespec_to_timespec64(ts); 2224 ocfs2_unpack_timespec(&ts, 2225 be64_to_cpu(lvb->lvb_ictime_packed)); 2226 inode->i_ctime = timespec_to_timespec64(ts); 2227 spin_unlock(&oi->ip_lock); 2228 } 2229 2230 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 2231 struct ocfs2_lock_res *lockres) 2232 { 2233 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2234 2235 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) 2236 && lvb->lvb_version == OCFS2_LVB_VERSION 2237 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 2238 return 1; 2239 return 0; 2240 } 2241 2242 /* Determine whether a lock resource needs to be refreshed, and 2243 * arbitrate who gets to refresh it. 2244 * 2245 * 0 means no refresh needed. 2246 * 2247 * > 0 means you need to refresh this and you MUST call 2248 * ocfs2_complete_lock_res_refresh afterwards. */ 2249 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres) 2250 { 2251 unsigned long flags; 2252 int status = 0; 2253 2254 refresh_check: 2255 spin_lock_irqsave(&lockres->l_lock, flags); 2256 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 2257 spin_unlock_irqrestore(&lockres->l_lock, flags); 2258 goto bail; 2259 } 2260 2261 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 2262 spin_unlock_irqrestore(&lockres->l_lock, flags); 2263 2264 ocfs2_wait_on_refreshing_lock(lockres); 2265 goto refresh_check; 2266 } 2267 2268 /* Ok, I'll be the one to refresh this lock. */ 2269 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING); 2270 spin_unlock_irqrestore(&lockres->l_lock, flags); 2271 2272 status = 1; 2273 bail: 2274 mlog(0, "status %d\n", status); 2275 return status; 2276 } 2277 2278 /* If status is non zero, I'll mark it as not being in refresh 2279 * anymroe, but i won't clear the needs refresh flag. */ 2280 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres, 2281 int status) 2282 { 2283 unsigned long flags; 2284 2285 spin_lock_irqsave(&lockres->l_lock, flags); 2286 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING); 2287 if (!status) 2288 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 2289 spin_unlock_irqrestore(&lockres->l_lock, flags); 2290 2291 wake_up(&lockres->l_event); 2292 } 2293 2294 /* may or may not return a bh if it went to disk. */ 2295 static int ocfs2_inode_lock_update(struct inode *inode, 2296 struct buffer_head **bh) 2297 { 2298 int status = 0; 2299 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2300 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2301 struct ocfs2_dinode *fe; 2302 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2303 2304 if (ocfs2_mount_local(osb)) 2305 goto bail; 2306 2307 spin_lock(&oi->ip_lock); 2308 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2309 mlog(0, "Orphaned inode %llu was deleted while we " 2310 "were waiting on a lock. ip_flags = 0x%x\n", 2311 (unsigned long long)oi->ip_blkno, oi->ip_flags); 2312 spin_unlock(&oi->ip_lock); 2313 status = -ENOENT; 2314 goto bail; 2315 } 2316 spin_unlock(&oi->ip_lock); 2317 2318 if (!ocfs2_should_refresh_lock_res(lockres)) 2319 goto bail; 2320 2321 /* This will discard any caching information we might have had 2322 * for the inode metadata. */ 2323 ocfs2_metadata_cache_purge(INODE_CACHE(inode)); 2324 2325 ocfs2_extent_map_trunc(inode, 0); 2326 2327 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) { 2328 mlog(0, "Trusting LVB on inode %llu\n", 2329 (unsigned long long)oi->ip_blkno); 2330 ocfs2_refresh_inode_from_lvb(inode); 2331 } else { 2332 /* Boo, we have to go to disk. */ 2333 /* read bh, cast, ocfs2_refresh_inode */ 2334 status = ocfs2_read_inode_block(inode, bh); 2335 if (status < 0) { 2336 mlog_errno(status); 2337 goto bail_refresh; 2338 } 2339 fe = (struct ocfs2_dinode *) (*bh)->b_data; 2340 2341 /* This is a good chance to make sure we're not 2342 * locking an invalid object. ocfs2_read_inode_block() 2343 * already checked that the inode block is sane. 2344 * 2345 * We bug on a stale inode here because we checked 2346 * above whether it was wiped from disk. The wiping 2347 * node provides a guarantee that we receive that 2348 * message and can mark the inode before dropping any 2349 * locks associated with it. */ 2350 mlog_bug_on_msg(inode->i_generation != 2351 le32_to_cpu(fe->i_generation), 2352 "Invalid dinode %llu disk generation: %u " 2353 "inode->i_generation: %u\n", 2354 (unsigned long long)oi->ip_blkno, 2355 le32_to_cpu(fe->i_generation), 2356 inode->i_generation); 2357 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 2358 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 2359 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 2360 (unsigned long long)oi->ip_blkno, 2361 (unsigned long long)le64_to_cpu(fe->i_dtime), 2362 le32_to_cpu(fe->i_flags)); 2363 2364 ocfs2_refresh_inode(inode, fe); 2365 ocfs2_track_lock_refresh(lockres); 2366 } 2367 2368 status = 0; 2369 bail_refresh: 2370 ocfs2_complete_lock_res_refresh(lockres, status); 2371 bail: 2372 return status; 2373 } 2374 2375 static int ocfs2_assign_bh(struct inode *inode, 2376 struct buffer_head **ret_bh, 2377 struct buffer_head *passed_bh) 2378 { 2379 int status; 2380 2381 if (passed_bh) { 2382 /* Ok, the update went to disk for us, use the 2383 * returned bh. */ 2384 *ret_bh = passed_bh; 2385 get_bh(*ret_bh); 2386 2387 return 0; 2388 } 2389 2390 status = ocfs2_read_inode_block(inode, ret_bh); 2391 if (status < 0) 2392 mlog_errno(status); 2393 2394 return status; 2395 } 2396 2397 /* 2398 * returns < 0 error if the callback will never be called, otherwise 2399 * the result of the lock will be communicated via the callback. 2400 */ 2401 int ocfs2_inode_lock_full_nested(struct inode *inode, 2402 struct buffer_head **ret_bh, 2403 int ex, 2404 int arg_flags, 2405 int subclass) 2406 { 2407 int status, level, acquired; 2408 u32 dlm_flags; 2409 struct ocfs2_lock_res *lockres = NULL; 2410 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2411 struct buffer_head *local_bh = NULL; 2412 2413 mlog(0, "inode %llu, take %s META lock\n", 2414 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2415 ex ? "EXMODE" : "PRMODE"); 2416 2417 status = 0; 2418 acquired = 0; 2419 /* We'll allow faking a readonly metadata lock for 2420 * rodevices. */ 2421 if (ocfs2_is_hard_readonly(osb)) { 2422 if (ex) 2423 status = -EROFS; 2424 goto getbh; 2425 } 2426 2427 if ((arg_flags & OCFS2_META_LOCK_GETBH) || 2428 ocfs2_mount_local(osb)) 2429 goto update; 2430 2431 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2432 ocfs2_wait_for_recovery(osb); 2433 2434 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2435 level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2436 dlm_flags = 0; 2437 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 2438 dlm_flags |= DLM_LKF_NOQUEUE; 2439 2440 status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags, 2441 arg_flags, subclass, _RET_IP_); 2442 if (status < 0) { 2443 if (status != -EAGAIN) 2444 mlog_errno(status); 2445 goto bail; 2446 } 2447 2448 /* Notify the error cleanup path to drop the cluster lock. */ 2449 acquired = 1; 2450 2451 /* We wait twice because a node may have died while we were in 2452 * the lower dlm layers. The second time though, we've 2453 * committed to owning this lock so we don't allow signals to 2454 * abort the operation. */ 2455 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2456 ocfs2_wait_for_recovery(osb); 2457 2458 update: 2459 /* 2460 * We only see this flag if we're being called from 2461 * ocfs2_read_locked_inode(). It means we're locking an inode 2462 * which hasn't been populated yet, so clear the refresh flag 2463 * and let the caller handle it. 2464 */ 2465 if (inode->i_state & I_NEW) { 2466 status = 0; 2467 if (lockres) 2468 ocfs2_complete_lock_res_refresh(lockres, 0); 2469 goto bail; 2470 } 2471 2472 /* This is fun. The caller may want a bh back, or it may 2473 * not. ocfs2_inode_lock_update definitely wants one in, but 2474 * may or may not read one, depending on what's in the 2475 * LVB. The result of all of this is that we've *only* gone to 2476 * disk if we have to, so the complexity is worthwhile. */ 2477 status = ocfs2_inode_lock_update(inode, &local_bh); 2478 if (status < 0) { 2479 if (status != -ENOENT) 2480 mlog_errno(status); 2481 goto bail; 2482 } 2483 getbh: 2484 if (ret_bh) { 2485 status = ocfs2_assign_bh(inode, ret_bh, local_bh); 2486 if (status < 0) { 2487 mlog_errno(status); 2488 goto bail; 2489 } 2490 } 2491 2492 bail: 2493 if (status < 0) { 2494 if (ret_bh && (*ret_bh)) { 2495 brelse(*ret_bh); 2496 *ret_bh = NULL; 2497 } 2498 if (acquired) 2499 ocfs2_inode_unlock(inode, ex); 2500 } 2501 2502 if (local_bh) 2503 brelse(local_bh); 2504 2505 return status; 2506 } 2507 2508 /* 2509 * This is working around a lock inversion between tasks acquiring DLM 2510 * locks while holding a page lock and the downconvert thread which 2511 * blocks dlm lock acquiry while acquiring page locks. 2512 * 2513 * ** These _with_page variantes are only intended to be called from aop 2514 * methods that hold page locks and return a very specific *positive* error 2515 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 2516 * 2517 * The DLM is called such that it returns -EAGAIN if it would have 2518 * blocked waiting for the downconvert thread. In that case we unlock 2519 * our page so the downconvert thread can make progress. Once we've 2520 * done this we have to return AOP_TRUNCATED_PAGE so the aop method 2521 * that called us can bubble that back up into the VFS who will then 2522 * immediately retry the aop call. 2523 */ 2524 int ocfs2_inode_lock_with_page(struct inode *inode, 2525 struct buffer_head **ret_bh, 2526 int ex, 2527 struct page *page) 2528 { 2529 int ret; 2530 2531 ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK); 2532 if (ret == -EAGAIN) { 2533 unlock_page(page); 2534 /* 2535 * If we can't get inode lock immediately, we should not return 2536 * directly here, since this will lead to a softlockup problem. 2537 * The method is to get a blocking lock and immediately unlock 2538 * before returning, this can avoid CPU resource waste due to 2539 * lots of retries, and benefits fairness in getting lock. 2540 */ 2541 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0) 2542 ocfs2_inode_unlock(inode, ex); 2543 ret = AOP_TRUNCATED_PAGE; 2544 } 2545 2546 return ret; 2547 } 2548 2549 int ocfs2_inode_lock_atime(struct inode *inode, 2550 struct vfsmount *vfsmnt, 2551 int *level, int wait) 2552 { 2553 int ret; 2554 2555 if (wait) 2556 ret = ocfs2_inode_lock(inode, NULL, 0); 2557 else 2558 ret = ocfs2_try_inode_lock(inode, NULL, 0); 2559 2560 if (ret < 0) { 2561 if (ret != -EAGAIN) 2562 mlog_errno(ret); 2563 return ret; 2564 } 2565 2566 /* 2567 * If we should update atime, we will get EX lock, 2568 * otherwise we just get PR lock. 2569 */ 2570 if (ocfs2_should_update_atime(inode, vfsmnt)) { 2571 struct buffer_head *bh = NULL; 2572 2573 ocfs2_inode_unlock(inode, 0); 2574 if (wait) 2575 ret = ocfs2_inode_lock(inode, &bh, 1); 2576 else 2577 ret = ocfs2_try_inode_lock(inode, &bh, 1); 2578 2579 if (ret < 0) { 2580 if (ret != -EAGAIN) 2581 mlog_errno(ret); 2582 return ret; 2583 } 2584 *level = 1; 2585 if (ocfs2_should_update_atime(inode, vfsmnt)) 2586 ocfs2_update_inode_atime(inode, bh); 2587 if (bh) 2588 brelse(bh); 2589 } else 2590 *level = 0; 2591 2592 return ret; 2593 } 2594 2595 void ocfs2_inode_unlock(struct inode *inode, 2596 int ex) 2597 { 2598 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2599 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2600 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2601 2602 mlog(0, "inode %llu drop %s META lock\n", 2603 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2604 ex ? "EXMODE" : "PRMODE"); 2605 2606 if (!ocfs2_is_hard_readonly(osb) && 2607 !ocfs2_mount_local(osb)) 2608 ocfs2_cluster_unlock(osb, lockres, level); 2609 } 2610 2611 /* 2612 * This _tracker variantes are introduced to deal with the recursive cluster 2613 * locking issue. The idea is to keep track of a lock holder on the stack of 2614 * the current process. If there's a lock holder on the stack, we know the 2615 * task context is already protected by cluster locking. Currently, they're 2616 * used in some VFS entry routines. 2617 * 2618 * return < 0 on error, return == 0 if there's no lock holder on the stack 2619 * before this call, return == 1 if this call would be a recursive locking. 2620 * return == -1 if this lock attempt will cause an upgrade which is forbidden. 2621 * 2622 * When taking lock levels into account,we face some different situations. 2623 * 2624 * 1. no lock is held 2625 * In this case, just lock the inode as requested and return 0 2626 * 2627 * 2. We are holding a lock 2628 * For this situation, things diverges into several cases 2629 * 2630 * wanted holding what to do 2631 * ex ex see 2.1 below 2632 * ex pr see 2.2 below 2633 * pr ex see 2.1 below 2634 * pr pr see 2.1 below 2635 * 2636 * 2.1 lock level that is been held is compatible 2637 * with the wanted level, so no lock action will be tacken. 2638 * 2639 * 2.2 Otherwise, an upgrade is needed, but it is forbidden. 2640 * 2641 * Reason why upgrade within a process is forbidden is that 2642 * lock upgrade may cause dead lock. The following illustrates 2643 * how it happens. 2644 * 2645 * thread on node1 thread on node2 2646 * ocfs2_inode_lock_tracker(ex=0) 2647 * 2648 * <====== ocfs2_inode_lock_tracker(ex=1) 2649 * 2650 * ocfs2_inode_lock_tracker(ex=1) 2651 */ 2652 int ocfs2_inode_lock_tracker(struct inode *inode, 2653 struct buffer_head **ret_bh, 2654 int ex, 2655 struct ocfs2_lock_holder *oh) 2656 { 2657 int status = 0; 2658 struct ocfs2_lock_res *lockres; 2659 struct ocfs2_lock_holder *tmp_oh; 2660 struct pid *pid = task_pid(current); 2661 2662 2663 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2664 tmp_oh = ocfs2_pid_holder(lockres, pid); 2665 2666 if (!tmp_oh) { 2667 /* 2668 * This corresponds to the case 1. 2669 * We haven't got any lock before. 2670 */ 2671 status = ocfs2_inode_lock_full(inode, ret_bh, ex, 0); 2672 if (status < 0) { 2673 if (status != -ENOENT) 2674 mlog_errno(status); 2675 return status; 2676 } 2677 2678 oh->oh_ex = ex; 2679 ocfs2_add_holder(lockres, oh); 2680 return 0; 2681 } 2682 2683 if (unlikely(ex && !tmp_oh->oh_ex)) { 2684 /* 2685 * case 2.2 upgrade may cause dead lock, forbid it. 2686 */ 2687 mlog(ML_ERROR, "Recursive locking is not permitted to " 2688 "upgrade to EX level from PR level.\n"); 2689 dump_stack(); 2690 return -EINVAL; 2691 } 2692 2693 /* 2694 * case 2.1 OCFS2_META_LOCK_GETBH flag make ocfs2_inode_lock_full. 2695 * ignore the lock level and just update it. 2696 */ 2697 if (ret_bh) { 2698 status = ocfs2_inode_lock_full(inode, ret_bh, ex, 2699 OCFS2_META_LOCK_GETBH); 2700 if (status < 0) { 2701 if (status != -ENOENT) 2702 mlog_errno(status); 2703 return status; 2704 } 2705 } 2706 return tmp_oh ? 1 : 0; 2707 } 2708 2709 void ocfs2_inode_unlock_tracker(struct inode *inode, 2710 int ex, 2711 struct ocfs2_lock_holder *oh, 2712 int had_lock) 2713 { 2714 struct ocfs2_lock_res *lockres; 2715 2716 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2717 /* had_lock means that the currect process already takes the cluster 2718 * lock previously. 2719 * If had_lock is 1, we have nothing to do here. 2720 * If had_lock is 0, we will release the lock. 2721 */ 2722 if (!had_lock) { 2723 ocfs2_inode_unlock(inode, oh->oh_ex); 2724 ocfs2_remove_holder(lockres, oh); 2725 } 2726 } 2727 2728 int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno) 2729 { 2730 struct ocfs2_lock_res *lockres; 2731 struct ocfs2_orphan_scan_lvb *lvb; 2732 int status = 0; 2733 2734 if (ocfs2_is_hard_readonly(osb)) 2735 return -EROFS; 2736 2737 if (ocfs2_mount_local(osb)) 2738 return 0; 2739 2740 lockres = &osb->osb_orphan_scan.os_lockres; 2741 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2742 if (status < 0) 2743 return status; 2744 2745 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2746 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2747 lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION) 2748 *seqno = be32_to_cpu(lvb->lvb_os_seqno); 2749 else 2750 *seqno = osb->osb_orphan_scan.os_seqno + 1; 2751 2752 return status; 2753 } 2754 2755 void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno) 2756 { 2757 struct ocfs2_lock_res *lockres; 2758 struct ocfs2_orphan_scan_lvb *lvb; 2759 2760 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) { 2761 lockres = &osb->osb_orphan_scan.os_lockres; 2762 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2763 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION; 2764 lvb->lvb_os_seqno = cpu_to_be32(seqno); 2765 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2766 } 2767 } 2768 2769 int ocfs2_super_lock(struct ocfs2_super *osb, 2770 int ex) 2771 { 2772 int status = 0; 2773 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2774 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2775 2776 if (ocfs2_is_hard_readonly(osb)) 2777 return -EROFS; 2778 2779 if (ocfs2_mount_local(osb)) 2780 goto bail; 2781 2782 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 2783 if (status < 0) { 2784 mlog_errno(status); 2785 goto bail; 2786 } 2787 2788 /* The super block lock path is really in the best position to 2789 * know when resources covered by the lock need to be 2790 * refreshed, so we do it here. Of course, making sense of 2791 * everything is up to the caller :) */ 2792 status = ocfs2_should_refresh_lock_res(lockres); 2793 if (status) { 2794 status = ocfs2_refresh_slot_info(osb); 2795 2796 ocfs2_complete_lock_res_refresh(lockres, status); 2797 2798 if (status < 0) { 2799 ocfs2_cluster_unlock(osb, lockres, level); 2800 mlog_errno(status); 2801 } 2802 ocfs2_track_lock_refresh(lockres); 2803 } 2804 bail: 2805 return status; 2806 } 2807 2808 void ocfs2_super_unlock(struct ocfs2_super *osb, 2809 int ex) 2810 { 2811 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2812 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2813 2814 if (!ocfs2_mount_local(osb)) 2815 ocfs2_cluster_unlock(osb, lockres, level); 2816 } 2817 2818 int ocfs2_rename_lock(struct ocfs2_super *osb) 2819 { 2820 int status; 2821 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2822 2823 if (ocfs2_is_hard_readonly(osb)) 2824 return -EROFS; 2825 2826 if (ocfs2_mount_local(osb)) 2827 return 0; 2828 2829 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2830 if (status < 0) 2831 mlog_errno(status); 2832 2833 return status; 2834 } 2835 2836 void ocfs2_rename_unlock(struct ocfs2_super *osb) 2837 { 2838 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2839 2840 if (!ocfs2_mount_local(osb)) 2841 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2842 } 2843 2844 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex) 2845 { 2846 int status; 2847 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2848 2849 if (ocfs2_is_hard_readonly(osb)) 2850 return -EROFS; 2851 2852 if (ocfs2_mount_local(osb)) 2853 return 0; 2854 2855 status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE, 2856 0, 0); 2857 if (status < 0) 2858 mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status); 2859 2860 return status; 2861 } 2862 2863 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex) 2864 { 2865 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2866 2867 if (!ocfs2_mount_local(osb)) 2868 ocfs2_cluster_unlock(osb, lockres, 2869 ex ? LKM_EXMODE : LKM_PRMODE); 2870 } 2871 2872 int ocfs2_trim_fs_lock(struct ocfs2_super *osb, 2873 struct ocfs2_trim_fs_info *info, int trylock) 2874 { 2875 int status; 2876 struct ocfs2_trim_fs_lvb *lvb; 2877 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 2878 2879 if (info) 2880 info->tf_valid = 0; 2881 2882 if (ocfs2_is_hard_readonly(osb)) 2883 return -EROFS; 2884 2885 if (ocfs2_mount_local(osb)) 2886 return 0; 2887 2888 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 2889 trylock ? DLM_LKF_NOQUEUE : 0, 0); 2890 if (status < 0) { 2891 if (status != -EAGAIN) 2892 mlog_errno(status); 2893 return status; 2894 } 2895 2896 if (info) { 2897 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2898 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2899 lvb->lvb_version == OCFS2_TRIMFS_LVB_VERSION) { 2900 info->tf_valid = 1; 2901 info->tf_success = lvb->lvb_success; 2902 info->tf_nodenum = be32_to_cpu(lvb->lvb_nodenum); 2903 info->tf_start = be64_to_cpu(lvb->lvb_start); 2904 info->tf_len = be64_to_cpu(lvb->lvb_len); 2905 info->tf_minlen = be64_to_cpu(lvb->lvb_minlen); 2906 info->tf_trimlen = be64_to_cpu(lvb->lvb_trimlen); 2907 } 2908 } 2909 2910 return status; 2911 } 2912 2913 void ocfs2_trim_fs_unlock(struct ocfs2_super *osb, 2914 struct ocfs2_trim_fs_info *info) 2915 { 2916 struct ocfs2_trim_fs_lvb *lvb; 2917 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 2918 2919 if (ocfs2_mount_local(osb)) 2920 return; 2921 2922 if (info) { 2923 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2924 lvb->lvb_version = OCFS2_TRIMFS_LVB_VERSION; 2925 lvb->lvb_success = info->tf_success; 2926 lvb->lvb_nodenum = cpu_to_be32(info->tf_nodenum); 2927 lvb->lvb_start = cpu_to_be64(info->tf_start); 2928 lvb->lvb_len = cpu_to_be64(info->tf_len); 2929 lvb->lvb_minlen = cpu_to_be64(info->tf_minlen); 2930 lvb->lvb_trimlen = cpu_to_be64(info->tf_trimlen); 2931 } 2932 2933 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2934 } 2935 2936 int ocfs2_dentry_lock(struct dentry *dentry, int ex) 2937 { 2938 int ret; 2939 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2940 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2941 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2942 2943 BUG_ON(!dl); 2944 2945 if (ocfs2_is_hard_readonly(osb)) { 2946 if (ex) 2947 return -EROFS; 2948 return 0; 2949 } 2950 2951 if (ocfs2_mount_local(osb)) 2952 return 0; 2953 2954 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 2955 if (ret < 0) 2956 mlog_errno(ret); 2957 2958 return ret; 2959 } 2960 2961 void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 2962 { 2963 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2964 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2965 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2966 2967 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 2968 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 2969 } 2970 2971 /* Reference counting of the dlm debug structure. We want this because 2972 * open references on the debug inodes can live on after a mount, so 2973 * we can't rely on the ocfs2_super to always exist. */ 2974 static void ocfs2_dlm_debug_free(struct kref *kref) 2975 { 2976 struct ocfs2_dlm_debug *dlm_debug; 2977 2978 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2979 2980 kfree(dlm_debug); 2981 } 2982 2983 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2984 { 2985 if (dlm_debug) 2986 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2987 } 2988 2989 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2990 { 2991 kref_get(&debug->d_refcnt); 2992 } 2993 2994 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2995 { 2996 struct ocfs2_dlm_debug *dlm_debug; 2997 2998 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 2999 if (!dlm_debug) { 3000 mlog_errno(-ENOMEM); 3001 goto out; 3002 } 3003 3004 kref_init(&dlm_debug->d_refcnt); 3005 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 3006 dlm_debug->d_locking_state = NULL; 3007 out: 3008 return dlm_debug; 3009 } 3010 3011 /* Access to this is arbitrated for us via seq_file->sem. */ 3012 struct ocfs2_dlm_seq_priv { 3013 struct ocfs2_dlm_debug *p_dlm_debug; 3014 struct ocfs2_lock_res p_iter_res; 3015 struct ocfs2_lock_res p_tmp_res; 3016 }; 3017 3018 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 3019 struct ocfs2_dlm_seq_priv *priv) 3020 { 3021 struct ocfs2_lock_res *iter, *ret = NULL; 3022 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 3023 3024 assert_spin_locked(&ocfs2_dlm_tracking_lock); 3025 3026 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 3027 /* discover the head of the list */ 3028 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 3029 mlog(0, "End of list found, %p\n", ret); 3030 break; 3031 } 3032 3033 /* We track our "dummy" iteration lockres' by a NULL 3034 * l_ops field. */ 3035 if (iter->l_ops != NULL) { 3036 ret = iter; 3037 break; 3038 } 3039 } 3040 3041 return ret; 3042 } 3043 3044 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 3045 { 3046 struct ocfs2_dlm_seq_priv *priv = m->private; 3047 struct ocfs2_lock_res *iter; 3048 3049 spin_lock(&ocfs2_dlm_tracking_lock); 3050 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 3051 if (iter) { 3052 /* Since lockres' have the lifetime of their container 3053 * (which can be inodes, ocfs2_supers, etc) we want to 3054 * copy this out to a temporary lockres while still 3055 * under the spinlock. Obviously after this we can't 3056 * trust any pointers on the copy returned, but that's 3057 * ok as the information we want isn't typically held 3058 * in them. */ 3059 priv->p_tmp_res = *iter; 3060 iter = &priv->p_tmp_res; 3061 } 3062 spin_unlock(&ocfs2_dlm_tracking_lock); 3063 3064 return iter; 3065 } 3066 3067 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 3068 { 3069 } 3070 3071 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 3072 { 3073 struct ocfs2_dlm_seq_priv *priv = m->private; 3074 struct ocfs2_lock_res *iter = v; 3075 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 3076 3077 spin_lock(&ocfs2_dlm_tracking_lock); 3078 iter = ocfs2_dlm_next_res(iter, priv); 3079 list_del_init(&dummy->l_debug_list); 3080 if (iter) { 3081 list_add(&dummy->l_debug_list, &iter->l_debug_list); 3082 priv->p_tmp_res = *iter; 3083 iter = &priv->p_tmp_res; 3084 } 3085 spin_unlock(&ocfs2_dlm_tracking_lock); 3086 3087 return iter; 3088 } 3089 3090 /* 3091 * Version is used by debugfs.ocfs2 to determine the format being used 3092 * 3093 * New in version 2 3094 * - Lock stats printed 3095 * New in version 3 3096 * - Max time in lock stats is in usecs (instead of nsecs) 3097 */ 3098 #define OCFS2_DLM_DEBUG_STR_VERSION 3 3099 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 3100 { 3101 int i; 3102 char *lvb; 3103 struct ocfs2_lock_res *lockres = v; 3104 3105 if (!lockres) 3106 return -EINVAL; 3107 3108 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 3109 3110 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 3111 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 3112 lockres->l_name, 3113 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 3114 else 3115 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 3116 3117 seq_printf(m, "%d\t" 3118 "0x%lx\t" 3119 "0x%x\t" 3120 "0x%x\t" 3121 "%u\t" 3122 "%u\t" 3123 "%d\t" 3124 "%d\t", 3125 lockres->l_level, 3126 lockres->l_flags, 3127 lockres->l_action, 3128 lockres->l_unlock_action, 3129 lockres->l_ro_holders, 3130 lockres->l_ex_holders, 3131 lockres->l_requested, 3132 lockres->l_blocking); 3133 3134 /* Dump the raw LVB */ 3135 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 3136 for(i = 0; i < DLM_LVB_LEN; i++) 3137 seq_printf(m, "0x%x\t", lvb[i]); 3138 3139 #ifdef CONFIG_OCFS2_FS_STATS 3140 # define lock_num_prmode(_l) ((_l)->l_lock_prmode.ls_gets) 3141 # define lock_num_exmode(_l) ((_l)->l_lock_exmode.ls_gets) 3142 # define lock_num_prmode_failed(_l) ((_l)->l_lock_prmode.ls_fail) 3143 # define lock_num_exmode_failed(_l) ((_l)->l_lock_exmode.ls_fail) 3144 # define lock_total_prmode(_l) ((_l)->l_lock_prmode.ls_total) 3145 # define lock_total_exmode(_l) ((_l)->l_lock_exmode.ls_total) 3146 # define lock_max_prmode(_l) ((_l)->l_lock_prmode.ls_max) 3147 # define lock_max_exmode(_l) ((_l)->l_lock_exmode.ls_max) 3148 # define lock_refresh(_l) ((_l)->l_lock_refresh) 3149 #else 3150 # define lock_num_prmode(_l) (0) 3151 # define lock_num_exmode(_l) (0) 3152 # define lock_num_prmode_failed(_l) (0) 3153 # define lock_num_exmode_failed(_l) (0) 3154 # define lock_total_prmode(_l) (0ULL) 3155 # define lock_total_exmode(_l) (0ULL) 3156 # define lock_max_prmode(_l) (0) 3157 # define lock_max_exmode(_l) (0) 3158 # define lock_refresh(_l) (0) 3159 #endif 3160 /* The following seq_print was added in version 2 of this output */ 3161 seq_printf(m, "%u\t" 3162 "%u\t" 3163 "%u\t" 3164 "%u\t" 3165 "%llu\t" 3166 "%llu\t" 3167 "%u\t" 3168 "%u\t" 3169 "%u\t", 3170 lock_num_prmode(lockres), 3171 lock_num_exmode(lockres), 3172 lock_num_prmode_failed(lockres), 3173 lock_num_exmode_failed(lockres), 3174 lock_total_prmode(lockres), 3175 lock_total_exmode(lockres), 3176 lock_max_prmode(lockres), 3177 lock_max_exmode(lockres), 3178 lock_refresh(lockres)); 3179 3180 /* End the line */ 3181 seq_printf(m, "\n"); 3182 return 0; 3183 } 3184 3185 static const struct seq_operations ocfs2_dlm_seq_ops = { 3186 .start = ocfs2_dlm_seq_start, 3187 .stop = ocfs2_dlm_seq_stop, 3188 .next = ocfs2_dlm_seq_next, 3189 .show = ocfs2_dlm_seq_show, 3190 }; 3191 3192 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 3193 { 3194 struct seq_file *seq = file->private_data; 3195 struct ocfs2_dlm_seq_priv *priv = seq->private; 3196 struct ocfs2_lock_res *res = &priv->p_iter_res; 3197 3198 ocfs2_remove_lockres_tracking(res); 3199 ocfs2_put_dlm_debug(priv->p_dlm_debug); 3200 return seq_release_private(inode, file); 3201 } 3202 3203 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 3204 { 3205 struct ocfs2_dlm_seq_priv *priv; 3206 struct ocfs2_super *osb; 3207 3208 priv = __seq_open_private(file, &ocfs2_dlm_seq_ops, sizeof(*priv)); 3209 if (!priv) { 3210 mlog_errno(-ENOMEM); 3211 return -ENOMEM; 3212 } 3213 3214 osb = inode->i_private; 3215 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 3216 priv->p_dlm_debug = osb->osb_dlm_debug; 3217 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 3218 3219 ocfs2_add_lockres_tracking(&priv->p_iter_res, 3220 priv->p_dlm_debug); 3221 3222 return 0; 3223 } 3224 3225 static const struct file_operations ocfs2_dlm_debug_fops = { 3226 .open = ocfs2_dlm_debug_open, 3227 .release = ocfs2_dlm_debug_release, 3228 .read = seq_read, 3229 .llseek = seq_lseek, 3230 }; 3231 3232 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 3233 { 3234 int ret = 0; 3235 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 3236 3237 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 3238 S_IFREG|S_IRUSR, 3239 osb->osb_debug_root, 3240 osb, 3241 &ocfs2_dlm_debug_fops); 3242 if (!dlm_debug->d_locking_state) { 3243 ret = -EINVAL; 3244 mlog(ML_ERROR, 3245 "Unable to create locking state debugfs file.\n"); 3246 goto out; 3247 } 3248 3249 ocfs2_get_dlm_debug(dlm_debug); 3250 out: 3251 return ret; 3252 } 3253 3254 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 3255 { 3256 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 3257 3258 if (dlm_debug) { 3259 debugfs_remove(dlm_debug->d_locking_state); 3260 ocfs2_put_dlm_debug(dlm_debug); 3261 } 3262 } 3263 3264 int ocfs2_dlm_init(struct ocfs2_super *osb) 3265 { 3266 int status = 0; 3267 struct ocfs2_cluster_connection *conn = NULL; 3268 3269 if (ocfs2_mount_local(osb)) { 3270 osb->node_num = 0; 3271 goto local; 3272 } 3273 3274 status = ocfs2_dlm_init_debug(osb); 3275 if (status < 0) { 3276 mlog_errno(status); 3277 goto bail; 3278 } 3279 3280 /* launch downconvert thread */ 3281 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc-%s", 3282 osb->uuid_str); 3283 if (IS_ERR(osb->dc_task)) { 3284 status = PTR_ERR(osb->dc_task); 3285 osb->dc_task = NULL; 3286 mlog_errno(status); 3287 goto bail; 3288 } 3289 3290 /* for now, uuid == domain */ 3291 status = ocfs2_cluster_connect(osb->osb_cluster_stack, 3292 osb->osb_cluster_name, 3293 strlen(osb->osb_cluster_name), 3294 osb->uuid_str, 3295 strlen(osb->uuid_str), 3296 &lproto, ocfs2_do_node_down, osb, 3297 &conn); 3298 if (status) { 3299 mlog_errno(status); 3300 goto bail; 3301 } 3302 3303 status = ocfs2_cluster_this_node(conn, &osb->node_num); 3304 if (status < 0) { 3305 mlog_errno(status); 3306 mlog(ML_ERROR, 3307 "could not find this host's node number\n"); 3308 ocfs2_cluster_disconnect(conn, 0); 3309 goto bail; 3310 } 3311 3312 local: 3313 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 3314 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 3315 ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb); 3316 ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb); 3317 3318 osb->cconn = conn; 3319 bail: 3320 if (status < 0) { 3321 ocfs2_dlm_shutdown_debug(osb); 3322 if (osb->dc_task) 3323 kthread_stop(osb->dc_task); 3324 } 3325 3326 return status; 3327 } 3328 3329 void ocfs2_dlm_shutdown(struct ocfs2_super *osb, 3330 int hangup_pending) 3331 { 3332 ocfs2_drop_osb_locks(osb); 3333 3334 /* 3335 * Now that we have dropped all locks and ocfs2_dismount_volume() 3336 * has disabled recovery, the DLM won't be talking to us. It's 3337 * safe to tear things down before disconnecting the cluster. 3338 */ 3339 3340 if (osb->dc_task) { 3341 kthread_stop(osb->dc_task); 3342 osb->dc_task = NULL; 3343 } 3344 3345 ocfs2_lock_res_free(&osb->osb_super_lockres); 3346 ocfs2_lock_res_free(&osb->osb_rename_lockres); 3347 ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres); 3348 ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres); 3349 3350 ocfs2_cluster_disconnect(osb->cconn, hangup_pending); 3351 osb->cconn = NULL; 3352 3353 ocfs2_dlm_shutdown_debug(osb); 3354 } 3355 3356 static int ocfs2_drop_lock(struct ocfs2_super *osb, 3357 struct ocfs2_lock_res *lockres) 3358 { 3359 int ret; 3360 unsigned long flags; 3361 u32 lkm_flags = 0; 3362 3363 /* We didn't get anywhere near actually using this lockres. */ 3364 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 3365 goto out; 3366 3367 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 3368 lkm_flags |= DLM_LKF_VALBLK; 3369 3370 spin_lock_irqsave(&lockres->l_lock, flags); 3371 3372 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 3373 "lockres %s, flags 0x%lx\n", 3374 lockres->l_name, lockres->l_flags); 3375 3376 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 3377 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 3378 "%u, unlock_action = %u\n", 3379 lockres->l_name, lockres->l_flags, lockres->l_action, 3380 lockres->l_unlock_action); 3381 3382 spin_unlock_irqrestore(&lockres->l_lock, flags); 3383 3384 /* XXX: Today we just wait on any busy 3385 * locks... Perhaps we need to cancel converts in the 3386 * future? */ 3387 ocfs2_wait_on_busy_lock(lockres); 3388 3389 spin_lock_irqsave(&lockres->l_lock, flags); 3390 } 3391 3392 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3393 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 3394 lockres->l_level == DLM_LOCK_EX && 3395 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3396 lockres->l_ops->set_lvb(lockres); 3397 } 3398 3399 if (lockres->l_flags & OCFS2_LOCK_BUSY) 3400 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 3401 lockres->l_name); 3402 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 3403 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 3404 3405 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 3406 spin_unlock_irqrestore(&lockres->l_lock, flags); 3407 goto out; 3408 } 3409 3410 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 3411 3412 /* make sure we never get here while waiting for an ast to 3413 * fire. */ 3414 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 3415 3416 /* is this necessary? */ 3417 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3418 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 3419 spin_unlock_irqrestore(&lockres->l_lock, flags); 3420 3421 mlog(0, "lock %s\n", lockres->l_name); 3422 3423 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags); 3424 if (ret) { 3425 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3426 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 3427 ocfs2_dlm_dump_lksb(&lockres->l_lksb); 3428 BUG(); 3429 } 3430 mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n", 3431 lockres->l_name); 3432 3433 ocfs2_wait_on_busy_lock(lockres); 3434 out: 3435 return 0; 3436 } 3437 3438 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3439 struct ocfs2_lock_res *lockres); 3440 3441 /* Mark the lockres as being dropped. It will no longer be 3442 * queued if blocking, but we still may have to wait on it 3443 * being dequeued from the downconvert thread before we can consider 3444 * it safe to drop. 3445 * 3446 * You can *not* attempt to call cluster_lock on this lockres anymore. */ 3447 void ocfs2_mark_lockres_freeing(struct ocfs2_super *osb, 3448 struct ocfs2_lock_res *lockres) 3449 { 3450 int status; 3451 struct ocfs2_mask_waiter mw; 3452 unsigned long flags, flags2; 3453 3454 ocfs2_init_mask_waiter(&mw); 3455 3456 spin_lock_irqsave(&lockres->l_lock, flags); 3457 lockres->l_flags |= OCFS2_LOCK_FREEING; 3458 if (lockres->l_flags & OCFS2_LOCK_QUEUED && current == osb->dc_task) { 3459 /* 3460 * We know the downconvert is queued but not in progress 3461 * because we are the downconvert thread and processing 3462 * different lock. So we can just remove the lock from the 3463 * queue. This is not only an optimization but also a way 3464 * to avoid the following deadlock: 3465 * ocfs2_dentry_post_unlock() 3466 * ocfs2_dentry_lock_put() 3467 * ocfs2_drop_dentry_lock() 3468 * iput() 3469 * ocfs2_evict_inode() 3470 * ocfs2_clear_inode() 3471 * ocfs2_mark_lockres_freeing() 3472 * ... blocks waiting for OCFS2_LOCK_QUEUED 3473 * since we are the downconvert thread which 3474 * should clear the flag. 3475 */ 3476 spin_unlock_irqrestore(&lockres->l_lock, flags); 3477 spin_lock_irqsave(&osb->dc_task_lock, flags2); 3478 list_del_init(&lockres->l_blocked_list); 3479 osb->blocked_lock_count--; 3480 spin_unlock_irqrestore(&osb->dc_task_lock, flags2); 3481 /* 3482 * Warn if we recurse into another post_unlock call. Strictly 3483 * speaking it isn't a problem but we need to be careful if 3484 * that happens (stack overflow, deadlocks, ...) so warn if 3485 * ocfs2 grows a path for which this can happen. 3486 */ 3487 WARN_ON_ONCE(lockres->l_ops->post_unlock); 3488 /* Since the lock is freeing we don't do much in the fn below */ 3489 ocfs2_process_blocked_lock(osb, lockres); 3490 return; 3491 } 3492 while (lockres->l_flags & OCFS2_LOCK_QUEUED) { 3493 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); 3494 spin_unlock_irqrestore(&lockres->l_lock, flags); 3495 3496 mlog(0, "Waiting on lockres %s\n", lockres->l_name); 3497 3498 status = ocfs2_wait_for_mask(&mw); 3499 if (status) 3500 mlog_errno(status); 3501 3502 spin_lock_irqsave(&lockres->l_lock, flags); 3503 } 3504 spin_unlock_irqrestore(&lockres->l_lock, flags); 3505 } 3506 3507 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, 3508 struct ocfs2_lock_res *lockres) 3509 { 3510 int ret; 3511 3512 ocfs2_mark_lockres_freeing(osb, lockres); 3513 ret = ocfs2_drop_lock(osb, lockres); 3514 if (ret) 3515 mlog_errno(ret); 3516 } 3517 3518 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 3519 { 3520 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 3521 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 3522 ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres); 3523 ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres); 3524 } 3525 3526 int ocfs2_drop_inode_locks(struct inode *inode) 3527 { 3528 int status, err; 3529 3530 /* No need to call ocfs2_mark_lockres_freeing here - 3531 * ocfs2_clear_inode has done it for us. */ 3532 3533 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3534 &OCFS2_I(inode)->ip_open_lockres); 3535 if (err < 0) 3536 mlog_errno(err); 3537 3538 status = err; 3539 3540 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3541 &OCFS2_I(inode)->ip_inode_lockres); 3542 if (err < 0) 3543 mlog_errno(err); 3544 if (err < 0 && !status) 3545 status = err; 3546 3547 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3548 &OCFS2_I(inode)->ip_rw_lockres); 3549 if (err < 0) 3550 mlog_errno(err); 3551 if (err < 0 && !status) 3552 status = err; 3553 3554 return status; 3555 } 3556 3557 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 3558 int new_level) 3559 { 3560 assert_spin_locked(&lockres->l_lock); 3561 3562 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 3563 3564 if (lockres->l_level <= new_level) { 3565 mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, " 3566 "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, " 3567 "block %d, pgen %d\n", lockres->l_name, lockres->l_level, 3568 new_level, list_empty(&lockres->l_blocked_list), 3569 list_empty(&lockres->l_mask_waiters), lockres->l_type, 3570 lockres->l_flags, lockres->l_ro_holders, 3571 lockres->l_ex_holders, lockres->l_action, 3572 lockres->l_unlock_action, lockres->l_requested, 3573 lockres->l_blocking, lockres->l_pending_gen); 3574 BUG(); 3575 } 3576 3577 mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n", 3578 lockres->l_name, lockres->l_level, new_level, lockres->l_blocking); 3579 3580 lockres->l_action = OCFS2_AST_DOWNCONVERT; 3581 lockres->l_requested = new_level; 3582 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3583 return lockres_set_pending(lockres); 3584 } 3585 3586 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 3587 struct ocfs2_lock_res *lockres, 3588 int new_level, 3589 int lvb, 3590 unsigned int generation) 3591 { 3592 int ret; 3593 u32 dlm_flags = DLM_LKF_CONVERT; 3594 3595 mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name, 3596 lockres->l_level, new_level); 3597 3598 /* 3599 * On DLM_LKF_VALBLK, fsdlm behaves differently with o2cb. It always 3600 * expects DLM_LKF_VALBLK being set if the LKB has LVB, so that 3601 * we can recover correctly from node failure. Otherwise, we may get 3602 * invalid LVB in LKB, but without DLM_SBF_VALNOTVALID being set. 3603 */ 3604 if (!ocfs2_is_o2cb_active() && 3605 lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 3606 lvb = 1; 3607 3608 if (lvb) 3609 dlm_flags |= DLM_LKF_VALBLK; 3610 3611 ret = ocfs2_dlm_lock(osb->cconn, 3612 new_level, 3613 &lockres->l_lksb, 3614 dlm_flags, 3615 lockres->l_name, 3616 OCFS2_LOCK_ID_MAX_LEN - 1); 3617 lockres_clear_pending(lockres, generation, osb); 3618 if (ret) { 3619 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 3620 ocfs2_recover_from_dlm_error(lockres, 1); 3621 goto bail; 3622 } 3623 3624 ret = 0; 3625 bail: 3626 return ret; 3627 } 3628 3629 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */ 3630 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 3631 struct ocfs2_lock_res *lockres) 3632 { 3633 assert_spin_locked(&lockres->l_lock); 3634 3635 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 3636 /* If we're already trying to cancel a lock conversion 3637 * then just drop the spinlock and allow the caller to 3638 * requeue this lock. */ 3639 mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name); 3640 return 0; 3641 } 3642 3643 /* were we in a convert when we got the bast fire? */ 3644 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT && 3645 lockres->l_action != OCFS2_AST_DOWNCONVERT); 3646 /* set things up for the unlockast to know to just 3647 * clear out the ast_action and unset busy, etc. */ 3648 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT; 3649 3650 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY), 3651 "lock %s, invalid flags: 0x%lx\n", 3652 lockres->l_name, lockres->l_flags); 3653 3654 mlog(ML_BASTS, "lockres %s\n", lockres->l_name); 3655 3656 return 1; 3657 } 3658 3659 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 3660 struct ocfs2_lock_res *lockres) 3661 { 3662 int ret; 3663 3664 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, 3665 DLM_LKF_CANCEL); 3666 if (ret) { 3667 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3668 ocfs2_recover_from_dlm_error(lockres, 0); 3669 } 3670 3671 mlog(ML_BASTS, "lockres %s\n", lockres->l_name); 3672 3673 return ret; 3674 } 3675 3676 static int ocfs2_unblock_lock(struct ocfs2_super *osb, 3677 struct ocfs2_lock_res *lockres, 3678 struct ocfs2_unblock_ctl *ctl) 3679 { 3680 unsigned long flags; 3681 int blocking; 3682 int new_level; 3683 int level; 3684 int ret = 0; 3685 int set_lvb = 0; 3686 unsigned int gen; 3687 3688 spin_lock_irqsave(&lockres->l_lock, flags); 3689 3690 recheck: 3691 /* 3692 * Is it still blocking? If not, we have no more work to do. 3693 */ 3694 if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) { 3695 BUG_ON(lockres->l_blocking != DLM_LOCK_NL); 3696 spin_unlock_irqrestore(&lockres->l_lock, flags); 3697 ret = 0; 3698 goto leave; 3699 } 3700 3701 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 3702 /* XXX 3703 * This is a *big* race. The OCFS2_LOCK_PENDING flag 3704 * exists entirely for one reason - another thread has set 3705 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock(). 3706 * 3707 * If we do ocfs2_cancel_convert() before the other thread 3708 * calls dlm_lock(), our cancel will do nothing. We will 3709 * get no ast, and we will have no way of knowing the 3710 * cancel failed. Meanwhile, the other thread will call 3711 * into dlm_lock() and wait...forever. 3712 * 3713 * Why forever? Because another node has asked for the 3714 * lock first; that's why we're here in unblock_lock(). 3715 * 3716 * The solution is OCFS2_LOCK_PENDING. When PENDING is 3717 * set, we just requeue the unblock. Only when the other 3718 * thread has called dlm_lock() and cleared PENDING will 3719 * we then cancel their request. 3720 * 3721 * All callers of dlm_lock() must set OCFS2_DLM_PENDING 3722 * at the same time they set OCFS2_DLM_BUSY. They must 3723 * clear OCFS2_DLM_PENDING after dlm_lock() returns. 3724 */ 3725 if (lockres->l_flags & OCFS2_LOCK_PENDING) { 3726 mlog(ML_BASTS, "lockres %s, ReQ: Pending\n", 3727 lockres->l_name); 3728 goto leave_requeue; 3729 } 3730 3731 ctl->requeue = 1; 3732 ret = ocfs2_prepare_cancel_convert(osb, lockres); 3733 spin_unlock_irqrestore(&lockres->l_lock, flags); 3734 if (ret) { 3735 ret = ocfs2_cancel_convert(osb, lockres); 3736 if (ret < 0) 3737 mlog_errno(ret); 3738 } 3739 goto leave; 3740 } 3741 3742 /* 3743 * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is 3744 * set when the ast is received for an upconvert just before the 3745 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast 3746 * on the heels of the ast, we want to delay the downconvert just 3747 * enough to allow the up requestor to do its task. Because this 3748 * lock is in the blocked queue, the lock will be downconverted 3749 * as soon as the requestor is done with the lock. 3750 */ 3751 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) 3752 goto leave_requeue; 3753 3754 /* 3755 * How can we block and yet be at NL? We were trying to upconvert 3756 * from NL and got canceled. The code comes back here, and now 3757 * we notice and clear BLOCKING. 3758 */ 3759 if (lockres->l_level == DLM_LOCK_NL) { 3760 BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders); 3761 mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name); 3762 lockres->l_blocking = DLM_LOCK_NL; 3763 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 3764 spin_unlock_irqrestore(&lockres->l_lock, flags); 3765 goto leave; 3766 } 3767 3768 /* if we're blocking an exclusive and we have *any* holders, 3769 * then requeue. */ 3770 if ((lockres->l_blocking == DLM_LOCK_EX) 3771 && (lockres->l_ex_holders || lockres->l_ro_holders)) { 3772 mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n", 3773 lockres->l_name, lockres->l_ex_holders, 3774 lockres->l_ro_holders); 3775 goto leave_requeue; 3776 } 3777 3778 /* If it's a PR we're blocking, then only 3779 * requeue if we've got any EX holders */ 3780 if (lockres->l_blocking == DLM_LOCK_PR && 3781 lockres->l_ex_holders) { 3782 mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n", 3783 lockres->l_name, lockres->l_ex_holders); 3784 goto leave_requeue; 3785 } 3786 3787 /* 3788 * Can we get a lock in this state if the holder counts are 3789 * zero? The meta data unblock code used to check this. 3790 */ 3791 if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 3792 && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) { 3793 mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n", 3794 lockres->l_name); 3795 goto leave_requeue; 3796 } 3797 3798 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 3799 3800 if (lockres->l_ops->check_downconvert 3801 && !lockres->l_ops->check_downconvert(lockres, new_level)) { 3802 mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n", 3803 lockres->l_name); 3804 goto leave_requeue; 3805 } 3806 3807 /* If we get here, then we know that there are no more 3808 * incompatible holders (and anyone asking for an incompatible 3809 * lock is blocked). We can now downconvert the lock */ 3810 if (!lockres->l_ops->downconvert_worker) 3811 goto downconvert; 3812 3813 /* Some lockres types want to do a bit of work before 3814 * downconverting a lock. Allow that here. The worker function 3815 * may sleep, so we save off a copy of what we're blocking as 3816 * it may change while we're not holding the spin lock. */ 3817 blocking = lockres->l_blocking; 3818 level = lockres->l_level; 3819 spin_unlock_irqrestore(&lockres->l_lock, flags); 3820 3821 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); 3822 3823 if (ctl->unblock_action == UNBLOCK_STOP_POST) { 3824 mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n", 3825 lockres->l_name); 3826 goto leave; 3827 } 3828 3829 spin_lock_irqsave(&lockres->l_lock, flags); 3830 if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) { 3831 /* If this changed underneath us, then we can't drop 3832 * it just yet. */ 3833 mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, " 3834 "Recheck\n", lockres->l_name, blocking, 3835 lockres->l_blocking, level, lockres->l_level); 3836 goto recheck; 3837 } 3838 3839 downconvert: 3840 ctl->requeue = 0; 3841 3842 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3843 if (lockres->l_level == DLM_LOCK_EX) 3844 set_lvb = 1; 3845 3846 /* 3847 * We only set the lvb if the lock has been fully 3848 * refreshed - otherwise we risk setting stale 3849 * data. Otherwise, there's no need to actually clear 3850 * out the lvb here as it's value is still valid. 3851 */ 3852 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3853 lockres->l_ops->set_lvb(lockres); 3854 } 3855 3856 gen = ocfs2_prepare_downconvert(lockres, new_level); 3857 spin_unlock_irqrestore(&lockres->l_lock, flags); 3858 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb, 3859 gen); 3860 3861 leave: 3862 if (ret) 3863 mlog_errno(ret); 3864 return ret; 3865 3866 leave_requeue: 3867 spin_unlock_irqrestore(&lockres->l_lock, flags); 3868 ctl->requeue = 1; 3869 3870 return 0; 3871 } 3872 3873 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 3874 int blocking) 3875 { 3876 struct inode *inode; 3877 struct address_space *mapping; 3878 struct ocfs2_inode_info *oi; 3879 3880 inode = ocfs2_lock_res_inode(lockres); 3881 mapping = inode->i_mapping; 3882 3883 if (S_ISDIR(inode->i_mode)) { 3884 oi = OCFS2_I(inode); 3885 oi->ip_dir_lock_gen++; 3886 mlog(0, "generation: %u\n", oi->ip_dir_lock_gen); 3887 goto out; 3888 } 3889 3890 if (!S_ISREG(inode->i_mode)) 3891 goto out; 3892 3893 /* 3894 * We need this before the filemap_fdatawrite() so that it can 3895 * transfer the dirty bit from the PTE to the 3896 * page. Unfortunately this means that even for EX->PR 3897 * downconverts, we'll lose our mappings and have to build 3898 * them up again. 3899 */ 3900 unmap_mapping_range(mapping, 0, 0, 0); 3901 3902 if (filemap_fdatawrite(mapping)) { 3903 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!", 3904 (unsigned long long)OCFS2_I(inode)->ip_blkno); 3905 } 3906 sync_mapping_buffers(mapping); 3907 if (blocking == DLM_LOCK_EX) { 3908 truncate_inode_pages(mapping, 0); 3909 } else { 3910 /* We only need to wait on the I/O if we're not also 3911 * truncating pages because truncate_inode_pages waits 3912 * for us above. We don't truncate pages if we're 3913 * blocking anything < EXMODE because we want to keep 3914 * them around in that case. */ 3915 filemap_fdatawait(mapping); 3916 } 3917 3918 forget_all_cached_acls(inode); 3919 3920 out: 3921 return UNBLOCK_CONTINUE; 3922 } 3923 3924 static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci, 3925 struct ocfs2_lock_res *lockres, 3926 int new_level) 3927 { 3928 int checkpointed = ocfs2_ci_fully_checkpointed(ci); 3929 3930 BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR); 3931 BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed); 3932 3933 if (checkpointed) 3934 return 1; 3935 3936 ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci))); 3937 return 0; 3938 } 3939 3940 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 3941 int new_level) 3942 { 3943 struct inode *inode = ocfs2_lock_res_inode(lockres); 3944 3945 return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level); 3946 } 3947 3948 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres) 3949 { 3950 struct inode *inode = ocfs2_lock_res_inode(lockres); 3951 3952 __ocfs2_stuff_meta_lvb(inode); 3953 } 3954 3955 /* 3956 * Does the final reference drop on our dentry lock. Right now this 3957 * happens in the downconvert thread, but we could choose to simplify the 3958 * dlmglue API and push these off to the ocfs2_wq in the future. 3959 */ 3960 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 3961 struct ocfs2_lock_res *lockres) 3962 { 3963 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3964 ocfs2_dentry_lock_put(osb, dl); 3965 } 3966 3967 /* 3968 * d_delete() matching dentries before the lock downconvert. 3969 * 3970 * At this point, any process waiting to destroy the 3971 * dentry_lock due to last ref count is stopped by the 3972 * OCFS2_LOCK_QUEUED flag. 3973 * 3974 * We have two potential problems 3975 * 3976 * 1) If we do the last reference drop on our dentry_lock (via dput) 3977 * we'll wind up in ocfs2_release_dentry_lock(), waiting on 3978 * the downconvert to finish. Instead we take an elevated 3979 * reference and push the drop until after we've completed our 3980 * unblock processing. 3981 * 3982 * 2) There might be another process with a final reference, 3983 * waiting on us to finish processing. If this is the case, we 3984 * detect it and exit out - there's no more dentries anyway. 3985 */ 3986 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 3987 int blocking) 3988 { 3989 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3990 struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode); 3991 struct dentry *dentry; 3992 unsigned long flags; 3993 int extra_ref = 0; 3994 3995 /* 3996 * This node is blocking another node from getting a read 3997 * lock. This happens when we've renamed within a 3998 * directory. We've forced the other nodes to d_delete(), but 3999 * we never actually dropped our lock because it's still 4000 * valid. The downconvert code will retain a PR for this node, 4001 * so there's no further work to do. 4002 */ 4003 if (blocking == DLM_LOCK_PR) 4004 return UNBLOCK_CONTINUE; 4005 4006 /* 4007 * Mark this inode as potentially orphaned. The code in 4008 * ocfs2_delete_inode() will figure out whether it actually 4009 * needs to be freed or not. 4010 */ 4011 spin_lock(&oi->ip_lock); 4012 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; 4013 spin_unlock(&oi->ip_lock); 4014 4015 /* 4016 * Yuck. We need to make sure however that the check of 4017 * OCFS2_LOCK_FREEING and the extra reference are atomic with 4018 * respect to a reference decrement or the setting of that 4019 * flag. 4020 */ 4021 spin_lock_irqsave(&lockres->l_lock, flags); 4022 spin_lock(&dentry_attach_lock); 4023 if (!(lockres->l_flags & OCFS2_LOCK_FREEING) 4024 && dl->dl_count) { 4025 dl->dl_count++; 4026 extra_ref = 1; 4027 } 4028 spin_unlock(&dentry_attach_lock); 4029 spin_unlock_irqrestore(&lockres->l_lock, flags); 4030 4031 mlog(0, "extra_ref = %d\n", extra_ref); 4032 4033 /* 4034 * We have a process waiting on us in ocfs2_dentry_iput(), 4035 * which means we can't have any more outstanding 4036 * aliases. There's no need to do any more work. 4037 */ 4038 if (!extra_ref) 4039 return UNBLOCK_CONTINUE; 4040 4041 spin_lock(&dentry_attach_lock); 4042 while (1) { 4043 dentry = ocfs2_find_local_alias(dl->dl_inode, 4044 dl->dl_parent_blkno, 1); 4045 if (!dentry) 4046 break; 4047 spin_unlock(&dentry_attach_lock); 4048 4049 if (S_ISDIR(dl->dl_inode->i_mode)) 4050 shrink_dcache_parent(dentry); 4051 4052 mlog(0, "d_delete(%pd);\n", dentry); 4053 4054 /* 4055 * The following dcache calls may do an 4056 * iput(). Normally we don't want that from the 4057 * downconverting thread, but in this case it's ok 4058 * because the requesting node already has an 4059 * exclusive lock on the inode, so it can't be queued 4060 * for a downconvert. 4061 */ 4062 d_delete(dentry); 4063 dput(dentry); 4064 4065 spin_lock(&dentry_attach_lock); 4066 } 4067 spin_unlock(&dentry_attach_lock); 4068 4069 /* 4070 * If we are the last holder of this dentry lock, there is no 4071 * reason to downconvert so skip straight to the unlock. 4072 */ 4073 if (dl->dl_count == 1) 4074 return UNBLOCK_STOP_POST; 4075 4076 return UNBLOCK_CONTINUE_POST; 4077 } 4078 4079 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres, 4080 int new_level) 4081 { 4082 struct ocfs2_refcount_tree *tree = 4083 ocfs2_lock_res_refcount_tree(lockres); 4084 4085 return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level); 4086 } 4087 4088 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres, 4089 int blocking) 4090 { 4091 struct ocfs2_refcount_tree *tree = 4092 ocfs2_lock_res_refcount_tree(lockres); 4093 4094 ocfs2_metadata_cache_purge(&tree->rf_ci); 4095 4096 return UNBLOCK_CONTINUE; 4097 } 4098 4099 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres) 4100 { 4101 struct ocfs2_qinfo_lvb *lvb; 4102 struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres); 4103 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 4104 oinfo->dqi_gi.dqi_type); 4105 4106 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 4107 lvb->lvb_version = OCFS2_QINFO_LVB_VERSION; 4108 lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace); 4109 lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace); 4110 lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms); 4111 lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks); 4112 lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk); 4113 lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry); 4114 } 4115 4116 void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex) 4117 { 4118 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 4119 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 4120 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 4121 4122 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 4123 ocfs2_cluster_unlock(osb, lockres, level); 4124 } 4125 4126 static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo) 4127 { 4128 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 4129 oinfo->dqi_gi.dqi_type); 4130 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 4131 struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 4132 struct buffer_head *bh = NULL; 4133 struct ocfs2_global_disk_dqinfo *gdinfo; 4134 int status = 0; 4135 4136 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 4137 lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) { 4138 info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace); 4139 info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace); 4140 oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms); 4141 oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks); 4142 oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk); 4143 oinfo->dqi_gi.dqi_free_entry = 4144 be32_to_cpu(lvb->lvb_free_entry); 4145 } else { 4146 status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode, 4147 oinfo->dqi_giblk, &bh); 4148 if (status) { 4149 mlog_errno(status); 4150 goto bail; 4151 } 4152 gdinfo = (struct ocfs2_global_disk_dqinfo *) 4153 (bh->b_data + OCFS2_GLOBAL_INFO_OFF); 4154 info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace); 4155 info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace); 4156 oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms); 4157 oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks); 4158 oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk); 4159 oinfo->dqi_gi.dqi_free_entry = 4160 le32_to_cpu(gdinfo->dqi_free_entry); 4161 brelse(bh); 4162 ocfs2_track_lock_refresh(lockres); 4163 } 4164 4165 bail: 4166 return status; 4167 } 4168 4169 /* Lock quota info, this function expects at least shared lock on the quota file 4170 * so that we can safely refresh quota info from disk. */ 4171 int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex) 4172 { 4173 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 4174 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 4175 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 4176 int status = 0; 4177 4178 /* On RO devices, locking really isn't needed... */ 4179 if (ocfs2_is_hard_readonly(osb)) { 4180 if (ex) 4181 status = -EROFS; 4182 goto bail; 4183 } 4184 if (ocfs2_mount_local(osb)) 4185 goto bail; 4186 4187 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 4188 if (status < 0) { 4189 mlog_errno(status); 4190 goto bail; 4191 } 4192 if (!ocfs2_should_refresh_lock_res(lockres)) 4193 goto bail; 4194 /* OK, we have the lock but we need to refresh the quota info */ 4195 status = ocfs2_refresh_qinfo(oinfo); 4196 if (status) 4197 ocfs2_qinfo_unlock(oinfo, ex); 4198 ocfs2_complete_lock_res_refresh(lockres, status); 4199 bail: 4200 return status; 4201 } 4202 4203 int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex) 4204 { 4205 int status; 4206 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 4207 struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres; 4208 struct ocfs2_super *osb = lockres->l_priv; 4209 4210 4211 if (ocfs2_is_hard_readonly(osb)) 4212 return -EROFS; 4213 4214 if (ocfs2_mount_local(osb)) 4215 return 0; 4216 4217 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 4218 if (status < 0) 4219 mlog_errno(status); 4220 4221 return status; 4222 } 4223 4224 void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex) 4225 { 4226 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 4227 struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres; 4228 struct ocfs2_super *osb = lockres->l_priv; 4229 4230 if (!ocfs2_mount_local(osb)) 4231 ocfs2_cluster_unlock(osb, lockres, level); 4232 } 4233 4234 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 4235 struct ocfs2_lock_res *lockres) 4236 { 4237 int status; 4238 struct ocfs2_unblock_ctl ctl = {0, 0,}; 4239 unsigned long flags; 4240 4241 /* Our reference to the lockres in this function can be 4242 * considered valid until we remove the OCFS2_LOCK_QUEUED 4243 * flag. */ 4244 4245 BUG_ON(!lockres); 4246 BUG_ON(!lockres->l_ops); 4247 4248 mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name); 4249 4250 /* Detect whether a lock has been marked as going away while 4251 * the downconvert thread was processing other things. A lock can 4252 * still be marked with OCFS2_LOCK_FREEING after this check, 4253 * but short circuiting here will still save us some 4254 * performance. */ 4255 spin_lock_irqsave(&lockres->l_lock, flags); 4256 if (lockres->l_flags & OCFS2_LOCK_FREEING) 4257 goto unqueue; 4258 spin_unlock_irqrestore(&lockres->l_lock, flags); 4259 4260 status = ocfs2_unblock_lock(osb, lockres, &ctl); 4261 if (status < 0) 4262 mlog_errno(status); 4263 4264 spin_lock_irqsave(&lockres->l_lock, flags); 4265 unqueue: 4266 if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) { 4267 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED); 4268 } else 4269 ocfs2_schedule_blocked_lock(osb, lockres); 4270 4271 mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name, 4272 ctl.requeue ? "yes" : "no"); 4273 spin_unlock_irqrestore(&lockres->l_lock, flags); 4274 4275 if (ctl.unblock_action != UNBLOCK_CONTINUE 4276 && lockres->l_ops->post_unlock) 4277 lockres->l_ops->post_unlock(osb, lockres); 4278 } 4279 4280 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 4281 struct ocfs2_lock_res *lockres) 4282 { 4283 unsigned long flags; 4284 4285 assert_spin_locked(&lockres->l_lock); 4286 4287 if (lockres->l_flags & OCFS2_LOCK_FREEING) { 4288 /* Do not schedule a lock for downconvert when it's on 4289 * the way to destruction - any nodes wanting access 4290 * to the resource will get it soon. */ 4291 mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n", 4292 lockres->l_name, lockres->l_flags); 4293 return; 4294 } 4295 4296 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); 4297 4298 spin_lock_irqsave(&osb->dc_task_lock, flags); 4299 if (list_empty(&lockres->l_blocked_list)) { 4300 list_add_tail(&lockres->l_blocked_list, 4301 &osb->blocked_lock_list); 4302 osb->blocked_lock_count++; 4303 } 4304 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4305 } 4306 4307 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb) 4308 { 4309 unsigned long processed; 4310 unsigned long flags; 4311 struct ocfs2_lock_res *lockres; 4312 4313 spin_lock_irqsave(&osb->dc_task_lock, flags); 4314 /* grab this early so we know to try again if a state change and 4315 * wake happens part-way through our work */ 4316 osb->dc_work_sequence = osb->dc_wake_sequence; 4317 4318 processed = osb->blocked_lock_count; 4319 /* 4320 * blocked lock processing in this loop might call iput which can 4321 * remove items off osb->blocked_lock_list. Downconvert up to 4322 * 'processed' number of locks, but stop short if we had some 4323 * removed in ocfs2_mark_lockres_freeing when downconverting. 4324 */ 4325 while (processed && !list_empty(&osb->blocked_lock_list)) { 4326 lockres = list_entry(osb->blocked_lock_list.next, 4327 struct ocfs2_lock_res, l_blocked_list); 4328 list_del_init(&lockres->l_blocked_list); 4329 osb->blocked_lock_count--; 4330 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4331 4332 BUG_ON(!processed); 4333 processed--; 4334 4335 ocfs2_process_blocked_lock(osb, lockres); 4336 4337 spin_lock_irqsave(&osb->dc_task_lock, flags); 4338 } 4339 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4340 } 4341 4342 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb) 4343 { 4344 int empty = 0; 4345 unsigned long flags; 4346 4347 spin_lock_irqsave(&osb->dc_task_lock, flags); 4348 if (list_empty(&osb->blocked_lock_list)) 4349 empty = 1; 4350 4351 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4352 return empty; 4353 } 4354 4355 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb) 4356 { 4357 int should_wake = 0; 4358 unsigned long flags; 4359 4360 spin_lock_irqsave(&osb->dc_task_lock, flags); 4361 if (osb->dc_work_sequence != osb->dc_wake_sequence) 4362 should_wake = 1; 4363 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4364 4365 return should_wake; 4366 } 4367 4368 static int ocfs2_downconvert_thread(void *arg) 4369 { 4370 int status = 0; 4371 struct ocfs2_super *osb = arg; 4372 4373 /* only quit once we've been asked to stop and there is no more 4374 * work available */ 4375 while (!(kthread_should_stop() && 4376 ocfs2_downconvert_thread_lists_empty(osb))) { 4377 4378 wait_event_interruptible(osb->dc_event, 4379 ocfs2_downconvert_thread_should_wake(osb) || 4380 kthread_should_stop()); 4381 4382 mlog(0, "downconvert_thread: awoken\n"); 4383 4384 ocfs2_downconvert_thread_do_work(osb); 4385 } 4386 4387 osb->dc_task = NULL; 4388 return status; 4389 } 4390 4391 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb) 4392 { 4393 unsigned long flags; 4394 4395 spin_lock_irqsave(&osb->dc_task_lock, flags); 4396 /* make sure the voting thread gets a swipe at whatever changes 4397 * the caller may have made to the voting state */ 4398 osb->dc_wake_sequence++; 4399 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4400 wake_up(&osb->dc_event); 4401 } 4402