/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmglue.c
 *
 * Code which implements an OCFS2 specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/time.h>
#include <linux/quotaops.h>
#include <linux/sched/signal.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"
#include "ocfs2_lockingver.h"

#include "alloc.h"
#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "stackglue.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"
#include "quota.h"
#include "refcounttree.h"
#include "acl.h"

#include "buffer_head_io.h"

struct ocfs2_mask_waiter {
	struct list_head	mw_item;
	int			mw_status;
	struct completion	mw_complete;
	unsigned long		mw_mask;
	unsigned long		mw_goal;
#ifdef CONFIG_OCFS2_FS_STATS
	ktime_t			mw_lock_start;
#endif
};

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);

/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock()
 *
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};

struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};

/* Lockdep class keys */
struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES];

static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking);

static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);

static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
					    int new_level);
static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
					 int blocking);

#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)

/* This aids in debugging situations where a bad LVB might be involved. */
static void ocfs2_dump_meta_lvb_info(u64 level,
				     const char *function,
				     unsigned int line,
				     struct ocfs2_lock_res *lockres)
{
	struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);

	mlog(level, "LVB information for %s (called from %s:%u):\n",
	     lockres->l_name, function, line);
	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
	     be32_to_cpu(lvb->lvb_igeneration));
	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
	     be16_to_cpu(lvb->lvb_imode));
	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
	     "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
	     be32_to_cpu(lvb->lvb_iattr));
}


/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 *
 * The most basic of lock types can point ->l_priv to their respective
 * struct ocfs2_super and allow the default actions to manage things.
 *
 * Right now, each lock type also needs to implement an init function,
 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
 * should be called when the lock is no longer needed (i.e., object
 * destruction time).
 */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
	 * this callback if ->l_priv is not an ocfs2_super pointer
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

	/*
	 * Optionally called in the downconvert thread after a
	 * successful downconvert. The lockres will not be referenced
	 * after this callback is called, so it is safe to free
	 * memory, etc.
	 *
	 * The exact semantics of when this is called are controlled
	 * by ->downconvert_worker()
	 */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Allow a lock type to add checks to determine whether it is
	 * safe to downconvert a lock. Return 0 to re-queue the
	 * downconvert at a later time, nonzero to continue.
	 *
	 * For most locks, the default check that there are no
	 * incompatible holders is sufficient.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * Allows a lock type to populate the lock value block. This
	 * is called on downconvert, and when we drop a lock.
	 *
	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
	 * in the flags field.
	 *
	 * Called with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *);

	/*
	 * Called from the downconvert thread when it is determined
	 * that a lock will be downconverted. This is called without
	 * any locks held so the function can do work that might
	 * schedule (syncing out data, etc).
	 *
	 * This should return any one of the ocfs2_unblock_action
	 * values, depending on what it wants the thread to do.
	 */
	int (*downconvert_worker)(struct ocfs2_lock_res *, int);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow.
	 */
	int flags;
};

/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB 0x2

static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.check_downconvert = ocfs2_check_meta_downconvert,
	.set_lvb	= ocfs2_set_meta_lvb,
	.downconvert_worker = ocfs2_data_convert_worker,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_trim_fs_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.get_osb	= ocfs2_get_dentry_osb,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.downconvert_worker = ocfs2_dentry_convert_worker,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
	.get_osb	= ocfs2_get_file_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = {
	.set_lvb	= ocfs2_set_qinfo_lvb,
	.get_osb	= ocfs2_get_qinfo_osb,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = {
	.check_downconvert = ocfs2_check_refcount_downconvert,
	.downconvert_worker = ocfs2_refcount_convert_worker,
	.flags		= 0,
};

static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{
	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
		lockres->l_type == OCFS2_LOCK_TYPE_RW ||
		lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
}

static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
	return container_of(lksb, struct ocfs2_lock_res, l_lksb);
}

static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!ocfs2_is_inode_lock(lockres));

	return (struct inode *) lockres->l_priv;
}

static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);

	return (struct ocfs2_dentry_lock *)lockres->l_priv;
}

static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO);

	return (struct ocfs2_mem_dqinfo *)lockres->l_priv;
}

static inline struct ocfs2_refcount_tree *
ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res)
{
	return container_of(res, struct ocfs2_refcount_tree, rf_lockres);
}

static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
{
	if (lockres->l_ops->get_osb)
		return lockres->l_ops->get_osb(lockres);

	return (struct ocfs2_super *)lockres->l_priv;
}

static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     u32 dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted);
static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
				   struct ocfs2_lock_res *lockres,
				   int level, unsigned long caller_ip);
static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres,
					int level)
{
	__ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert);
#define ocfs2_log_dlm_error(_func, _err, _lockres) do {					\
	if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY)				\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n",	\
		     _err, _func, _lockres->l_name);					\
	else										\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n",	\
		     _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name,	\
		     (unsigned int)ocfs2_get_dentry_lock_ino(_lockres));		\
} while (0)
static int ocfs2_downconvert_thread(void *arg);
static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static int ocfs2_inode_lock_update(struct inode *inode,
				   struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
					      int new_level);
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level,
				  int lvb,
				  unsigned int generation);
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres);


static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
				  u64 blkno,
				  u32 generation,
				  char *name)
{
	int len;

	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);

	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
		       (long long)blkno, generation);

	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));

	mlog(0, "built lock resource with name: %s\n", name);
}
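/*
 * For illustration only: assuming OCFS2_LOCK_ID_PAD is six '0'
 * characters and 'M' is the metadata type character, a metadata lock
 * on block 0x1234 with generation 0xabcd would be named something like
 *
 *	M00000000000000000012340000abcd
 *
 * i.e. type character + pad + 16 hex digits of block number + 8 hex
 * digits of generation, OCFS2_LOCK_ID_MAX_LEN - 1 characters in all.
 */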
static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);

static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
				       struct ocfs2_dlm_debug *dlm_debug)
{
	mlog(0, "Add tracking for lockres %s\n", res->l_name);

	spin_lock(&ocfs2_dlm_tracking_lock);
	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
	spin_lock(&ocfs2_dlm_tracking_lock);
	if (!list_empty(&res->l_debug_list))
		list_del_init(&res->l_debug_list);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

#ifdef CONFIG_OCFS2_FS_STATS
static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
	res->l_lock_refresh = 0;
	memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats));
	memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats));
}

static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
				    struct ocfs2_mask_waiter *mw, int ret)
{
	u32 usec;
	ktime_t kt;
	struct ocfs2_lock_stats *stats;

	if (level == LKM_PRMODE)
		stats = &res->l_lock_prmode;
	else if (level == LKM_EXMODE)
		stats = &res->l_lock_exmode;
	else
		return;

	kt = ktime_sub(ktime_get(), mw->mw_lock_start);
	usec = ktime_to_us(kt);

	stats->ls_gets++;
	stats->ls_total += ktime_to_ns(kt);
	/* overflow */
	if (unlikely(stats->ls_gets == 0)) {
		stats->ls_gets++;
		stats->ls_total = ktime_to_ns(kt);
	}

	if (stats->ls_max < usec)
		stats->ls_max = usec;

	if (ret)
		stats->ls_fail++;
}

static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
	lockres->l_lock_refresh++;
}

static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
	mw->mw_lock_start = ktime_get();
}
#else
static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
}
static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
					   int level, struct ocfs2_mask_waiter *mw, int ret)
{
}
static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
}
static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
}
#endif

static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *res,
				       enum ocfs2_lock_type type,
				       struct ocfs2_lock_res_ops *ops,
				       void *priv)
{
	res->l_type = type;
	res->l_ops = ops;
	res->l_priv = priv;

	res->l_level = DLM_LOCK_IV;
	res->l_requested = DLM_LOCK_IV;
	res->l_blocking = DLM_LOCK_IV;
	res->l_action = OCFS2_AST_INVALID;
	res->l_unlock_action = OCFS2_UNLOCK_INVALID;

	res->l_flags = OCFS2_LOCK_INITIALIZED;

	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);

	ocfs2_init_lock_stats(res);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (type != OCFS2_LOCK_TYPE_OPEN)
		lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type],
				 &lockdep_keys[type], 0);
	else
		res->l_lockdep_map.key = NULL;
#endif
}

void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
	/* This also clears out the lock status block */
	memset(res, 0, sizeof(struct ocfs2_lock_res));
	spin_lock_init(&res->l_lock);
	init_waitqueue_head(&res->l_event);
	INIT_LIST_HEAD(&res->l_blocked_list);
	INIT_LIST_HEAD(&res->l_mask_waiters);
	INIT_LIST_HEAD(&res->l_holders);
}

void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
			       enum ocfs2_lock_type type,
			       unsigned int generation,
			       struct inode *inode)
{
	struct ocfs2_lock_res_ops *ops;

	switch(type) {
	case OCFS2_LOCK_TYPE_RW:
		ops = &ocfs2_inode_rw_lops;
		break;
	case OCFS2_LOCK_TYPE_META:
		ops = &ocfs2_inode_inode_lops;
		break;
	case OCFS2_LOCK_TYPE_OPEN:
		ops = &ocfs2_inode_open_lops;
		break;
	default:
		mlog_bug_on_msg(1, "type: %d\n", type);
		ops = NULL; /* thanks, gcc */
		break;
	}

	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
			      generation, res->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
}

static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
{
	struct inode *inode = ocfs2_lock_res_inode(lockres);

	return OCFS2_SB(inode->i_sb);
}

static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_mem_dqinfo *info = lockres->l_priv;

	return OCFS2_SB(info->dqi_gi.dqi_sb);
}

static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_file_private *fp = lockres->l_priv;

	return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
}

static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
{
	__be64 inode_blkno_be;

	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
	       sizeof(__be64));

	return be64_to_cpu(inode_blkno_be);
}

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_dentry_lock *dl = lockres->l_priv;

	return OCFS2_SB(dl->dl_inode->i_sb);
}

void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
				u64 parent, struct inode *inode)
{
	int len;
	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
	struct ocfs2_lock_res *lockres = &dl->dl_lockres;

	ocfs2_lock_res_init_once(lockres);

	/*
	 * Unfortunately, the standard lock naming scheme won't work
	 * here because we have two 16 byte values to use. Instead,
	 * we'll stuff the inode number as a binary value. We still
	 * want error prints to show something without garbling the
	 * display, so drop a null byte in there before the inode
	 * number. A future version of OCFS2 will likely use all
	 * binary lock names. The stringified names have been a
	 * tremendous aid in debugging, but now that the debugfs
	 * interface exists, we can mangle things there if need be.
	 *
	 * NOTE: We also drop the standard "pad" value (the total lock
	 * name size stays the same though - the last part is all
	 * zeros due to the memset in ocfs2_lock_res_init_once()).
	 */
	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
		       "%c%016llx",
		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
		       (long long)parent);

	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));

	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
	       sizeof(__be64));

	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
				   dl);
}
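/*
 * A sketch of the resulting name layout (offsets follow from the
 * snprintf() size of OCFS2_DENTRY_LOCK_INO_START above, which puts the
 * null terminator just before the binary part):
 *
 *	l_name[0]					type character
 *	l_name[1 .. OCFS2_DENTRY_LOCK_INO_START - 2]	parent blkno, hex
 *	l_name[OCFS2_DENTRY_LOCK_INO_START - 1]		'\0'
 *	l_name[OCFS2_DENTRY_LOCK_INO_START ..]		inode blkno, __be64
 *
 * ocfs2_get_dentry_lock_ino() above simply reverses the final memcpy().
 */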
static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
				      struct ocfs2_super *osb)
{
	/* Superblock lockres doesn't come from a slab so we call init
	 * once on it manually. */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
			      0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
				   &ocfs2_super_lops, osb);
}

static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
				       struct ocfs2_super *osb)
{
	/* Rename lockres doesn't come from a slab so we call init
	 * once on it manually. */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
				   &ocfs2_rename_lops, osb);
}

static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
					 struct ocfs2_super *osb)
{
	/* nfs_sync lockres doesn't come from a slab so we call init
	 * once on it manually. */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC,
				   &ocfs2_nfs_sync_lops, osb);
}

void ocfs2_trim_fs_lock_res_init(struct ocfs2_super *osb)
{
	struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;

	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_TRIM_FS, 0, 0, lockres->l_name);
	ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_TRIM_FS,
				   &ocfs2_trim_fs_lops, osb);
}

void ocfs2_trim_fs_lock_res_uninit(struct ocfs2_super *osb)
{
	struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;

	ocfs2_simple_drop_lockres(osb, lockres);
	ocfs2_lock_res_free(lockres);
}

static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
					    struct ocfs2_super *osb)
{
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
				   &ocfs2_orphan_scan_lops, osb);
}

void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
			      struct ocfs2_file_private *fp)
{
	struct inode *inode = fp->fp_file->f_mapping->host;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);

	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
			      inode->i_generation, lockres->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
				   fp);
	lockres->l_flags |= OCFS2_LOCK_NOCACHE;
}

void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres,
			       struct ocfs2_mem_dqinfo *info)
{
	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type,
			      0, lockres->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres,
				   OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops,
				   info);
}

void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres,
				  struct ocfs2_super *osb, u64 ref_blkno,
				  unsigned int generation)
{
	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno,
			      generation, lockres->l_name);
	ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT,
				   &ocfs2_refcount_block_lops, osb);
}

void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
		return;

	ocfs2_remove_lockres_tracking(res);

	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
			"Lockres %s is on the blocked list\n",
			res->l_name);
	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
			"Lockres %s has mask waiters pending\n",
			res->l_name);
	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
			"Lockres %s is locked\n",
			res->l_name);
	mlog_bug_on_msg(res->l_ro_holders,
			"Lockres %s has %u ro holders\n",
			res->l_name, res->l_ro_holders);
	mlog_bug_on_msg(res->l_ex_holders,
			"Lockres %s has %u ex holders\n",
			res->l_name, res->l_ex_holders);

	/* Need to clear out the lock status block for the dlm */
	memset(&res->l_lksb, 0, sizeof(res->l_lksb));

	res->l_flags = 0UL;
}

/*
 * Keep a list of processes who have interest in a lockres.
 * Note: this is now only used to check for recursive cluster locking.
 */
static inline void ocfs2_add_holder(struct ocfs2_lock_res *lockres,
				    struct ocfs2_lock_holder *oh)
{
	INIT_LIST_HEAD(&oh->oh_list);
	oh->oh_owner_pid = get_pid(task_pid(current));

	spin_lock(&lockres->l_lock);
	list_add_tail(&oh->oh_list, &lockres->l_holders);
	spin_unlock(&lockres->l_lock);
}

static inline void ocfs2_remove_holder(struct ocfs2_lock_res *lockres,
				       struct ocfs2_lock_holder *oh)
{
	spin_lock(&lockres->l_lock);
	list_del(&oh->oh_list);
	spin_unlock(&lockres->l_lock);

	put_pid(oh->oh_owner_pid);
}

static inline int ocfs2_is_locked_by_me(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_lock_holder *oh;
	struct pid *pid;

	/* look in the list of holders for one with the current task as owner */
	spin_lock(&lockres->l_lock);
	pid = task_pid(current);
	list_for_each_entry(oh, &lockres->l_holders, oh_list) {
		if (oh->oh_owner_pid == pid) {
			spin_unlock(&lockres->l_lock);
			return 1;
		}
	}
	spin_unlock(&lockres->l_lock);

	return 0;
}
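/*
 * Usage sketch (hypothetical caller, for illustration only): a path
 * that might re-enter a cluster lock it already holds can do
 *
 *	struct ocfs2_lock_holder oh;	// stack-allocated
 *
 *	if (!ocfs2_is_locked_by_me(lockres)) {
 *		status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
 *		ocfs2_add_holder(lockres, &oh);
 *	}
 *	...
 *	ocfs2_remove_holder(lockres, &oh);
 *	ocfs2_cluster_unlock(osb, lockres, level);
 */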
static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	BUG_ON(!lockres);

	switch(level) {
	case DLM_LOCK_EX:
		lockres->l_ex_holders++;
		break;
	case DLM_LOCK_PR:
		lockres->l_ro_holders++;
		break;
	default:
		BUG();
	}
}

static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	BUG_ON(!lockres);

	switch(level) {
	case DLM_LOCK_EX:
		BUG_ON(!lockres->l_ex_holders);
		lockres->l_ex_holders--;
		break;
	case DLM_LOCK_PR:
		BUG_ON(!lockres->l_ro_holders);
		lockres->l_ro_holders--;
		break;
	default:
		BUG();
	}
}

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
static inline int ocfs2_highest_compat_lock_level(int level)
{
	int new_level = DLM_LOCK_EX;

	if (level == DLM_LOCK_EX)
		new_level = DLM_LOCK_NL;
	else if (level == DLM_LOCK_PR)
		new_level = DLM_LOCK_PR;
	return new_level;
}
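/*
 * In table form, for the three levels this function knows about:
 *
 *	blocking level		highest level others may still hold
 *	DLM_LOCK_EX		DLM_LOCK_NL
 *	DLM_LOCK_PR		DLM_LOCK_PR
 *	DLM_LOCK_NL		DLM_LOCK_EX
 */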
static void lockres_set_flags(struct ocfs2_lock_res *lockres,
			      unsigned long newflags)
{
	struct ocfs2_mask_waiter *mw, *tmp;

	assert_spin_locked(&lockres->l_lock);

	lockres->l_flags = newflags;

	list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			continue;

		list_del_init(&mw->mw_item);
		mw->mw_status = 0;
		complete(&mw->mw_complete);
	}
}
static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}
static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);

	lockres->l_level = lockres->l_requested;
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = DLM_LOCK_NL;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* Converting from RO to EX doesn't really need anything as our
	 * information is already up to date. Converting from NL to
	 * *anything*, however, should mark us as needing an
	 * update */
	if (lockres->l_level == DLM_LOCK_NL &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;

	/*
	 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
	 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from
	 * downconverting the lock before the upconvert has fully completed.
	 * Do not prevent the dc thread from downconverting if NONBLOCK lock
	 * had already returned.
	 */
	if (!(lockres->l_flags & OCFS2_LOCK_NONBLOCK_FINISHED))
		lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
	else
		lockres_clear_flags(lockres, OCFS2_LOCK_NONBLOCK_FINISHED);

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
{
	BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

	if (lockres->l_requested > DLM_LOCK_NL &&
	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;

	assert_spin_locked(&lockres->l_lock);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking. this also catches the case where we get
		 * duplicate BASTs */
		if (ocfs2_highest_compat_lock_level(level) <
		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
			needs_downconvert = 1;

		lockres->l_blocking = level;
	}

	mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n",
	     lockres->l_name, level, lockres->l_level, lockres->l_blocking,
	     needs_downconvert);

	if (needs_downconvert)
		lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
	mlog(0, "needs_downconvert = %d\n", needs_downconvert);
	return needs_downconvert;
}

/*
 * OCFS2_LOCK_PENDING and l_pending_gen.
 *
 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting
 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock()
 * for more details on the race.
 *
 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces
 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock()
 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear
 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns,
 * the caller is going to try to clear PENDING again. If nothing else is
 * happening, __lockres_clear_pending() sees PENDING is unset and does
 * nothing.
 *
 * But what if another path (e.g. the downconvert thread) has just started
 * a new locking action? The other path has re-set PENDING. Our path
 * cannot clear PENDING, because that will re-open the original race
 * window.
 *
 * [Example]
 *
 * ocfs2_meta_lock()
 *  ocfs2_cluster_lock()
 *   set BUSY
 *   set PENDING
 *   drop l_lock
 *   ocfs2_dlm_lock()
 *    ocfs2_locking_ast()		ocfs2_downconvert_thread()
 *     clear PENDING			 ocfs2_unblock_lock()
 *					  take_l_lock
 *					  !BUSY
 *					  ocfs2_prepare_downconvert()
 *					   set BUSY
 *					   set PENDING
 *					  drop l_lock
 *   take l_lock
 *   clear PENDING
 *   drop l_lock
 *			<window>
 *					  ocfs2_dlm_lock()
 *
 * So as you can see, we now have a window where l_lock is not held,
 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
 *
 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
 * set by ocfs2_prepare_downconvert(). That wasn't nice.
 *
 * To solve this we introduce l_pending_gen. A call to
 * lockres_clear_pending() will only do so when it is passed a generation
 * number that matches the lockres. lockres_set_pending() will return the
 * current generation number. When ocfs2_cluster_lock() goes to clear
 * PENDING, it passes the generation it got from set_pending(). In our
 * example above, the generation numbers will *not* match. Thus,
 * ocfs2_cluster_lock() will not clear the PENDING set by
 * ocfs2_prepare_downconvert().
 */

/* Unlocked version for ocfs2_locking_ast() */
static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
				    unsigned int generation,
				    struct ocfs2_super *osb)
{
	assert_spin_locked(&lockres->l_lock);

	/*
	 * The ast and locking functions can race us here. The winner
	 * will clear pending, the loser will not.
	 */
	if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
	    (lockres->l_pending_gen != generation))
		return;

	lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
	lockres->l_pending_gen++;

	/*
	 * The downconvert thread may have skipped us because we
	 * were PENDING. Wake it up.
	 */
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
		ocfs2_wake_downconvert_thread(osb);
}

/* Locked version for callers of ocfs2_dlm_lock() */
static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
				  unsigned int generation,
				  struct ocfs2_super *osb)
{
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);
	__lockres_clear_pending(lockres, generation, osb);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));

	lockres_or_flags(lockres, OCFS2_LOCK_PENDING);

	return lockres->l_pending_gen;
}
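/*
 * The resulting caller pattern, as used by ocfs2_lock_create() and
 * __ocfs2_cluster_lock() below:
 *
 *	spin_lock_irqsave(&lockres->l_lock, flags);
 *	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
 *	gen = lockres_set_pending(lockres);
 *	spin_unlock_irqrestore(&lockres->l_lock, flags);
 *
 *	ret = ocfs2_dlm_lock(...);
 *	lockres_clear_pending(lockres, gen, osb);
 *
 * A stale generation makes the clear a no-op, which is exactly what we
 * want when another path has raced in and set PENDING again.
 */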
static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	int needs_downconvert;
	unsigned long flags;

	BUG_ON(level <= DLM_LOCK_NL);

	mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, "
	     "type %s\n", lockres->l_name, level, lockres->l_level,
	     ocfs2_lock_type_string(lockres->l_type));

	/*
	 * We can skip the bast for locks which don't enable caching -
	 * they'll be dropped at the earliest possible time anyway.
	 */
	if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
		return;

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_wake_downconvert_thread(osb);
}

static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	unsigned long flags;
	int status;

	spin_lock_irqsave(&lockres->l_lock, flags);

	status = ocfs2_dlm_lock_status(&lockres->l_lksb);

	if (status == -EAGAIN) {
		lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
		goto out;
	}

	if (status) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
		     lockres->l_name, status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, "
	     "level %d => %d\n", lockres->l_name, lockres->l_action,
	     lockres->l_unlock_action, lockres->l_level, lockres->l_requested);

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, "
		     "flags 0x%lx, unlock: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}
out:
	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	/* Did we try to cancel this lock?  Clear that state */
	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;

	/*
	 * We may have beaten the locking functions here.  We certainly
	 * know that dlm_lock() has been called :-)
	 * Because we can't have two lock calls in flight at once, we
	 * can use lockres->l_pending_gen.
	 */
	__lockres_clear_pending(lockres, lockres->l_pending_gen, osb);

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	unsigned long flags;

	mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n",
	     lockres->l_name, lockres->l_unlock_action);

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (error) {
		mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
		     "unlock_action %d\n", error, lockres->l_name,
		     lockres->l_unlock_action);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_unlock_action) {
	case OCFS2_UNLOCK_CANCEL_CONVERT:
		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
		lockres->l_action = OCFS2_AST_INVALID;
		/* Downconvert thread may have requeued this lock, we
		 * need to wake it. */
		if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
			ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
		break;
	case OCFS2_UNLOCK_DROP_LOCK:
		lockres->l_level = DLM_LOCK_IV;
		break;
	default:
		BUG();
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/*
 * This is the filesystem locking protocol.  It provides the lock handling
 * hooks for the underlying DLM.  It has a maximum version number.
 * The version number allows interoperability with systems running at
 * the same major number and an equal or smaller minor number.
 *
 * Whenever the filesystem does new things with locks (adds or removes a
 * lock, orders them differently, does different things underneath a lock),
 * the version must be changed.  The protocol is negotiated when joining
 * the dlm domain.  A node may join the domain if its major version is
 * identical to all other nodes and its minor version is greater than
 * or equal to all other nodes.  When its minor version is greater than
 * the other nodes, it will run at the minor version specified by the
 * other nodes.
 *
 * If a locking change is made that will not be compatible with older
 * versions, the major number must be increased and the minor version set
 * to zero.  If a change merely adds a behavior that can be disabled when
 * speaking to older versions, the minor version must be increased.  If a
 * change adds a fully backwards compatible change (eg, LVB changes that
 * are just ignored by older versions), the version does not need to be
 * updated.
 */
static struct ocfs2_locking_protocol lproto = {
	.lp_max_version = {
		.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
		.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
	},
	.lp_lock_ast		= ocfs2_locking_ast,
	.lp_blocking_ast	= ocfs2_blocking_ast,
	.lp_unlock_ast		= ocfs2_unlock_ast,
};

void ocfs2_set_locking_protocol(void)
{
	ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
}
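/*
 * A concrete (hypothetical) example of the negotiation described above:
 * a node whose max version is 1.2 may join a domain where the other
 * nodes run at 1.1 - same major, larger minor - and will then speak
 * 1.1 itself. A node whose max version is 2.0 could not join that
 * domain at all, since the major numbers differ.
 */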
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert)
{
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
	if (convert)
		lockres->l_action = OCFS2_AST_INVALID;
	else
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
}

/* Note: If we detect another process working on the lock (i.e.,
 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 * to do the right thing in that case.
 */
static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     u32 dlm_flags)
{
	int ret = 0;
	unsigned long flags;
	unsigned int gen;

	mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
	     dlm_flags);

	spin_lock_irqsave(&lockres->l_lock, flags);
	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto bail;
	}

	lockres->l_action = OCFS2_AST_ATTACH;
	lockres->l_requested = level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
	gen = lockres_set_pending(lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ret = ocfs2_dlm_lock(osb->cconn,
			     level,
			     &lockres->l_lksb,
			     dlm_flags,
			     lockres->l_name,
			     OCFS2_LOCK_ID_MAX_LEN - 1);
	lockres_clear_pending(lockres, gen, osb);
	if (ret) {
		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
		ocfs2_recover_from_dlm_error(lockres, 1);
	}

	mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);

bail:
	return ret;
}

static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
					int flag)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&lockres->l_lock, flags);
	ret = lockres->l_flags & flag;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;
}

static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}

static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}

static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
{
	INIT_LIST_HEAD(&mw->mw_item);
	init_completion(&mw->mw_complete);
	ocfs2_init_start_time(mw);
}

static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
	wait_for_completion(&mw->mw_complete);
	/* Re-arm the completion in case we want to wait on it again */
	reinit_completion(&mw->mw_complete);
	return mw->mw_status;
}

static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
				    struct ocfs2_mask_waiter *mw,
				    unsigned long mask,
				    unsigned long goal)
{
	BUG_ON(!list_empty(&mw->mw_item));

	assert_spin_locked(&lockres->l_lock);

	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
	mw->mw_mask = mask;
	mw->mw_goal = goal;
}
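/*
 * The mask/goal pair means "wake me when (l_flags & mask) == goal".
 * The most common use in this file is
 *
 *	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
 *
 * i.e. "wake me once the BUSY bit has cleared", with the actual wakeup
 * performed by lockres_set_flags() when the condition becomes true.
 */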
/* returns 0 if the mw that was removed was already satisfied, -EBUSY
 * if the mask still hadn't reached its goal */
static int __lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
					struct ocfs2_mask_waiter *mw)
{
	int ret = 0;

	assert_spin_locked(&lockres->l_lock);
	if (!list_empty(&mw->mw_item)) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			ret = -EBUSY;

		list_del_init(&mw->mw_item);
		init_completion(&mw->mw_complete);
	}

	return ret;
}

static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
				      struct ocfs2_mask_waiter *mw)
{
	unsigned long flags;
	int ret = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	ret = __lockres_remove_mask_waiter(lockres, mw);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;

}

static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
					     struct ocfs2_lock_res *lockres)
{
	int ret;

	ret = wait_for_completion_interruptible(&mw->mw_complete);
	if (ret)
		lockres_remove_mask_waiter(lockres, mw);
	else
		ret = mw->mw_status;
	/* Re-arm the completion in case we want to wait on it again */
	reinit_completion(&mw->mw_complete);
	return ret;
}

static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres,
				int level,
				u32 lkm_flags,
				int arg_flags,
				int l_subclass,
				unsigned long caller_ip)
{
	struct ocfs2_mask_waiter mw;
	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
	unsigned long flags;
	unsigned int gen;
	int noqueue_attempted = 0;
	int dlm_locked = 0;
	int kick_dc = 0;

	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) {
		mlog_errno(-EINVAL);
		return -EINVAL;
	}

	ocfs2_init_mask_waiter(&mw);

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= DLM_LKF_VALBLK;

again:
	wait = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);

	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto unlock;
	}

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags "
			"0x%lx\n", lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		wait = 1;
		goto unlock;
	}

	if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) {
		/*
		 * We've upconverted. If the lock now has a level we can
		 * work with, we take it. If, however, the lock is not at the
		 * required level, we go thru the full cycle. One way this could
		 * happen is if a process requesting an upconvert to PR is
		 * closely followed by another requesting upconvert to an EX.
		 * If the process requesting EX lands here, we want it to
		 * continue attempting to upconvert and let the process
		 * requesting PR take the lock.
		 * If multiple processes request upconvert to PR, the first one
		 * here will take the lock. The others will have to go thru the
		 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending
		 * downconvert request.
		 */
		if (level <= lockres->l_level)
			goto update_holders;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* the lock is currently blocked on behalf of
		 * another node */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		if (noqueue_attempted > 0) {
			ret = -EAGAIN;
			goto unlock;
		}
		if (lkm_flags & DLM_LKF_NOQUEUE)
			noqueue_attempted = 1;

		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
			lockres->l_action = OCFS2_AST_ATTACH;
			lkm_flags &= ~DLM_LKF_CONVERT;
		} else {
			lockres->l_action = OCFS2_AST_CONVERT;
			lkm_flags |= DLM_LKF_CONVERT;
		}

		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		gen = lockres_set_pending(lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == DLM_LOCK_IV);
		BUG_ON(level == DLM_LOCK_NL);

		mlog(ML_BASTS, "lockres %s, convert from %d to %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		ret = ocfs2_dlm_lock(osb->cconn,
				     level,
				     &lockres->l_lksb,
				     lkm_flags,
				     lockres->l_name,
				     OCFS2_LOCK_ID_MAX_LEN - 1);
		lockres_clear_pending(lockres, gen, osb);
		if (ret) {
			if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
			    (ret != -EAGAIN)) {
				ocfs2_log_dlm_error("ocfs2_dlm_lock",
						    ret, lockres);
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			goto out;
		}
		dlm_locked = 1;

		mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

update_holders:
	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);

	/* ocfs2_unblock_lock requeues on seeing OCFS2_LOCK_UPCONVERT_FINISHING */
	kick_dc = (lockres->l_flags & OCFS2_LOCK_BLOCKED);

	spin_unlock_irqrestore(&lockres->l_lock, flags);
	if (kick_dc)
		ocfs2_wake_downconvert_thread(osb);
out:
	/*
	 * This is helping work around a lock inversion between the page lock
	 * and dlm locks.  One path holds the page lock while calling aops
	 * which block acquiring dlm locks.  The voting thread holds dlm
	 * locks while acquiring page locks while down converting data locks.
	 * This block is helping an aop path notice the inversion and back
	 * off to unlock its page lock before trying the dlm lock again.
	 */
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		spin_lock_irqsave(&lockres->l_lock, flags);
		if (__lockres_remove_mask_waiter(lockres, &mw)) {
			if (dlm_locked)
				lockres_or_flags(lockres,
						 OCFS2_LOCK_NONBLOCK_FINISHED);
			spin_unlock_irqrestore(&lockres->l_lock, flags);
			ret = -EAGAIN;
		} else {
			spin_unlock_irqrestore(&lockres->l_lock, flags);
			goto again;
		}
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}
	ocfs2_update_lock_stats(lockres, level, &mw, ret);

#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (!ret && lockres->l_lockdep_map.key != NULL) {
		if (level == DLM_LOCK_PR)
			rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
					   !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
					   caller_ip);
		else
			rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
				      !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
				      caller_ip);
	}
#endif
	return ret;
}

static inline int ocfs2_cluster_lock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres,
				     int level,
				     u32 lkm_flags,
				     int arg_flags)
{
	return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags,
				    0, _RET_IP_);
}


static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
				   struct ocfs2_lock_res *lockres,
				   int level,
				   unsigned long caller_ip)
{
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);
	ocfs2_dec_holders(lockres, level);
	ocfs2_downconvert_on_unlock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (lockres->l_lockdep_map.key != NULL)
		rwsem_release(&lockres->l_lockdep_map, 1, caller_ip);
#endif
}
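/*
 * Callers always pair these: take a level, do work, release the same
 * level. A minimal sketch, modeled on ocfs2_rw_lock()/ocfs2_rw_unlock()
 * below:
 *
 *	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
 *	if (status < 0)
 *		return status;
 *	... do work under the cluster lock ...
 *	ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
 */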
Use this ONLY on newly created 1697 * inodes which other nodes can't possibly see, and which haven't been 1698 * hashed in the inode hash yet. This can give us a good performance 1699 * increase as it'll skip the network broadcast normally associated 1700 * with creating a new lock resource. */ 1701 int ocfs2_create_new_inode_locks(struct inode *inode) 1702 { 1703 int ret; 1704 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1705 1706 BUG_ON(!ocfs2_inode_is_new(inode)); 1707 1708 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); 1709 1710 /* NOTE: That we don't increment any of the holder counts, nor 1711 * do we add anything to a journal handle. Since this is 1712 * supposed to be a new inode which the cluster doesn't know 1713 * about yet, there is no need to. As far as the LVB handling 1714 * is concerned, this is basically like acquiring an EX lock 1715 * on a resource which has an invalid one -- we'll set it 1716 * valid when we release the EX. */ 1717 1718 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1); 1719 if (ret) { 1720 mlog_errno(ret); 1721 goto bail; 1722 } 1723 1724 /* 1725 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they 1726 * don't use a generation in their lock names. 1727 */ 1728 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); 1729 if (ret) { 1730 mlog_errno(ret); 1731 goto bail; 1732 } 1733 1734 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0); 1735 if (ret) 1736 mlog_errno(ret); 1737 1738 bail: 1739 return ret; 1740 } 1741 1742 int ocfs2_rw_lock(struct inode *inode, int write) 1743 { 1744 int status, level; 1745 struct ocfs2_lock_res *lockres; 1746 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1747 1748 mlog(0, "inode %llu take %s RW lock\n", 1749 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1750 write ? "EXMODE" : "PRMODE"); 1751 1752 if (ocfs2_mount_local(osb)) 1753 return 0; 1754 1755 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1756 1757 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1758 1759 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0, 1760 0); 1761 if (status < 0) 1762 mlog_errno(status); 1763 1764 return status; 1765 } 1766 1767 int ocfs2_try_rw_lock(struct inode *inode, int write) 1768 { 1769 int status, level; 1770 struct ocfs2_lock_res *lockres; 1771 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1772 1773 mlog(0, "inode %llu try to take %s RW lock\n", 1774 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1775 write ? "EXMODE" : "PRMODE"); 1776 1777 if (ocfs2_mount_local(osb)) 1778 return 0; 1779 1780 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1781 1782 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1783 1784 status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0); 1785 return status; 1786 } 1787 1788 void ocfs2_rw_unlock(struct inode *inode, int write) 1789 { 1790 int level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1791 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1792 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1793 1794 mlog(0, "inode %llu drop %s RW lock\n", 1795 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1796 write ? "EXMODE" : "PRMODE"); 1797 1798 if (!ocfs2_mount_local(osb)) 1799 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 1800 } 1801 1802 /* 1803 * ocfs2_open_lock always get PR mode lock. 
1804 */ 1805 int ocfs2_open_lock(struct inode *inode) 1806 { 1807 int status = 0; 1808 struct ocfs2_lock_res *lockres; 1809 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1810 1811 mlog(0, "inode %llu take PRMODE open lock\n", 1812 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1813 1814 if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb)) 1815 goto out; 1816 1817 lockres = &OCFS2_I(inode)->ip_open_lockres; 1818 1819 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1820 DLM_LOCK_PR, 0, 0); 1821 if (status < 0) 1822 mlog_errno(status); 1823 1824 out: 1825 return status; 1826 } 1827 1828 int ocfs2_try_open_lock(struct inode *inode, int write) 1829 { 1830 int status = 0, level; 1831 struct ocfs2_lock_res *lockres; 1832 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1833 1834 mlog(0, "inode %llu try to take %s open lock\n", 1835 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1836 write ? "EXMODE" : "PRMODE"); 1837 1838 if (ocfs2_is_hard_readonly(osb)) { 1839 if (write) 1840 status = -EROFS; 1841 goto out; 1842 } 1843 1844 if (ocfs2_mount_local(osb)) 1845 goto out; 1846 1847 lockres = &OCFS2_I(inode)->ip_open_lockres; 1848 1849 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1850 1851 /* 1852 * The file system may already holding a PRMODE/EXMODE open lock. 1853 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on 1854 * other nodes and the -EAGAIN will indicate to the caller that 1855 * this inode is still in use. 1856 */ 1857 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1858 level, DLM_LKF_NOQUEUE, 0); 1859 1860 out: 1861 return status; 1862 } 1863 1864 /* 1865 * ocfs2_open_unlock unlock PR and EX mode open locks. 1866 */ 1867 void ocfs2_open_unlock(struct inode *inode) 1868 { 1869 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres; 1870 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1871 1872 mlog(0, "inode %llu drop open lock\n", 1873 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1874 1875 if (ocfs2_mount_local(osb)) 1876 goto out; 1877 1878 if(lockres->l_ro_holders) 1879 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1880 DLM_LOCK_PR); 1881 if(lockres->l_ex_holders) 1882 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1883 DLM_LOCK_EX); 1884 1885 out: 1886 return; 1887 } 1888 1889 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres, 1890 int level) 1891 { 1892 int ret; 1893 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1894 unsigned long flags; 1895 struct ocfs2_mask_waiter mw; 1896 1897 ocfs2_init_mask_waiter(&mw); 1898 1899 retry_cancel: 1900 spin_lock_irqsave(&lockres->l_lock, flags); 1901 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 1902 ret = ocfs2_prepare_cancel_convert(osb, lockres); 1903 if (ret) { 1904 spin_unlock_irqrestore(&lockres->l_lock, flags); 1905 ret = ocfs2_cancel_convert(osb, lockres); 1906 if (ret < 0) { 1907 mlog_errno(ret); 1908 goto out; 1909 } 1910 goto retry_cancel; 1911 } 1912 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1913 spin_unlock_irqrestore(&lockres->l_lock, flags); 1914 1915 ocfs2_wait_for_mask(&mw); 1916 goto retry_cancel; 1917 } 1918 1919 ret = -ERESTARTSYS; 1920 /* 1921 * We may still have gotten the lock, in which case there's no 1922 * point to restarting the syscall. 1923 */ 1924 if (lockres->l_level == level) 1925 ret = 0; 1926 1927 mlog(0, "Cancel returning %d. 
1928 lockres->l_flags, lockres->l_level, lockres->l_action);
1929
1930 spin_unlock_irqrestore(&lockres->l_lock, flags);
1931
1932 out:
1933 return ret;
1934 }
1935
1936 /*
1937 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
1938 * flock() calls. The locking approach this requires is sufficiently
1939 * different from all other cluster lock types that we implement a
1940 * separate path to the "low-level" dlm calls. In particular:
1941 *
1942 * - No optimization of lock levels is done - we take exactly
1943 * what's been requested.
1944 *
1945 * - No lock caching is employed. We immediately downconvert to
1946 * no-lock at unlock time. This also means flock locks never go on
1947 * the blocking list.
1948 *
1949 * - Since userspace can trivially deadlock itself with flock, we make
1950 * sure to allow cancellation of a misbehaving application's flock()
1951 * request.
1952 *
1953 * - Access to any flock lockres doesn't require concurrency, so we
1954 * can simplify the code by requiring the caller to guarantee
1955 * serialization of dlmglue flock calls.
1956 */
1957 int ocfs2_file_lock(struct file *file, int ex, int trylock)
1958 {
1959 int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1960 unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0;
1961 unsigned long flags;
1962 struct ocfs2_file_private *fp = file->private_data;
1963 struct ocfs2_lock_res *lockres = &fp->fp_flock;
1964 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1965 struct ocfs2_mask_waiter mw;
1966
1967 ocfs2_init_mask_waiter(&mw);
1968
1969 if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
1970 (lockres->l_level > DLM_LOCK_NL)) {
1971 mlog(ML_ERROR,
1972 "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
1973 "level: %u\n", lockres->l_name, lockres->l_flags,
1974 lockres->l_level);
1975 return -EINVAL;
1976 }
1977
1978 spin_lock_irqsave(&lockres->l_lock, flags);
1979 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1980 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1981 spin_unlock_irqrestore(&lockres->l_lock, flags);
1982
1983 /*
1984 * Get the lock at NLMODE to start - that way we
1985 * can cancel the upconvert request if need be.
1986 */
1987 ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
1988 if (ret < 0) {
1989 mlog_errno(ret);
1990 goto out;
1991 }
1992
1993 ret = ocfs2_wait_for_mask(&mw);
1994 if (ret) {
1995 mlog_errno(ret);
1996 goto out;
1997 }
1998 spin_lock_irqsave(&lockres->l_lock, flags);
1999 }
2000
2001 lockres->l_action = OCFS2_AST_CONVERT;
2002 lkm_flags |= DLM_LKF_CONVERT;
2003 lockres->l_requested = level;
2004 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2005
2006 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
2007 spin_unlock_irqrestore(&lockres->l_lock, flags);
2008
2009 ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
2010 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
2011 if (ret) {
2012 if (!trylock || (ret != -EAGAIN)) {
2013 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
2014 ret = -EINVAL;
2015 }
2016
2017 ocfs2_recover_from_dlm_error(lockres, 1);
2018 lockres_remove_mask_waiter(lockres, &mw);
2019 goto out;
2020 }
2021
2022 ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
2023 if (ret == -ERESTARTSYS) {
2024 /*
2025 * Userspace can deadlock itself with
2026 * flock(). Current behavior locally is to allow the
2027 * deadlock, but abort the system call if a signal is
2028 * received. We follow this example, otherwise a
2029 * poorly written program could sit in the kernel until
2030 * reboot.
2031 *
2032 * Handling this is a bit more complicated for OCFS2
2033 * though. We can't exit this function with an
2034 * outstanding lock request, so a cancel convert is
2035 * required. We intentionally overwrite 'ret' - if the
2036 * cancel fails and the lock was granted, it's easier
2037 * to just bubble success back up to the user.
2038 */
2039 ret = ocfs2_flock_handle_signal(lockres, level);
2040 } else if (!ret && (level > lockres->l_level)) {
2041 /* Trylock failed asynchronously */
2042 BUG_ON(!trylock);
2043 ret = -EAGAIN;
2044 }
2045
2046 out:
2047
2048 mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
2049 lockres->l_name, ex, trylock, ret);
2050 return ret;
2051 }
2052
2053 void ocfs2_file_unlock(struct file *file)
2054 {
2055 int ret;
2056 unsigned int gen;
2057 unsigned long flags;
2058 struct ocfs2_file_private *fp = file->private_data;
2059 struct ocfs2_lock_res *lockres = &fp->fp_flock;
2060 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
2061 struct ocfs2_mask_waiter mw;
2062
2063 ocfs2_init_mask_waiter(&mw);
2064
2065 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
2066 return;
2067
2068 if (lockres->l_level == DLM_LOCK_NL)
2069 return;
2070
2071 mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
2072 lockres->l_name, lockres->l_flags, lockres->l_level,
2073 lockres->l_action);
2074
2075 spin_lock_irqsave(&lockres->l_lock, flags);
2076 /*
2077 * Fake a blocking ast for the downconvert code.
2078 */
2079 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
2080 lockres->l_blocking = DLM_LOCK_EX;
2081
2082 gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL);
2083 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
2084 spin_unlock_irqrestore(&lockres->l_lock, flags);
2085
2086 ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen);
2087 if (ret) {
2088 mlog_errno(ret);
2089 return;
2090 }
2091
2092 ret = ocfs2_wait_for_mask(&mw);
2093 if (ret)
2094 mlog_errno(ret);
2095 }
2096
2097 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
2098 struct ocfs2_lock_res *lockres)
2099 {
2100 int kick = 0;
2101
2102 /* If we know that another node is waiting on our lock, kick
2103 * the downconvert thread pre-emptively when we reach a release
2104 * condition. */
2105 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
2106 switch (lockres->l_blocking) {
2107 case DLM_LOCK_EX:
2108 if (!lockres->l_ex_holders && !lockres->l_ro_holders)
2109 kick = 1;
2110 break;
2111 case DLM_LOCK_PR:
2112 if (!lockres->l_ex_holders)
2113 kick = 1;
2114 break;
2115 default:
2116 BUG();
2117 }
2118 }
2119
2120 if (kick)
2121 ocfs2_wake_downconvert_thread(osb);
2122 }
2123
2124 #define OCFS2_SEC_BITS 34
2125 #define OCFS2_SEC_SHIFT (64 - OCFS2_SEC_BITS)
2126 #define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1)
2127
2128 /* LVB only has room for 64 bits of time here so we pack it for
2129 * now. */
2130 static u64 ocfs2_pack_timespec(struct timespec *spec)
2131 {
2132 u64 res;
2133 u64 sec = spec->tv_sec;
2134 u32 nsec = spec->tv_nsec;
2135
2136 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
2137
2138 return res;
2139 }
2140
2141 /* Call this with the lockres locked. I am reasonably sure we don't
2142 * need ip_lock in this function as anyone who would be changing those
2143 * values is supposed to be blocked in ocfs2_inode_lock right now.
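 *
 * For reference, ocfs2_pack_timespec() above keeps the seconds in the
 * high OCFS2_SEC_BITS (34) bits and the nanoseconds in the low 30
 * bits of the packed u64, so the round trip is effectively (worked
 * sketch of the existing math, not new behavior):
 *
 *	packed = (sec << 30) | (nsec & 0x3fffffff);
 *	sec    = packed >> 30;
 *	nsec   = packed & 0x3fffffff;
 *
 * Nanosecond counts always fit (10^9 < 2^30), while seconds silently
 * lose anything above 34 bits.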
*/ 2144 static void __ocfs2_stuff_meta_lvb(struct inode *inode) 2145 { 2146 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2147 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2148 struct ocfs2_meta_lvb *lvb; 2149 2150 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2151 2152 /* 2153 * Invalidate the LVB of a deleted inode - this way other 2154 * nodes are forced to go to disk and discover the new inode 2155 * status. 2156 */ 2157 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2158 lvb->lvb_version = 0; 2159 goto out; 2160 } 2161 2162 lvb->lvb_version = OCFS2_LVB_VERSION; 2163 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 2164 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 2165 lvb->lvb_iuid = cpu_to_be32(i_uid_read(inode)); 2166 lvb->lvb_igid = cpu_to_be32(i_gid_read(inode)); 2167 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 2168 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 2169 lvb->lvb_iatime_packed = 2170 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime)); 2171 lvb->lvb_ictime_packed = 2172 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime)); 2173 lvb->lvb_imtime_packed = 2174 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime)); 2175 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 2176 lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features); 2177 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 2178 2179 out: 2180 mlog_meta_lvb(0, lockres); 2181 } 2182 2183 static void ocfs2_unpack_timespec(struct timespec *spec, 2184 u64 packed_time) 2185 { 2186 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 2187 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 2188 } 2189 2190 static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 2191 { 2192 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2193 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2194 struct ocfs2_meta_lvb *lvb; 2195 2196 mlog_meta_lvb(0, lockres); 2197 2198 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2199 2200 /* We're safe here without the lockres lock... */ 2201 spin_lock(&oi->ip_lock); 2202 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 2203 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 2204 2205 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 2206 oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures); 2207 ocfs2_set_inode_flags(inode); 2208 2209 /* fast-symlinks are a special case */ 2210 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 2211 inode->i_blocks = 0; 2212 else 2213 inode->i_blocks = ocfs2_inode_sector_count(inode); 2214 2215 i_uid_write(inode, be32_to_cpu(lvb->lvb_iuid)); 2216 i_gid_write(inode, be32_to_cpu(lvb->lvb_igid)); 2217 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 2218 set_nlink(inode, be16_to_cpu(lvb->lvb_inlink)); 2219 ocfs2_unpack_timespec(&inode->i_atime, 2220 be64_to_cpu(lvb->lvb_iatime_packed)); 2221 ocfs2_unpack_timespec(&inode->i_mtime, 2222 be64_to_cpu(lvb->lvb_imtime_packed)); 2223 ocfs2_unpack_timespec(&inode->i_ctime, 2224 be64_to_cpu(lvb->lvb_ictime_packed)); 2225 spin_unlock(&oi->ip_lock); 2226 } 2227 2228 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 2229 struct ocfs2_lock_res *lockres) 2230 { 2231 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2232 2233 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) 2234 && lvb->lvb_version == OCFS2_LVB_VERSION 2235 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 2236 return 1; 2237 return 0; 2238 } 2239 2240 /* Determine whether a lock resource needs to be refreshed, and 2241 * arbitrate who gets to refresh it. 2242 * 2243 * 0 means no refresh needed. 
2244 *
2245 * > 0 means you need to refresh this and you MUST call
2246 * ocfs2_complete_lock_res_refresh afterwards. */
2247 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
2248 {
2249 unsigned long flags;
2250 int status = 0;
2251
2252 refresh_check:
2253 spin_lock_irqsave(&lockres->l_lock, flags);
2254 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
2255 spin_unlock_irqrestore(&lockres->l_lock, flags);
2256 goto bail;
2257 }
2258
2259 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2260 spin_unlock_irqrestore(&lockres->l_lock, flags);
2261
2262 ocfs2_wait_on_refreshing_lock(lockres);
2263 goto refresh_check;
2264 }
2265
2266 /* Ok, I'll be the one to refresh this lock. */
2267 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
2268 spin_unlock_irqrestore(&lockres->l_lock, flags);
2269
2270 status = 1;
2271 bail:
2272 mlog(0, "status %d\n", status);
2273 return status;
2274 }
2275
2276 /* If status is non-zero, I'll mark it as not being in refresh
2277 * anymore, but I won't clear the needs refresh flag. */
2278 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
2279 int status)
2280 {
2281 unsigned long flags;
2282
2283 spin_lock_irqsave(&lockres->l_lock, flags);
2284 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
2285 if (!status)
2286 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
2287 spin_unlock_irqrestore(&lockres->l_lock, flags);
2288
2289 wake_up(&lockres->l_event);
2290 }
2291
2292 /* may or may not return a bh if it went to disk. */
2293 static int ocfs2_inode_lock_update(struct inode *inode,
2294 struct buffer_head **bh)
2295 {
2296 int status = 0;
2297 struct ocfs2_inode_info *oi = OCFS2_I(inode);
2298 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2299 struct ocfs2_dinode *fe;
2300 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2301
2302 if (ocfs2_mount_local(osb))
2303 goto bail;
2304
2305 spin_lock(&oi->ip_lock);
2306 if (oi->ip_flags & OCFS2_INODE_DELETED) {
2307 mlog(0, "Orphaned inode %llu was deleted while we "
2308 "were waiting on a lock. ip_flags = 0x%x\n",
2309 (unsigned long long)oi->ip_blkno, oi->ip_flags);
2310 spin_unlock(&oi->ip_lock);
2311 status = -ENOENT;
2312 goto bail;
2313 }
2314 spin_unlock(&oi->ip_lock);
2315
2316 if (!ocfs2_should_refresh_lock_res(lockres))
2317 goto bail;
2318
2319 /* This will discard any caching information we might have had
2320 * for the inode metadata. */
2321 ocfs2_metadata_cache_purge(INODE_CACHE(inode));
2322
2323 ocfs2_extent_map_trunc(inode, 0);
2324
2325 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
2326 mlog(0, "Trusting LVB on inode %llu\n",
2327 (unsigned long long)oi->ip_blkno);
2328 ocfs2_refresh_inode_from_lvb(inode);
2329 } else {
2330 /* Boo, we have to go to disk. */
2331 /* read bh, cast, ocfs2_refresh_inode */
2332 status = ocfs2_read_inode_block(inode, bh);
2333 if (status < 0) {
2334 mlog_errno(status);
2335 goto bail_refresh;
2336 }
2337 fe = (struct ocfs2_dinode *) (*bh)->b_data;
2338
2339 /* This is a good chance to make sure we're not
2340 * locking an invalid object. ocfs2_read_inode_block()
2341 * already checked that the inode block is sane.
2342 *
2343 * We bug on a stale inode here because we checked
2344 * above whether it was wiped from disk. The wiping
2345 * node provides a guarantee that we receive that
2346 * message and can mark the inode before dropping any
2347 * locks associated with it.
*/ 2348 mlog_bug_on_msg(inode->i_generation != 2349 le32_to_cpu(fe->i_generation), 2350 "Invalid dinode %llu disk generation: %u " 2351 "inode->i_generation: %u\n", 2352 (unsigned long long)oi->ip_blkno, 2353 le32_to_cpu(fe->i_generation), 2354 inode->i_generation); 2355 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 2356 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 2357 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 2358 (unsigned long long)oi->ip_blkno, 2359 (unsigned long long)le64_to_cpu(fe->i_dtime), 2360 le32_to_cpu(fe->i_flags)); 2361 2362 ocfs2_refresh_inode(inode, fe); 2363 ocfs2_track_lock_refresh(lockres); 2364 } 2365 2366 status = 0; 2367 bail_refresh: 2368 ocfs2_complete_lock_res_refresh(lockres, status); 2369 bail: 2370 return status; 2371 } 2372 2373 static int ocfs2_assign_bh(struct inode *inode, 2374 struct buffer_head **ret_bh, 2375 struct buffer_head *passed_bh) 2376 { 2377 int status; 2378 2379 if (passed_bh) { 2380 /* Ok, the update went to disk for us, use the 2381 * returned bh. */ 2382 *ret_bh = passed_bh; 2383 get_bh(*ret_bh); 2384 2385 return 0; 2386 } 2387 2388 status = ocfs2_read_inode_block(inode, ret_bh); 2389 if (status < 0) 2390 mlog_errno(status); 2391 2392 return status; 2393 } 2394 2395 /* 2396 * returns < 0 error if the callback will never be called, otherwise 2397 * the result of the lock will be communicated via the callback. 2398 */ 2399 int ocfs2_inode_lock_full_nested(struct inode *inode, 2400 struct buffer_head **ret_bh, 2401 int ex, 2402 int arg_flags, 2403 int subclass) 2404 { 2405 int status, level, acquired; 2406 u32 dlm_flags; 2407 struct ocfs2_lock_res *lockres = NULL; 2408 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2409 struct buffer_head *local_bh = NULL; 2410 2411 mlog(0, "inode %llu, take %s META lock\n", 2412 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2413 ex ? "EXMODE" : "PRMODE"); 2414 2415 status = 0; 2416 acquired = 0; 2417 /* We'll allow faking a readonly metadata lock for 2418 * rodevices. */ 2419 if (ocfs2_is_hard_readonly(osb)) { 2420 if (ex) 2421 status = -EROFS; 2422 goto getbh; 2423 } 2424 2425 if ((arg_flags & OCFS2_META_LOCK_GETBH) || 2426 ocfs2_mount_local(osb)) 2427 goto update; 2428 2429 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2430 ocfs2_wait_for_recovery(osb); 2431 2432 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2433 level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2434 dlm_flags = 0; 2435 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 2436 dlm_flags |= DLM_LKF_NOQUEUE; 2437 2438 status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags, 2439 arg_flags, subclass, _RET_IP_); 2440 if (status < 0) { 2441 if (status != -EAGAIN) 2442 mlog_errno(status); 2443 goto bail; 2444 } 2445 2446 /* Notify the error cleanup path to drop the cluster lock. */ 2447 acquired = 1; 2448 2449 /* We wait twice because a node may have died while we were in 2450 * the lower dlm layers. The second time though, we've 2451 * committed to owning this lock so we don't allow signals to 2452 * abort the operation. */ 2453 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2454 ocfs2_wait_for_recovery(osb); 2455 2456 update: 2457 /* 2458 * We only see this flag if we're being called from 2459 * ocfs2_read_locked_inode(). It means we're locking an inode 2460 * which hasn't been populated yet, so clear the refresh flag 2461 * and let the caller handle it. 2462 */ 2463 if (inode->i_state & I_NEW) { 2464 status = 0; 2465 if (lockres) 2466 ocfs2_complete_lock_res_refresh(lockres, 0); 2467 goto bail; 2468 } 2469 2470 /* This is fun. 
The caller may want a bh back, or it may
2471 * not. ocfs2_inode_lock_update definitely wants one in, but
2472 * may or may not read one, depending on what's in the
2473 * LVB. The result of all of this is that we've *only* gone to
2474 * disk if we have to, so the complexity is worthwhile. */
2475 status = ocfs2_inode_lock_update(inode, &local_bh);
2476 if (status < 0) {
2477 if (status != -ENOENT)
2478 mlog_errno(status);
2479 goto bail;
2480 }
2481 getbh:
2482 if (ret_bh) {
2483 status = ocfs2_assign_bh(inode, ret_bh, local_bh);
2484 if (status < 0) {
2485 mlog_errno(status);
2486 goto bail;
2487 }
2488 }
2489
2490 bail:
2491 if (status < 0) {
2492 if (ret_bh && (*ret_bh)) {
2493 brelse(*ret_bh);
2494 *ret_bh = NULL;
2495 }
2496 if (acquired)
2497 ocfs2_inode_unlock(inode, ex);
2498 }
2499
2500 if (local_bh)
2501 brelse(local_bh);
2502
2503 return status;
2504 }
2505
2506 /*
2507 * This is working around a lock inversion between tasks acquiring DLM
2508 * locks while holding a page lock and the downconvert thread which
2509 * blocks dlm lock acquisition while acquiring page locks.
2510 *
2511 * ** These _with_page variants are only intended to be called from aop
2512 * methods that hold page locks and return a very specific *positive* error
2513 * code that aop methods pass up to the VFS -- test for errors with != 0. **
2514 *
2515 * The DLM is called such that it returns -EAGAIN if it would have
2516 * blocked waiting for the downconvert thread. In that case we unlock
2517 * our page so the downconvert thread can make progress. Once we've
2518 * done this we have to return AOP_TRUNCATED_PAGE so the aop method
2519 * that called us can bubble that back up into the VFS, which will then
2520 * immediately retry the aop call.
2521 */
2522 int ocfs2_inode_lock_with_page(struct inode *inode,
2523 struct buffer_head **ret_bh,
2524 int ex,
2525 struct page *page)
2526 {
2527 int ret;
2528
2529 ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
2530 if (ret == -EAGAIN) {
2531 unlock_page(page);
2532 /*
2533 * If we can't get the inode lock immediately, we should not
2534 * return directly here, since this will lead to a softlockup
2535 * problem. Instead we take a blocking lock and immediately
2536 * unlock it before returning; this avoids wasting CPU on lots
2537 * of retries and benefits fairness in getting the lock.
2538 */
2539 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
2540 ocfs2_inode_unlock(inode, ex);
2541 ret = AOP_TRUNCATED_PAGE;
2542 }
2543
2544 return ret;
2545 }
2546
2547 int ocfs2_inode_lock_atime(struct inode *inode,
2548 struct vfsmount *vfsmnt,
2549 int *level, int wait)
2550 {
2551 int ret;
2552
2553 if (wait)
2554 ret = ocfs2_inode_lock(inode, NULL, 0);
2555 else
2556 ret = ocfs2_try_inode_lock(inode, NULL, 0);
2557
2558 if (ret < 0) {
2559 if (ret != -EAGAIN)
2560 mlog_errno(ret);
2561 return ret;
2562 }
2563
2564 /*
2565 * If we should update atime, we will get an EX lock,
2566 * otherwise we just get a PR lock.
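 *
 * A sketch of the expected calling pattern (hypothetical caller,
 * error handling elided):
 *
 *	int level;
 *
 *	if (ocfs2_inode_lock_atime(inode, vfsmnt, &level, 1) < 0)
 *		return;
 *	...			(read under PR if level == 0, EX if 1)
 *	ocfs2_inode_unlock(inode, level);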
2567 */ 2568 if (ocfs2_should_update_atime(inode, vfsmnt)) { 2569 struct buffer_head *bh = NULL; 2570 2571 ocfs2_inode_unlock(inode, 0); 2572 if (wait) 2573 ret = ocfs2_inode_lock(inode, &bh, 1); 2574 else 2575 ret = ocfs2_try_inode_lock(inode, &bh, 1); 2576 2577 if (ret < 0) { 2578 if (ret != -EAGAIN) 2579 mlog_errno(ret); 2580 return ret; 2581 } 2582 *level = 1; 2583 if (ocfs2_should_update_atime(inode, vfsmnt)) 2584 ocfs2_update_inode_atime(inode, bh); 2585 if (bh) 2586 brelse(bh); 2587 } else 2588 *level = 0; 2589 2590 return ret; 2591 } 2592 2593 void ocfs2_inode_unlock(struct inode *inode, 2594 int ex) 2595 { 2596 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2597 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2598 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2599 2600 mlog(0, "inode %llu drop %s META lock\n", 2601 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2602 ex ? "EXMODE" : "PRMODE"); 2603 2604 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) && 2605 !ocfs2_mount_local(osb)) 2606 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 2607 } 2608 2609 /* 2610 * This _tracker variantes are introduced to deal with the recursive cluster 2611 * locking issue. The idea is to keep track of a lock holder on the stack of 2612 * the current process. If there's a lock holder on the stack, we know the 2613 * task context is already protected by cluster locking. Currently, they're 2614 * used in some VFS entry routines. 2615 * 2616 * return < 0 on error, return == 0 if there's no lock holder on the stack 2617 * before this call, return == 1 if this call would be a recursive locking. 2618 */ 2619 int ocfs2_inode_lock_tracker(struct inode *inode, 2620 struct buffer_head **ret_bh, 2621 int ex, 2622 struct ocfs2_lock_holder *oh) 2623 { 2624 int status; 2625 int arg_flags = 0, has_locked; 2626 struct ocfs2_lock_res *lockres; 2627 2628 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2629 has_locked = ocfs2_is_locked_by_me(lockres); 2630 /* Just get buffer head if the cluster lock has been taken */ 2631 if (has_locked) 2632 arg_flags = OCFS2_META_LOCK_GETBH; 2633 2634 if (likely(!has_locked || ret_bh)) { 2635 status = ocfs2_inode_lock_full(inode, ret_bh, ex, arg_flags); 2636 if (status < 0) { 2637 if (status != -ENOENT) 2638 mlog_errno(status); 2639 return status; 2640 } 2641 } 2642 if (!has_locked) 2643 ocfs2_add_holder(lockres, oh); 2644 2645 return has_locked; 2646 } 2647 2648 void ocfs2_inode_unlock_tracker(struct inode *inode, 2649 int ex, 2650 struct ocfs2_lock_holder *oh, 2651 int had_lock) 2652 { 2653 struct ocfs2_lock_res *lockres; 2654 2655 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2656 /* had_lock means that the currect process already takes the cluster 2657 * lock previously. If had_lock is 1, we have nothing to do here, and 2658 * it will get unlocked where we got the lock. 
2659 */ 2660 if (!had_lock) { 2661 ocfs2_remove_holder(lockres, oh); 2662 ocfs2_inode_unlock(inode, ex); 2663 } 2664 } 2665 2666 int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno) 2667 { 2668 struct ocfs2_lock_res *lockres; 2669 struct ocfs2_orphan_scan_lvb *lvb; 2670 int status = 0; 2671 2672 if (ocfs2_is_hard_readonly(osb)) 2673 return -EROFS; 2674 2675 if (ocfs2_mount_local(osb)) 2676 return 0; 2677 2678 lockres = &osb->osb_orphan_scan.os_lockres; 2679 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2680 if (status < 0) 2681 return status; 2682 2683 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2684 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2685 lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION) 2686 *seqno = be32_to_cpu(lvb->lvb_os_seqno); 2687 else 2688 *seqno = osb->osb_orphan_scan.os_seqno + 1; 2689 2690 return status; 2691 } 2692 2693 void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno) 2694 { 2695 struct ocfs2_lock_res *lockres; 2696 struct ocfs2_orphan_scan_lvb *lvb; 2697 2698 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) { 2699 lockres = &osb->osb_orphan_scan.os_lockres; 2700 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2701 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION; 2702 lvb->lvb_os_seqno = cpu_to_be32(seqno); 2703 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2704 } 2705 } 2706 2707 int ocfs2_super_lock(struct ocfs2_super *osb, 2708 int ex) 2709 { 2710 int status = 0; 2711 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2712 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2713 2714 if (ocfs2_is_hard_readonly(osb)) 2715 return -EROFS; 2716 2717 if (ocfs2_mount_local(osb)) 2718 goto bail; 2719 2720 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 2721 if (status < 0) { 2722 mlog_errno(status); 2723 goto bail; 2724 } 2725 2726 /* The super block lock path is really in the best position to 2727 * know when resources covered by the lock need to be 2728 * refreshed, so we do it here. Of course, making sense of 2729 * everything is up to the caller :) */ 2730 status = ocfs2_should_refresh_lock_res(lockres); 2731 if (status) { 2732 status = ocfs2_refresh_slot_info(osb); 2733 2734 ocfs2_complete_lock_res_refresh(lockres, status); 2735 2736 if (status < 0) { 2737 ocfs2_cluster_unlock(osb, lockres, level); 2738 mlog_errno(status); 2739 } 2740 ocfs2_track_lock_refresh(lockres); 2741 } 2742 bail: 2743 return status; 2744 } 2745 2746 void ocfs2_super_unlock(struct ocfs2_super *osb, 2747 int ex) 2748 { 2749 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2750 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2751 2752 if (!ocfs2_mount_local(osb)) 2753 ocfs2_cluster_unlock(osb, lockres, level); 2754 } 2755 2756 int ocfs2_rename_lock(struct ocfs2_super *osb) 2757 { 2758 int status; 2759 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2760 2761 if (ocfs2_is_hard_readonly(osb)) 2762 return -EROFS; 2763 2764 if (ocfs2_mount_local(osb)) 2765 return 0; 2766 2767 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2768 if (status < 0) 2769 mlog_errno(status); 2770 2771 return status; 2772 } 2773 2774 void ocfs2_rename_unlock(struct ocfs2_super *osb) 2775 { 2776 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2777 2778 if (!ocfs2_mount_local(osb)) 2779 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2780 } 2781 2782 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex) 2783 { 2784 int status; 2785 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2786 2787 if (ocfs2_is_hard_readonly(osb)) 2788 return -EROFS; 2789 2790 if (ocfs2_mount_local(osb)) 2791 return 0; 2792 2793 status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE, 2794 0, 0); 2795 if (status < 0) 2796 mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status); 2797 2798 return status; 2799 } 2800 2801 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex) 2802 { 2803 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2804 2805 if (!ocfs2_mount_local(osb)) 2806 ocfs2_cluster_unlock(osb, lockres, 2807 ex ? LKM_EXMODE : LKM_PRMODE); 2808 } 2809 2810 int ocfs2_trim_fs_lock(struct ocfs2_super *osb, 2811 struct ocfs2_trim_fs_info *info, int trylock) 2812 { 2813 int status; 2814 struct ocfs2_trim_fs_lvb *lvb; 2815 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 2816 2817 if (info) 2818 info->tf_valid = 0; 2819 2820 if (ocfs2_is_hard_readonly(osb)) 2821 return -EROFS; 2822 2823 if (ocfs2_mount_local(osb)) 2824 return 0; 2825 2826 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 2827 trylock ? 
DLM_LKF_NOQUEUE : 0, 0); 2828 if (status < 0) { 2829 if (status != -EAGAIN) 2830 mlog_errno(status); 2831 return status; 2832 } 2833 2834 if (info) { 2835 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2836 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2837 lvb->lvb_version == OCFS2_TRIMFS_LVB_VERSION) { 2838 info->tf_valid = 1; 2839 info->tf_success = lvb->lvb_success; 2840 info->tf_nodenum = be32_to_cpu(lvb->lvb_nodenum); 2841 info->tf_start = be64_to_cpu(lvb->lvb_start); 2842 info->tf_len = be64_to_cpu(lvb->lvb_len); 2843 info->tf_minlen = be64_to_cpu(lvb->lvb_minlen); 2844 info->tf_trimlen = be64_to_cpu(lvb->lvb_trimlen); 2845 } 2846 } 2847 2848 return status; 2849 } 2850 2851 void ocfs2_trim_fs_unlock(struct ocfs2_super *osb, 2852 struct ocfs2_trim_fs_info *info) 2853 { 2854 struct ocfs2_trim_fs_lvb *lvb; 2855 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 2856 2857 if (ocfs2_mount_local(osb)) 2858 return; 2859 2860 if (info) { 2861 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2862 lvb->lvb_version = OCFS2_TRIMFS_LVB_VERSION; 2863 lvb->lvb_success = info->tf_success; 2864 lvb->lvb_nodenum = cpu_to_be32(info->tf_nodenum); 2865 lvb->lvb_start = cpu_to_be64(info->tf_start); 2866 lvb->lvb_len = cpu_to_be64(info->tf_len); 2867 lvb->lvb_minlen = cpu_to_be64(info->tf_minlen); 2868 lvb->lvb_trimlen = cpu_to_be64(info->tf_trimlen); 2869 } 2870 2871 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2872 } 2873 2874 int ocfs2_dentry_lock(struct dentry *dentry, int ex) 2875 { 2876 int ret; 2877 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2878 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2879 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2880 2881 BUG_ON(!dl); 2882 2883 if (ocfs2_is_hard_readonly(osb)) { 2884 if (ex) 2885 return -EROFS; 2886 return 0; 2887 } 2888 2889 if (ocfs2_mount_local(osb)) 2890 return 0; 2891 2892 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 2893 if (ret < 0) 2894 mlog_errno(ret); 2895 2896 return ret; 2897 } 2898 2899 void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 2900 { 2901 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2902 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2903 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2904 2905 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 2906 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 2907 } 2908 2909 /* Reference counting of the dlm debug structure. We want this because 2910 * open references on the debug inodes can live on after a mount, so 2911 * we can't rely on the ocfs2_super to always exist. */ 2912 static void ocfs2_dlm_debug_free(struct kref *kref) 2913 { 2914 struct ocfs2_dlm_debug *dlm_debug; 2915 2916 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2917 2918 kfree(dlm_debug); 2919 } 2920 2921 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2922 { 2923 if (dlm_debug) 2924 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2925 } 2926 2927 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2928 { 2929 kref_get(&debug->d_refcnt); 2930 } 2931 2932 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2933 { 2934 struct ocfs2_dlm_debug *dlm_debug; 2935 2936 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 2937 if (!dlm_debug) { 2938 mlog_errno(-ENOMEM); 2939 goto out; 2940 } 2941 2942 kref_init(&dlm_debug->d_refcnt); 2943 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 2944 dlm_debug->d_locking_state = NULL; 2945 out: 2946 return dlm_debug; 2947 } 2948 2949 /* Access to this is arbitrated for us via seq_file->sem. 
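 *
 * The records produced below end up in the debugfs "locking_state"
 * file created in ocfs2_dlm_init_debug() further down, so with
 * debugfs mounted in the usual place they can typically be dumped
 * with something like (exact path is an assumption based on where
 * osb_debug_root is created):
 *
 *	cat /sys/kernel/debug/ocfs2/<uuid>/locking_state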
*/ 2950 struct ocfs2_dlm_seq_priv { 2951 struct ocfs2_dlm_debug *p_dlm_debug; 2952 struct ocfs2_lock_res p_iter_res; 2953 struct ocfs2_lock_res p_tmp_res; 2954 }; 2955 2956 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 2957 struct ocfs2_dlm_seq_priv *priv) 2958 { 2959 struct ocfs2_lock_res *iter, *ret = NULL; 2960 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 2961 2962 assert_spin_locked(&ocfs2_dlm_tracking_lock); 2963 2964 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 2965 /* discover the head of the list */ 2966 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 2967 mlog(0, "End of list found, %p\n", ret); 2968 break; 2969 } 2970 2971 /* We track our "dummy" iteration lockres' by a NULL 2972 * l_ops field. */ 2973 if (iter->l_ops != NULL) { 2974 ret = iter; 2975 break; 2976 } 2977 } 2978 2979 return ret; 2980 } 2981 2982 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 2983 { 2984 struct ocfs2_dlm_seq_priv *priv = m->private; 2985 struct ocfs2_lock_res *iter; 2986 2987 spin_lock(&ocfs2_dlm_tracking_lock); 2988 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 2989 if (iter) { 2990 /* Since lockres' have the lifetime of their container 2991 * (which can be inodes, ocfs2_supers, etc) we want to 2992 * copy this out to a temporary lockres while still 2993 * under the spinlock. Obviously after this we can't 2994 * trust any pointers on the copy returned, but that's 2995 * ok as the information we want isn't typically held 2996 * in them. */ 2997 priv->p_tmp_res = *iter; 2998 iter = &priv->p_tmp_res; 2999 } 3000 spin_unlock(&ocfs2_dlm_tracking_lock); 3001 3002 return iter; 3003 } 3004 3005 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 3006 { 3007 } 3008 3009 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 3010 { 3011 struct ocfs2_dlm_seq_priv *priv = m->private; 3012 struct ocfs2_lock_res *iter = v; 3013 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 3014 3015 spin_lock(&ocfs2_dlm_tracking_lock); 3016 iter = ocfs2_dlm_next_res(iter, priv); 3017 list_del_init(&dummy->l_debug_list); 3018 if (iter) { 3019 list_add(&dummy->l_debug_list, &iter->l_debug_list); 3020 priv->p_tmp_res = *iter; 3021 iter = &priv->p_tmp_res; 3022 } 3023 spin_unlock(&ocfs2_dlm_tracking_lock); 3024 3025 return iter; 3026 } 3027 3028 /* 3029 * Version is used by debugfs.ocfs2 to determine the format being used 3030 * 3031 * New in version 2 3032 * - Lock stats printed 3033 * New in version 3 3034 * - Max time in lock stats is in usecs (instead of nsecs) 3035 */ 3036 #define OCFS2_DLM_DEBUG_STR_VERSION 3 3037 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 3038 { 3039 int i; 3040 char *lvb; 3041 struct ocfs2_lock_res *lockres = v; 3042 3043 if (!lockres) 3044 return -EINVAL; 3045 3046 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 3047 3048 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 3049 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 3050 lockres->l_name, 3051 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 3052 else 3053 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 3054 3055 seq_printf(m, "%d\t" 3056 "0x%lx\t" 3057 "0x%x\t" 3058 "0x%x\t" 3059 "%u\t" 3060 "%u\t" 3061 "%d\t" 3062 "%d\t", 3063 lockres->l_level, 3064 lockres->l_flags, 3065 lockres->l_action, 3066 lockres->l_unlock_action, 3067 lockres->l_ro_holders, 3068 lockres->l_ex_holders, 3069 lockres->l_requested, 3070 lockres->l_blocking); 3071 3072 /* Dump the raw LVB */ 3073 lvb = 
ocfs2_dlm_lvb(&lockres->l_lksb); 3074 for(i = 0; i < DLM_LVB_LEN; i++) 3075 seq_printf(m, "0x%x\t", lvb[i]); 3076 3077 #ifdef CONFIG_OCFS2_FS_STATS 3078 # define lock_num_prmode(_l) ((_l)->l_lock_prmode.ls_gets) 3079 # define lock_num_exmode(_l) ((_l)->l_lock_exmode.ls_gets) 3080 # define lock_num_prmode_failed(_l) ((_l)->l_lock_prmode.ls_fail) 3081 # define lock_num_exmode_failed(_l) ((_l)->l_lock_exmode.ls_fail) 3082 # define lock_total_prmode(_l) ((_l)->l_lock_prmode.ls_total) 3083 # define lock_total_exmode(_l) ((_l)->l_lock_exmode.ls_total) 3084 # define lock_max_prmode(_l) ((_l)->l_lock_prmode.ls_max) 3085 # define lock_max_exmode(_l) ((_l)->l_lock_exmode.ls_max) 3086 # define lock_refresh(_l) ((_l)->l_lock_refresh) 3087 #else 3088 # define lock_num_prmode(_l) (0) 3089 # define lock_num_exmode(_l) (0) 3090 # define lock_num_prmode_failed(_l) (0) 3091 # define lock_num_exmode_failed(_l) (0) 3092 # define lock_total_prmode(_l) (0ULL) 3093 # define lock_total_exmode(_l) (0ULL) 3094 # define lock_max_prmode(_l) (0) 3095 # define lock_max_exmode(_l) (0) 3096 # define lock_refresh(_l) (0) 3097 #endif 3098 /* The following seq_print was added in version 2 of this output */ 3099 seq_printf(m, "%u\t" 3100 "%u\t" 3101 "%u\t" 3102 "%u\t" 3103 "%llu\t" 3104 "%llu\t" 3105 "%u\t" 3106 "%u\t" 3107 "%u\t", 3108 lock_num_prmode(lockres), 3109 lock_num_exmode(lockres), 3110 lock_num_prmode_failed(lockres), 3111 lock_num_exmode_failed(lockres), 3112 lock_total_prmode(lockres), 3113 lock_total_exmode(lockres), 3114 lock_max_prmode(lockres), 3115 lock_max_exmode(lockres), 3116 lock_refresh(lockres)); 3117 3118 /* End the line */ 3119 seq_printf(m, "\n"); 3120 return 0; 3121 } 3122 3123 static const struct seq_operations ocfs2_dlm_seq_ops = { 3124 .start = ocfs2_dlm_seq_start, 3125 .stop = ocfs2_dlm_seq_stop, 3126 .next = ocfs2_dlm_seq_next, 3127 .show = ocfs2_dlm_seq_show, 3128 }; 3129 3130 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 3131 { 3132 struct seq_file *seq = file->private_data; 3133 struct ocfs2_dlm_seq_priv *priv = seq->private; 3134 struct ocfs2_lock_res *res = &priv->p_iter_res; 3135 3136 ocfs2_remove_lockres_tracking(res); 3137 ocfs2_put_dlm_debug(priv->p_dlm_debug); 3138 return seq_release_private(inode, file); 3139 } 3140 3141 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 3142 { 3143 struct ocfs2_dlm_seq_priv *priv; 3144 struct ocfs2_super *osb; 3145 3146 priv = __seq_open_private(file, &ocfs2_dlm_seq_ops, sizeof(*priv)); 3147 if (!priv) { 3148 mlog_errno(-ENOMEM); 3149 return -ENOMEM; 3150 } 3151 3152 osb = inode->i_private; 3153 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 3154 priv->p_dlm_debug = osb->osb_dlm_debug; 3155 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 3156 3157 ocfs2_add_lockres_tracking(&priv->p_iter_res, 3158 priv->p_dlm_debug); 3159 3160 return 0; 3161 } 3162 3163 static const struct file_operations ocfs2_dlm_debug_fops = { 3164 .open = ocfs2_dlm_debug_open, 3165 .release = ocfs2_dlm_debug_release, 3166 .read = seq_read, 3167 .llseek = seq_lseek, 3168 }; 3169 3170 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 3171 { 3172 int ret = 0; 3173 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 3174 3175 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 3176 S_IFREG|S_IRUSR, 3177 osb->osb_debug_root, 3178 osb, 3179 &ocfs2_dlm_debug_fops); 3180 if (!dlm_debug->d_locking_state) { 3181 ret = -EINVAL; 3182 mlog(ML_ERROR, 3183 "Unable to create locking state debugfs file.\n"); 3184 goto out; 
3185 } 3186 3187 ocfs2_get_dlm_debug(dlm_debug); 3188 out: 3189 return ret; 3190 } 3191 3192 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 3193 { 3194 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 3195 3196 if (dlm_debug) { 3197 debugfs_remove(dlm_debug->d_locking_state); 3198 ocfs2_put_dlm_debug(dlm_debug); 3199 } 3200 } 3201 3202 int ocfs2_dlm_init(struct ocfs2_super *osb) 3203 { 3204 int status = 0; 3205 struct ocfs2_cluster_connection *conn = NULL; 3206 3207 if (ocfs2_mount_local(osb)) { 3208 osb->node_num = 0; 3209 goto local; 3210 } 3211 3212 status = ocfs2_dlm_init_debug(osb); 3213 if (status < 0) { 3214 mlog_errno(status); 3215 goto bail; 3216 } 3217 3218 /* launch downconvert thread */ 3219 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc-%s", 3220 osb->uuid_str); 3221 if (IS_ERR(osb->dc_task)) { 3222 status = PTR_ERR(osb->dc_task); 3223 osb->dc_task = NULL; 3224 mlog_errno(status); 3225 goto bail; 3226 } 3227 3228 /* for now, uuid == domain */ 3229 status = ocfs2_cluster_connect(osb->osb_cluster_stack, 3230 osb->osb_cluster_name, 3231 strlen(osb->osb_cluster_name), 3232 osb->uuid_str, 3233 strlen(osb->uuid_str), 3234 &lproto, ocfs2_do_node_down, osb, 3235 &conn); 3236 if (status) { 3237 mlog_errno(status); 3238 goto bail; 3239 } 3240 3241 status = ocfs2_cluster_this_node(conn, &osb->node_num); 3242 if (status < 0) { 3243 mlog_errno(status); 3244 mlog(ML_ERROR, 3245 "could not find this host's node number\n"); 3246 ocfs2_cluster_disconnect(conn, 0); 3247 goto bail; 3248 } 3249 3250 local: 3251 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 3252 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 3253 ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb); 3254 ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb); 3255 3256 osb->cconn = conn; 3257 bail: 3258 if (status < 0) { 3259 ocfs2_dlm_shutdown_debug(osb); 3260 if (osb->dc_task) 3261 kthread_stop(osb->dc_task); 3262 } 3263 3264 return status; 3265 } 3266 3267 void ocfs2_dlm_shutdown(struct ocfs2_super *osb, 3268 int hangup_pending) 3269 { 3270 ocfs2_drop_osb_locks(osb); 3271 3272 /* 3273 * Now that we have dropped all locks and ocfs2_dismount_volume() 3274 * has disabled recovery, the DLM won't be talking to us. It's 3275 * safe to tear things down before disconnecting the cluster. 3276 */ 3277 3278 if (osb->dc_task) { 3279 kthread_stop(osb->dc_task); 3280 osb->dc_task = NULL; 3281 } 3282 3283 ocfs2_lock_res_free(&osb->osb_super_lockres); 3284 ocfs2_lock_res_free(&osb->osb_rename_lockres); 3285 ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres); 3286 ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres); 3287 3288 ocfs2_cluster_disconnect(osb->cconn, hangup_pending); 3289 osb->cconn = NULL; 3290 3291 ocfs2_dlm_shutdown_debug(osb); 3292 } 3293 3294 static int ocfs2_drop_lock(struct ocfs2_super *osb, 3295 struct ocfs2_lock_res *lockres) 3296 { 3297 int ret; 3298 unsigned long flags; 3299 u32 lkm_flags = 0; 3300 3301 /* We didn't get anywhere near actually using this lockres. 
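 * (If OCFS2_LOCK_INITIALIZED never got set, the lockres was never
 * fully set up, so there can be no DLM-side state to unlock and we
 * simply bail out below.)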
 */
3302 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
3303 goto out;
3304
3305 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
3306 lkm_flags |= DLM_LKF_VALBLK;
3307
3308 spin_lock_irqsave(&lockres->l_lock, flags);
3309
3310 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
3311 "lockres %s, flags 0x%lx\n",
3312 lockres->l_name, lockres->l_flags);
3313
3314 while (lockres->l_flags & OCFS2_LOCK_BUSY) {
3315 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
3316 "%u, unlock_action = %u\n",
3317 lockres->l_name, lockres->l_flags, lockres->l_action,
3318 lockres->l_unlock_action);
3319
3320 spin_unlock_irqrestore(&lockres->l_lock, flags);
3321
3322 /* XXX: Today we just wait on any busy
3323 * locks... Perhaps we need to cancel converts in the
3324 * future? */
3325 ocfs2_wait_on_busy_lock(lockres);
3326
3327 spin_lock_irqsave(&lockres->l_lock, flags);
3328 }
3329
3330 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3331 if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
3332 lockres->l_level == DLM_LOCK_EX &&
3333 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3334 lockres->l_ops->set_lvb(lockres);
3335 }
3336
3337 if (lockres->l_flags & OCFS2_LOCK_BUSY)
3338 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
3339 lockres->l_name);
3340 if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
3341 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
3342
3343 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
3344 spin_unlock_irqrestore(&lockres->l_lock, flags);
3345 goto out;
3346 }
3347
3348 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
3349
3350 /* make sure we never get here while waiting for an ast to
3351 * fire. */
3352 BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
3353
3354 /* is this necessary? */
3355 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3356 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
3357 spin_unlock_irqrestore(&lockres->l_lock, flags);
3358
3359 mlog(0, "lock %s\n", lockres->l_name);
3360
3361 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags);
3362 if (ret) {
3363 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3364 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
3365 ocfs2_dlm_dump_lksb(&lockres->l_lksb);
3366 BUG();
3367 }
3368 mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n",
3369 lockres->l_name);
3370
3371 ocfs2_wait_on_busy_lock(lockres);
3372 out:
3373 return 0;
3374 }
3375
3376 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3377 struct ocfs2_lock_res *lockres);
3378
3379 /* Mark the lockres as being dropped. It will no longer be
3380 * queued if blocking, but we still may have to wait on it
3381 * being dequeued from the downconvert thread before we can consider
3382 * it safe to drop.
3383 *
3384 * You can *not* attempt to call cluster_lock on this lockres anymore. */
3385 void ocfs2_mark_lockres_freeing(struct ocfs2_super *osb,
3386 struct ocfs2_lock_res *lockres)
3387 {
3388 int status;
3389 struct ocfs2_mask_waiter mw;
3390 unsigned long flags, flags2;
3391
3392 ocfs2_init_mask_waiter(&mw);
3393
3394 spin_lock_irqsave(&lockres->l_lock, flags);
3395 lockres->l_flags |= OCFS2_LOCK_FREEING;
3396 if (lockres->l_flags & OCFS2_LOCK_QUEUED && current == osb->dc_task) {
3397 /*
3398 * We know the downconvert is queued but not in progress
3399 * because we are the downconvert thread and are processing a
3400 * different lock. So we can just remove the lock from the
3401 * queue.
This is not only an optimization but also a way 3402 * to avoid the following deadlock: 3403 * ocfs2_dentry_post_unlock() 3404 * ocfs2_dentry_lock_put() 3405 * ocfs2_drop_dentry_lock() 3406 * iput() 3407 * ocfs2_evict_inode() 3408 * ocfs2_clear_inode() 3409 * ocfs2_mark_lockres_freeing() 3410 * ... blocks waiting for OCFS2_LOCK_QUEUED 3411 * since we are the downconvert thread which 3412 * should clear the flag. 3413 */ 3414 spin_unlock_irqrestore(&lockres->l_lock, flags); 3415 spin_lock_irqsave(&osb->dc_task_lock, flags2); 3416 list_del_init(&lockres->l_blocked_list); 3417 osb->blocked_lock_count--; 3418 spin_unlock_irqrestore(&osb->dc_task_lock, flags2); 3419 /* 3420 * Warn if we recurse into another post_unlock call. Strictly 3421 * speaking it isn't a problem but we need to be careful if 3422 * that happens (stack overflow, deadlocks, ...) so warn if 3423 * ocfs2 grows a path for which this can happen. 3424 */ 3425 WARN_ON_ONCE(lockres->l_ops->post_unlock); 3426 /* Since the lock is freeing we don't do much in the fn below */ 3427 ocfs2_process_blocked_lock(osb, lockres); 3428 return; 3429 } 3430 while (lockres->l_flags & OCFS2_LOCK_QUEUED) { 3431 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); 3432 spin_unlock_irqrestore(&lockres->l_lock, flags); 3433 3434 mlog(0, "Waiting on lockres %s\n", lockres->l_name); 3435 3436 status = ocfs2_wait_for_mask(&mw); 3437 if (status) 3438 mlog_errno(status); 3439 3440 spin_lock_irqsave(&lockres->l_lock, flags); 3441 } 3442 spin_unlock_irqrestore(&lockres->l_lock, flags); 3443 } 3444 3445 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, 3446 struct ocfs2_lock_res *lockres) 3447 { 3448 int ret; 3449 3450 ocfs2_mark_lockres_freeing(osb, lockres); 3451 ret = ocfs2_drop_lock(osb, lockres); 3452 if (ret) 3453 mlog_errno(ret); 3454 } 3455 3456 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 3457 { 3458 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 3459 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 3460 ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres); 3461 ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres); 3462 } 3463 3464 int ocfs2_drop_inode_locks(struct inode *inode) 3465 { 3466 int status, err; 3467 3468 /* No need to call ocfs2_mark_lockres_freeing here - 3469 * ocfs2_clear_inode has done it for us. 
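 *
 * (All three per-inode lockres are dropped in turn below; we keep
 * going on error so nothing is leaked, and report the first failure,
 * if any, as the overall status.)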
 */
3470
3471 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3472 &OCFS2_I(inode)->ip_open_lockres);
3473 if (err < 0)
3474 mlog_errno(err);
3475
3476 status = err;
3477
3478 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3479 &OCFS2_I(inode)->ip_inode_lockres);
3480 if (err < 0)
3481 mlog_errno(err);
3482 if (err < 0 && !status)
3483 status = err;
3484
3485 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3486 &OCFS2_I(inode)->ip_rw_lockres);
3487 if (err < 0)
3488 mlog_errno(err);
3489 if (err < 0 && !status)
3490 status = err;
3491
3492 return status;
3493 }
3494
3495 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
3496 int new_level)
3497 {
3498 assert_spin_locked(&lockres->l_lock);
3499
3500 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
3501
3502 if (lockres->l_level <= new_level) {
3503 mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, "
3504 "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, "
3505 "block %d, pgen %d\n", lockres->l_name, lockres->l_level,
3506 new_level, list_empty(&lockres->l_blocked_list),
3507 list_empty(&lockres->l_mask_waiters), lockres->l_type,
3508 lockres->l_flags, lockres->l_ro_holders,
3509 lockres->l_ex_holders, lockres->l_action,
3510 lockres->l_unlock_action, lockres->l_requested,
3511 lockres->l_blocking, lockres->l_pending_gen);
3512 BUG();
3513 }
3514
3515 mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n",
3516 lockres->l_name, lockres->l_level, new_level, lockres->l_blocking);
3517
3518 lockres->l_action = OCFS2_AST_DOWNCONVERT;
3519 lockres->l_requested = new_level;
3520 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3521 return lockres_set_pending(lockres);
3522 }
3523
3524 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
3525 struct ocfs2_lock_res *lockres,
3526 int new_level,
3527 int lvb,
3528 unsigned int generation)
3529 {
3530 int ret;
3531 u32 dlm_flags = DLM_LKF_CONVERT;
3532
3533 mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name,
3534 lockres->l_level, new_level);
3535
3536 /*
3537 * Regarding DLM_LKF_VALBLK, fsdlm behaves differently from o2cb: it
3538 * always expects DLM_LKF_VALBLK to be set if the LKB has an LVB, so
3539 * that we can recover correctly from node failure. Otherwise, we may
3540 * get an invalid LVB in the LKB, but without DLM_SBF_VALNOTVALID set.
3541 */
3542 if (!ocfs2_is_o2cb_active() &&
3543 lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
3544 lvb = 1;
3545
3546 if (lvb)
3547 dlm_flags |= DLM_LKF_VALBLK;
3548
3549 ret = ocfs2_dlm_lock(osb->cconn,
3550 new_level,
3551 &lockres->l_lksb,
3552 dlm_flags,
3553 lockres->l_name,
3554 OCFS2_LOCK_ID_MAX_LEN - 1);
3555 lockres_clear_pending(lockres, generation, osb);
3556 if (ret) {
3557 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
3558 ocfs2_recover_from_dlm_error(lockres, 1);
3559 goto bail;
3560 }
3561
3562 ret = 0;
3563 bail:
3564 return ret;
3565 }
3566
3567 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
3568 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
3569 struct ocfs2_lock_res *lockres)
3570 {
3571 assert_spin_locked(&lockres->l_lock);
3572
3573 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
3574 /* If we're already trying to cancel a lock conversion
3575 * then just drop the spinlock and allow the caller to
3576 * requeue this lock. */
3577 mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name);
3578 return 0;
3579 }
3580
3581 /* Were we in a convert when the bast fired? */
3582 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
3583 lockres->l_action != OCFS2_AST_DOWNCONVERT);
3584 /* set things up for the unlockast to know to just
3585 * clear out the ast_action and unset busy, etc. */
3586 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
3587
3588 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
3589 "lock %s, invalid flags: 0x%lx\n",
3590 lockres->l_name, lockres->l_flags);
3591
3592 mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3593
3594 return 1;
3595 }
3596
3597 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
3598 struct ocfs2_lock_res *lockres)
3599 {
3600 int ret;
3601
3602 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
3603 DLM_LKF_CANCEL);
3604 if (ret) {
3605 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3606 ocfs2_recover_from_dlm_error(lockres, 0);
3607 }
3608
3609 mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3610
3611 return ret;
3612 }
3613
3614 static int ocfs2_unblock_lock(struct ocfs2_super *osb,
3615 struct ocfs2_lock_res *lockres,
3616 struct ocfs2_unblock_ctl *ctl)
3617 {
3618 unsigned long flags;
3619 int blocking;
3620 int new_level;
3621 int level;
3622 int ret = 0;
3623 int set_lvb = 0;
3624 unsigned int gen;
3625
3626 spin_lock_irqsave(&lockres->l_lock, flags);
3627
3628 recheck:
3629 /*
3630 * Is it still blocking? If not, we have no more work to do.
3631 */
3632 if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
3633 BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
3634 spin_unlock_irqrestore(&lockres->l_lock, flags);
3635 ret = 0;
3636 goto leave;
3637 }
3638
3639 if (lockres->l_flags & OCFS2_LOCK_BUSY) {
3640 /* XXX
3641 * This is a *big* race. The OCFS2_LOCK_PENDING flag
3642 * exists entirely for one reason - another thread has set
3643 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
3644 *
3645 * If we do ocfs2_cancel_convert() before the other thread
3646 * calls dlm_lock(), our cancel will do nothing. We will
3647 * get no ast, and we will have no way of knowing the
3648 * cancel failed. Meanwhile, the other thread will call
3649 * into dlm_lock() and wait...forever.
3650 *
3651 * Why forever? Because another node has asked for the
3652 * lock first; that's why we're here in unblock_lock().
3653 *
3654 * The solution is OCFS2_LOCK_PENDING. When PENDING is
3655 * set, we just requeue the unblock. Only when the other
3656 * thread has called dlm_lock() and cleared PENDING will
3657 * we then cancel their request.
3658 *
3659 * All callers of dlm_lock() must set OCFS2_LOCK_PENDING
3660 * at the same time they set OCFS2_LOCK_BUSY. They must
3661 * clear OCFS2_LOCK_PENDING after dlm_lock() returns.
3662 */
3663 if (lockres->l_flags & OCFS2_LOCK_PENDING) {
3664 mlog(ML_BASTS, "lockres %s, ReQ: Pending\n",
3665 lockres->l_name);
3666 goto leave_requeue;
3667 }
3668
3669 ctl->requeue = 1;
3670 ret = ocfs2_prepare_cancel_convert(osb, lockres);
3671 spin_unlock_irqrestore(&lockres->l_lock, flags);
3672 if (ret) {
3673 ret = ocfs2_cancel_convert(osb, lockres);
3674 if (ret < 0)
3675 mlog_errno(ret);
3676 }
3677 goto leave;
3678 }
3679
3680 /*
3681 * This prevents livelocks. The OCFS2_LOCK_UPCONVERT_FINISHING flag is
3682 * set when the ast is received for an upconvert just before the
3683 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast
3684 * on the heels of the ast, we want to delay the downconvert just
3685 * enough to allow the up requestor to do its task.
static int ocfs2_unblock_lock(struct ocfs2_super *osb,
			      struct ocfs2_lock_res *lockres,
			      struct ocfs2_unblock_ctl *ctl)
{
	unsigned long flags;
	int blocking;
	int new_level;
	int level;
	int ret = 0;
	int set_lvb = 0;
	unsigned int gen;

	spin_lock_irqsave(&lockres->l_lock, flags);

recheck:
	/*
	 * Is it still blocking? If not, we have no more work to do.
	 */
	if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
		BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		ret = 0;
		goto leave;
	}

	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		/* XXX
		 * This is a *big* race.  The OCFS2_LOCK_PENDING flag
		 * exists entirely for one reason - another thread has set
		 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
		 *
		 * If we do ocfs2_cancel_convert() before the other thread
		 * calls dlm_lock(), our cancel will do nothing.  We will
		 * get no ast, and we will have no way of knowing the
		 * cancel failed.  Meanwhile, the other thread will call
		 * into dlm_lock() and wait...forever.
		 *
		 * Why forever?  Because another node has asked for the
		 * lock first; that's why we're here in unblock_lock().
		 *
		 * The solution is OCFS2_LOCK_PENDING.  When PENDING is
		 * set, we just requeue the unblock.  Only when the other
		 * thread has called dlm_lock() and cleared PENDING will
		 * we then cancel their request.
		 *
		 * All callers of dlm_lock() must set OCFS2_LOCK_PENDING
		 * at the same time they set OCFS2_LOCK_BUSY.  They must
		 * clear OCFS2_LOCK_PENDING after dlm_lock() returns.
		 */
		if (lockres->l_flags & OCFS2_LOCK_PENDING) {
			mlog(ML_BASTS, "lockres %s, ReQ: Pending\n",
			     lockres->l_name);
			goto leave_requeue;
		}

		ctl->requeue = 1;
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		if (ret) {
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0)
				mlog_errno(ret);
		}
		goto leave;
	}

	/*
	 * This prevents livelocks. The OCFS2_LOCK_UPCONVERT_FINISHING flag
	 * is set when the ast is received for an upconvert just before the
	 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs receives a bast
	 * on the heels of that ast, we want to delay the downconvert just
	 * enough to allow the upconvert requestor to do its task. Because
	 * this lock is in the blocked queue, the lock will be downconverted
	 * as soon as the requestor is done with the lock.
	 */
	if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING)
		goto leave_requeue;

	/*
	 * How can we block and yet be at NL?  We were trying to upconvert
	 * from NL and got canceled.  The code comes back here, and now
	 * we notice and clear BLOCKING.
	 */
	if (lockres->l_level == DLM_LOCK_NL) {
		BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders);
		mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name);
		lockres->l_blocking = DLM_LOCK_NL;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto leave;
	}

	/* if we're blocking an exclusive and we have *any* holders,
	 * then requeue. */
	if ((lockres->l_blocking == DLM_LOCK_EX)
	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
		mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n",
		     lockres->l_name, lockres->l_ex_holders,
		     lockres->l_ro_holders);
		goto leave_requeue;
	}

	/* If it's a PR we're blocking, then only
	 * requeue if we've got any EX holders */
	if (lockres->l_blocking == DLM_LOCK_PR &&
	    lockres->l_ex_holders) {
		mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n",
		     lockres->l_name, lockres->l_ex_holders);
		goto leave_requeue;
	}

	/*
	 * Can we get a lock in this state if the holder counts are
	 * zero? The metadata unblock code used to check this.
	 */
	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) {
		mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n",
		     lockres->l_name);
		goto leave_requeue;
	}

	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

	if (lockres->l_ops->check_downconvert
	    && !lockres->l_ops->check_downconvert(lockres, new_level)) {
		mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n",
		     lockres->l_name);
		goto leave_requeue;
	}

	/* If we get here, then we know that there are no more
	 * incompatible holders (and anyone asking for an incompatible
	 * lock is blocked).  We can now downconvert the lock */
	if (!lockres->l_ops->downconvert_worker)
		goto downconvert;

	/* Some lockres types want to do a bit of work before
	 * downconverting a lock.  Allow that here.  The worker function
	 * may sleep, so we save off a copy of what we're blocking as
	 * it may change while we're not holding the spin lock. */
	blocking = lockres->l_blocking;
	level = lockres->l_level;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);

	if (ctl->unblock_action == UNBLOCK_STOP_POST) {
		mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n",
		     lockres->l_name);
		goto leave;
	}

	spin_lock_irqsave(&lockres->l_lock, flags);
	if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) {
		/* If this changed underneath us, then we can't drop
		 * it just yet. */
		mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, "
		     "Recheck\n", lockres->l_name, blocking,
		     lockres->l_blocking, level, lockres->l_level);
		goto recheck;
	}

downconvert:
	ctl->requeue = 0;

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
		if (lockres->l_level == DLM_LOCK_EX)
			set_lvb = 1;

		/*
		 * We only set the lvb if the lock has been fully
		 * refreshed - otherwise we risk setting stale
		 * data. There's no need to actually clear out the
		 * lvb here; its value is still valid.
		 */
		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
			lockres->l_ops->set_lvb(lockres);
	}

	gen = ocfs2_prepare_downconvert(lockres, new_level);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
				     gen);

leave:
	if (ret)
		mlog_errno(ret);
	return ret;

leave_requeue:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ctl->requeue = 1;

	return 0;
}
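
/*
 * NOTE (editor's summary, not part of the original source): the
 * checks in ocfs2_unblock_lock() run in this order before a
 * downconvert is issued:
 *
 *	1. no longer BLOCKED		-> done, nothing to do
 *	2. BUSY (convert in flight)	-> requeue if PENDING, else cancel
 *	3. UPCONVERT_FINISHING		-> requeue, let the upconvert finish
 *	4. already at DLM_LOCK_NL	-> clear BLOCKED and bail
 *	5. incompatible holders left	-> requeue
 *	6. REFRESHING, or ->check_downconvert() refuses -> requeue
 *	7. ->downconvert_worker(), rechecking if l_blocking/l_level moved
 *	8. optionally ->set_lvb(), then ocfs2_downconvert_lock()
 */
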
static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking)
{
	struct inode *inode;
	struct address_space *mapping;
	struct ocfs2_inode_info *oi;

	inode = ocfs2_lock_res_inode(lockres);
	mapping = inode->i_mapping;

	if (S_ISDIR(inode->i_mode)) {
		oi = OCFS2_I(inode);
		oi->ip_dir_lock_gen++;
		mlog(0, "generation: %u\n", oi->ip_dir_lock_gen);
		goto out;
	}

	if (!S_ISREG(inode->i_mode))
		goto out;

	/*
	 * We need this before the filemap_fdatawrite() so that it can
	 * transfer the dirty bit from the PTE to the
	 * page. Unfortunately this means that even for EX->PR
	 * downconverts, we'll lose our mappings and have to build
	 * them up again.
	 */
	unmap_mapping_range(mapping, 0, 0, 0);

	if (filemap_fdatawrite(mapping)) {
		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!\n",
		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
	}
	sync_mapping_buffers(mapping);
	if (blocking == DLM_LOCK_EX) {
		truncate_inode_pages(mapping, 0);
	} else {
		/* We only need to wait on the I/O if we're not also
		 * truncating pages because truncate_inode_pages waits
		 * for us above. We don't truncate pages if we're
		 * blocking anything < DLM_LOCK_EX because we want to
		 * keep them around in that case. */
		filemap_fdatawait(mapping);
	}

	forget_all_cached_acls(inode);

out:
	return UNBLOCK_CONTINUE;
}
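
/*
 * NOTE (editor's note, not part of the original source): like every
 * ->downconvert_worker(), the function above runs from the
 * downconvert thread with l_lock dropped - which is why it is free
 * to block in filemap_fdatawrite()/truncate_inode_pages(), and why
 * ocfs2_unblock_lock() only hands it a snapshot of l_blocking and
 * rechecks afterwards.
 */
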
static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci,
				 struct ocfs2_lock_res *lockres,
				 int new_level)
{
	int checkpointed = ocfs2_ci_fully_checkpointed(ci);

	BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
	BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);

	if (checkpointed)
		return 1;

	ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci)));
	return 0;
}
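
/*
 * NOTE (editor's note, not part of the original source): returning 0
 * here propagates up through the ->check_downconvert() hooks below,
 * so ocfs2_unblock_lock() requeues the downconvert ("ReQ:
 * Checkpointing") and the checkpoint kicked off above gets a chance
 * to complete before the next attempt.
 */
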
static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level)
{
	struct inode *inode = ocfs2_lock_res_inode(lockres);

	return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level);
}

static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
{
	struct inode *inode = ocfs2_lock_res_inode(lockres);

	__ocfs2_stuff_meta_lvb(inode);
}

/*
 * Does the final reference drop on our dentry lock. Right now this
 * happens in the downconvert thread, but we could choose to simplify the
 * dlmglue API and push these off to the ocfs2_wq in the future.
 */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres)
{
	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
	ocfs2_dentry_lock_put(osb, dl);
}

/*
 * d_delete() matching dentries before the lock downconvert.
 *
 * At this point, any process waiting to destroy the
 * dentry_lock due to last ref count is stopped by the
 * OCFS2_LOCK_QUEUED flag.
 *
 * We have two potential problems:
 *
 * 1) If we do the last reference drop on our dentry_lock (via dput)
 *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
 *    the downconvert to finish. Instead we take an elevated
 *    reference and push the drop until after we've completed our
 *    unblock processing.
 *
 * 2) There might be another process with a final reference,
 *    waiting on us to finish processing. If this is the case, we
 *    detect it and exit out - there are no more dentries anyway.
 */
static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking)
{
	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
	struct dentry *dentry;
	unsigned long flags;
	int extra_ref = 0;

	/*
	 * This node is blocking another node from getting a read
	 * lock. This happens when we've renamed within a
	 * directory. We've forced the other nodes to d_delete(), but
	 * we never actually dropped our lock because it's still
	 * valid. The downconvert code will retain a PR for this node,
	 * so there's no further work to do.
	 */
	if (blocking == DLM_LOCK_PR)
		return UNBLOCK_CONTINUE;

	/*
	 * Mark this inode as potentially orphaned. The code in
	 * ocfs2_delete_inode() will figure out whether it actually
	 * needs to be freed or not.
	 */
	spin_lock(&oi->ip_lock);
	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
	spin_unlock(&oi->ip_lock);

	/*
	 * Yuck. We need to make sure however that the check of
	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
	 * respect to a reference decrement or the setting of that
	 * flag.
	 */
	spin_lock_irqsave(&lockres->l_lock, flags);
	spin_lock(&dentry_attach_lock);
	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
	    && dl->dl_count) {
		dl->dl_count++;
		extra_ref = 1;
	}
	spin_unlock(&dentry_attach_lock);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog(0, "extra_ref = %d\n", extra_ref);

	/*
	 * We have a process waiting on us in ocfs2_dentry_iput(),
	 * which means we can't have any more outstanding
	 * aliases. There's no need to do any more work.
	 */
	if (!extra_ref)
		return UNBLOCK_CONTINUE;

	spin_lock(&dentry_attach_lock);
	while (1) {
		dentry = ocfs2_find_local_alias(dl->dl_inode,
						dl->dl_parent_blkno, 1);
		if (!dentry)
			break;
		spin_unlock(&dentry_attach_lock);

		if (S_ISDIR(dl->dl_inode->i_mode))
			shrink_dcache_parent(dentry);

		mlog(0, "d_delete(%pd);\n", dentry);

		/*
		 * The following dcache calls may do an
		 * iput(). Normally we don't want that from the
		 * downconverting thread, but in this case it's ok
		 * because the requesting node already has an
		 * exclusive lock on the inode, so it can't be queued
		 * for a downconvert.
		 */
		d_delete(dentry);
		dput(dentry);

		spin_lock(&dentry_attach_lock);
	}
	spin_unlock(&dentry_attach_lock);

	/*
	 * If we are the last holder of this dentry lock, there is no
	 * reason to downconvert so skip straight to the unlock.
	 */
	if (dl->dl_count == 1)
		return UNBLOCK_STOP_POST;

	return UNBLOCK_CONTINUE_POST;
}
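
/*
 * NOTE (editor's note, not part of the original source): both
 * UNBLOCK_CONTINUE_POST and UNBLOCK_STOP_POST make
 * ocfs2_process_blocked_lock() below fire ->post_unlock(), i.e.
 * ocfs2_dentry_post_unlock(), which drops the extra reference taken
 * at the top of this worker.
 */
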
static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
					    int new_level)
{
	struct ocfs2_refcount_tree *tree =
		ocfs2_lock_res_refcount_tree(lockres);

	return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level);
}

static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
					 int blocking)
{
	struct ocfs2_refcount_tree *tree =
		ocfs2_lock_res_refcount_tree(lockres);

	ocfs2_metadata_cache_purge(&tree->rf_ci);

	return UNBLOCK_CONTINUE;
}

static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_qinfo_lvb *lvb;
	struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres);
	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
					    oinfo->dqi_gi.dqi_type);

	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
	lvb->lvb_version = OCFS2_QINFO_LVB_VERSION;
	lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace);
	lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace);
	lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms);
	lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks);
	lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk);
	lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry);
}

void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex)
{
	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;

	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(osb, lockres, level);
}

static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo)
{
	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
					    oinfo->dqi_gi.dqi_type);
	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
	struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
	struct buffer_head *bh = NULL;
	struct ocfs2_global_disk_dqinfo *gdinfo;
	int status = 0;

	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
	    lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) {
		info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace);
		info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace);
		oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms);
		oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks);
		oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk);
		oinfo->dqi_gi.dqi_free_entry =
					be32_to_cpu(lvb->lvb_free_entry);
	} else {
		status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode,
						     oinfo->dqi_giblk, &bh);
		if (status) {
			mlog_errno(status);
			goto bail;
		}
		gdinfo = (struct ocfs2_global_disk_dqinfo *)
					(bh->b_data + OCFS2_GLOBAL_INFO_OFF);
		info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace);
		info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace);
		oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms);
		oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks);
		oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk);
		oinfo->dqi_gi.dqi_free_entry =
					le32_to_cpu(gdinfo->dqi_free_entry);
		brelse(bh);
		ocfs2_track_lock_refresh(lockres);
	}

bail:
	return status;
}

/* Lock quota info; this function expects at least a shared lock on the
 * quota file so that we can safely refresh the quota info from disk. */
int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex)
{
	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	int status = 0;

	/* On RO devices, locking really isn't needed... */
	if (ocfs2_is_hard_readonly(osb)) {
		if (ex)
			status = -EROFS;
		goto bail;
	}
	if (ocfs2_mount_local(osb))
		goto bail;

	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}
	if (!ocfs2_should_refresh_lock_res(lockres))
		goto bail;
	/* OK, we have the lock but we need to refresh the quota info */
	status = ocfs2_refresh_qinfo(oinfo);
	if (status)
		ocfs2_qinfo_unlock(oinfo, ex);
	ocfs2_complete_lock_res_refresh(lockres, status);
bail:
	return status;
}
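
/*
 * NOTE (editor's sketch, not part of the original source): a minimal
 * caller of the pair above, assuming it only needs to read the
 * global quota info:
 *
 *	status = ocfs2_qinfo_lock(oinfo, 0);
 *	if (status < 0)
 *		return status;
 *	... use the mem_dqinfo fields refreshed from the LVB or disk ...
 *	ocfs2_qinfo_unlock(oinfo, 0);
 *
 * On refresh failure, ocfs2_qinfo_lock() has already dropped the
 * cluster lock itself before returning the error.
 */
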
int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex)
{
	int status;
	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
	struct ocfs2_super *osb = lockres->l_priv;

	if (ocfs2_is_hard_readonly(osb))
		return -EROFS;

	if (ocfs2_mount_local(osb))
		return 0;

	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
	if (status < 0)
		mlog_errno(status);

	return status;
}

void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
{
	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
	struct ocfs2_super *osb = lockres->l_priv;

	if (!ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(osb, lockres, level);
}

static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_unblock_ctl ctl = {0, 0,};
	unsigned long flags;

	/* Our reference to the lockres in this function can be
	 * considered valid until we remove the OCFS2_LOCK_QUEUED
	 * flag. */

	BUG_ON(!lockres);
	BUG_ON(!lockres->l_ops);

	mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name);

	/* Detect whether a lock has been marked as going away while
	 * the downconvert thread was processing other things. A lock can
	 * still be marked with OCFS2_LOCK_FREEING after this check,
	 * but short circuiting here will still save us some work. */
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (lockres->l_flags & OCFS2_LOCK_FREEING)
		goto unqueue;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = ocfs2_unblock_lock(osb, lockres, &ctl);
	if (status < 0)
		mlog_errno(status);

	spin_lock_irqsave(&lockres->l_lock, flags);
unqueue:
	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
	} else
		ocfs2_schedule_blocked_lock(osb, lockres);

	mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name,
	     ctl.requeue ? "yes" : "no");
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	if (ctl.unblock_action != UNBLOCK_CONTINUE
	    && lockres->l_ops->post_unlock)
		lockres->l_ops->post_unlock(osb, lockres);
}

static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres)
{
	unsigned long flags;

	assert_spin_locked(&lockres->l_lock);

	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
		/* Do not schedule a lock for downconvert when it's on
		 * the way to destruction - any nodes wanting access
		 * to the resource will get it soon. */
		mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n",
		     lockres->l_name, lockres->l_flags);
		return;
	}

	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);

	spin_lock_irqsave(&osb->dc_task_lock, flags);
	if (list_empty(&lockres->l_blocked_list)) {
		list_add_tail(&lockres->l_blocked_list,
			      &osb->blocked_lock_list);
		osb->blocked_lock_count++;
	}
	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
}
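
/*
 * NOTE (editor's note, not part of the original source): the
 * list_empty() check above makes repeated BASTs on the same lockres
 * safe - a lockres sits on osb->blocked_lock_list at most once.  The
 * OCFS2_LOCK_QUEUED flag set here stays up across requeues and is
 * only cleared by ocfs2_process_blocked_lock() once the downconvert
 * thread is truly done with the lockres.
 */
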
static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
{
	unsigned long processed;
	unsigned long flags;
	struct ocfs2_lock_res *lockres;

	spin_lock_irqsave(&osb->dc_task_lock, flags);
	/* grab this early so we know to try again if a state change and
	 * wake happens part-way through our work */
	osb->dc_work_sequence = osb->dc_wake_sequence;

	processed = osb->blocked_lock_count;
	/*
	 * blocked lock processing in this loop might call iput which can
	 * remove items off osb->blocked_lock_list. Downconvert up to
	 * 'processed' number of locks, but stop short if we had some
	 * removed in ocfs2_mark_lockres_freeing when downconverting.
	 */
	while (processed && !list_empty(&osb->blocked_lock_list)) {
		lockres = list_entry(osb->blocked_lock_list.next,
				     struct ocfs2_lock_res, l_blocked_list);
		list_del_init(&lockres->l_blocked_list);
		osb->blocked_lock_count--;
		spin_unlock_irqrestore(&osb->dc_task_lock, flags);

		BUG_ON(!processed);
		processed--;

		ocfs2_process_blocked_lock(osb, lockres);

		spin_lock_irqsave(&osb->dc_task_lock, flags);
	}
	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
}

static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
{
	int empty = 0;
	unsigned long flags;

	spin_lock_irqsave(&osb->dc_task_lock, flags);
	if (list_empty(&osb->blocked_lock_list))
		empty = 1;

	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
	return empty;
}

static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
{
	int should_wake = 0;
	unsigned long flags;

	spin_lock_irqsave(&osb->dc_task_lock, flags);
	if (osb->dc_work_sequence != osb->dc_wake_sequence)
		should_wake = 1;
	spin_unlock_irqrestore(&osb->dc_task_lock, flags);

	return should_wake;
}

static int ocfs2_downconvert_thread(void *arg)
{
	int status = 0;
	struct ocfs2_super *osb = arg;

	/* only quit once we've been asked to stop and there is no more
	 * work available */
	while (!(kthread_should_stop() &&
		 ocfs2_downconvert_thread_lists_empty(osb))) {

		wait_event_interruptible(osb->dc_event,
					 ocfs2_downconvert_thread_should_wake(osb) ||
					 kthread_should_stop());

		mlog(0, "downconvert_thread: awoken\n");

		ocfs2_downconvert_thread_do_work(osb);
	}

	osb->dc_task = NULL;
	return status;
}

void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
{
	unsigned long flags;

	spin_lock_irqsave(&osb->dc_task_lock, flags);
	/* make sure the downconvert thread gets a swipe at whatever changes
	 * the caller may have made to the lock state */
	osb->dc_wake_sequence++;
	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
	wake_up(&osb->dc_event);
}
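
/*
 * NOTE (editor's note, not part of the original source): the
 * dc_wake_sequence/dc_work_sequence pair above is a lost-wakeup
 * guard.  ocfs2_downconvert_thread_do_work() samples the wake
 * sequence under dc_task_lock before draining the list, so if this
 * increment lands part-way through a pass,
 * ocfs2_downconvert_thread_should_wake() still sees the counters
 * differ and the thread immediately runs another pass.
 */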