xref: /openbmc/linux/fs/ocfs2/dlmglue.c (revision f8a11425075ff11b4b5784f077cb84f3d2dfb3f0)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3  * dlmglue.c
4  *
5  * Code which implements an OCFS2 specific interface to our DLM.
6  *
7  * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
8  */
9 
10 #include <linux/types.h>
11 #include <linux/slab.h>
12 #include <linux/highmem.h>
13 #include <linux/mm.h>
14 #include <linux/kthread.h>
15 #include <linux/pagemap.h>
16 #include <linux/debugfs.h>
17 #include <linux/seq_file.h>
18 #include <linux/time.h>
19 #include <linux/quotaops.h>
20 #include <linux/sched/signal.h>
21 
22 #define MLOG_MASK_PREFIX ML_DLM_GLUE
23 #include <cluster/masklog.h>
24 
25 #include "ocfs2.h"
26 #include "ocfs2_lockingver.h"
27 
28 #include "alloc.h"
29 #include "dcache.h"
30 #include "dlmglue.h"
31 #include "extent_map.h"
32 #include "file.h"
33 #include "heartbeat.h"
34 #include "inode.h"
35 #include "journal.h"
36 #include "stackglue.h"
37 #include "slot_map.h"
38 #include "super.h"
39 #include "uptodate.h"
40 #include "quota.h"
41 #include "refcounttree.h"
42 #include "acl.h"
43 
44 #include "buffer_head_io.h"
45 
/*
 * One waiter for a change in a lockres' l_flags. lockres_set_flags()
 * wakes the waiter once (l_flags & mw_mask) == mw_goal.
 */
struct ocfs2_mask_waiter {
	/* Entry on the lockres' l_mask_waiters list. */
	struct list_head	mw_item;
	/* Set to 0 by lockres_set_flags() when the goal is reached. */
	int			mw_status;
	/* Completed by lockres_set_flags() when the goal is reached. */
	struct completion	mw_complete;
	/* Bits of l_flags this waiter is interested in. */
	unsigned long		mw_mask;
	/* Value the masked l_flags must equal for the wait to end. */
	unsigned long		mw_goal;
#ifdef CONFIG_OCFS2_FS_STATS
	/* ktime_get() timestamp recorded by ocfs2_init_start_time(). */
	ktime_t			mw_lock_start;
#endif
};
56 
57 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
58 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
59 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
60 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);
61 
/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock().
 *
 * Note that ->downconvert_worker is declared as returning a plain int,
 * so these values are passed around as ints rather than as this enum.
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};
76 
/*
 * Pairs a requeue indicator with an ocfs2_unblock_action value.
 *
 * NOTE(review): no user of this struct is visible in this part of the
 * file; presumably it carries the outcome of an unblock attempt back to
 * the downconvert thread - confirm against ocfs2_unblock_lock().
 */
struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};
81 
/*
 * Lockdep class keys: one per lock type, indexed by enum ocfs2_lock_type
 * and handed to lockdep_init_map() by ocfs2_lock_res_init_common().
 */
#ifdef CONFIG_DEBUG_LOCK_ALLOC
static struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES];
#endif
86 
87 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
88 					int new_level);
89 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
90 
91 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
92 				     int blocking);
93 
94 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
95 				       int blocking);
96 
97 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
98 				     struct ocfs2_lock_res *lockres);
99 
100 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);
101 
102 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
103 					    int new_level);
104 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
105 					 int blocking);
106 
107 #define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)
108 
109 /* This aids in debugging situations where a bad LVB might be involved. */
110 static void ocfs2_dump_meta_lvb_info(u64 level,
111 				     const char *function,
112 				     unsigned int line,
113 				     struct ocfs2_lock_res *lockres)
114 {
115 	struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
116 
117 	mlog(level, "LVB information for %s (called from %s:%u):\n",
118 	     lockres->l_name, function, line);
119 	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
120 	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
121 	     be32_to_cpu(lvb->lvb_igeneration));
122 	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
123 	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
124 	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
125 	     be16_to_cpu(lvb->lvb_imode));
126 	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
127 	     "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
128 	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
129 	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
130 	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
131 	     be32_to_cpu(lvb->lvb_iattr));
132 }
133 
134 
/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 *
 * The most basic of lock types can point ->l_priv to their respective
 * struct ocfs2_super and allow the default actions to manage things.
 *
 * Right now, each lock type also needs to implement an init function,
 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
 * should be called when the lock is no longer needed (i.e., object
 * destruction time).
 */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
	 * this callback if ->l_priv is not an ocfs2_super pointer;
	 * ocfs2_get_lockres_osb() falls back to casting ->l_priv when
	 * it is left NULL.
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

	/*
	 * Optionally called in the downconvert thread after a
	 * successful downconvert. The lockres will not be referenced
	 * after this callback is called, so it is safe to free
	 * memory, etc.
	 *
	 * The exact semantics of when this is called are controlled
	 * by ->downconvert_worker()
	 */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Allow a lock type to add checks to determine whether it is
	 * safe to downconvert a lock. Return 0 to re-queue the
	 * downconvert at a later time, nonzero to continue.
	 *
	 * The int argument is the level being downconverted to (the
	 * implementations in this file name it new_level).
	 *
	 * For most locks, the default checks that there are no
	 * incompatible holders are sufficient.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * Allows a lock type to populate the lock value block. This
	 * is called on downconvert, and when we drop a lock.
	 *
	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
	 * in the flags field.
	 *
	 * Called with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *);

	/*
	 * Called from the downconvert thread when it is determined
	 * that a lock will be downconverted. This is called without
	 * any locks held so the function can do work that might
	 * schedule (syncing out data, etc).
	 *
	 * The int argument is named "blocking" by the implementations
	 * in this file.
	 *
	 * This should return any one of the ocfs2_unblock_action
	 * values, depending on what it wants the thread to do.
	 */
	int (*downconvert_worker)(struct ocfs2_lock_res *, int);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow.
	 */
	int flags;
};
206 
207 /*
208  * Some locks want to "refresh" potentially stale data when a
209  * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
210  * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
211  * individual lockres l_flags member from the ast function. It is
212  * expected that the locking wrapper will clear the
213  * OCFS2_LOCK_NEEDS_REFRESH flag when done.
214  */
215 #define LOCK_TYPE_REQUIRES_REFRESH 0x1
216 
217 /*
218  * Indicate that a lock type makes use of the lock value block. The
219  * ->set_lvb lock type callback must be defined.
220  */
221 #define LOCK_TYPE_USES_LVB		0x2
222 
223 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
224 	.get_osb	= ocfs2_get_inode_osb,
225 	.flags		= 0,
226 };
227 
228 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
229 	.get_osb	= ocfs2_get_inode_osb,
230 	.check_downconvert = ocfs2_check_meta_downconvert,
231 	.set_lvb	= ocfs2_set_meta_lvb,
232 	.downconvert_worker = ocfs2_data_convert_worker,
233 	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
234 };
235 
236 static struct ocfs2_lock_res_ops ocfs2_super_lops = {
237 	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
238 };
239 
240 static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
241 	.flags		= 0,
242 };
243 
244 static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
245 	.flags		= 0,
246 };
247 
248 static struct ocfs2_lock_res_ops ocfs2_trim_fs_lops = {
249 	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
250 };
251 
252 static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
253 	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
254 };
255 
256 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
257 	.get_osb	= ocfs2_get_dentry_osb,
258 	.post_unlock	= ocfs2_dentry_post_unlock,
259 	.downconvert_worker = ocfs2_dentry_convert_worker,
260 	.flags		= 0,
261 };
262 
263 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
264 	.get_osb	= ocfs2_get_inode_osb,
265 	.flags		= 0,
266 };
267 
268 static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
269 	.get_osb	= ocfs2_get_file_osb,
270 	.flags		= 0,
271 };
272 
273 static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = {
274 	.set_lvb	= ocfs2_set_qinfo_lvb,
275 	.get_osb	= ocfs2_get_qinfo_osb,
276 	.flags		= LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB,
277 };
278 
279 static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = {
280 	.check_downconvert = ocfs2_check_refcount_downconvert,
281 	.downconvert_worker = ocfs2_refcount_convert_worker,
282 	.flags		= 0,
283 };
284 
285 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
286 {
287 	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
288 		lockres->l_type == OCFS2_LOCK_TYPE_RW ||
289 		lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
290 }
291 
292 static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
293 {
294 	return container_of(lksb, struct ocfs2_lock_res, l_lksb);
295 }
296 
297 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
298 {
299 	BUG_ON(!ocfs2_is_inode_lock(lockres));
300 
301 	return (struct inode *) lockres->l_priv;
302 }
303 
304 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
305 {
306 	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
307 
308 	return (struct ocfs2_dentry_lock *)lockres->l_priv;
309 }
310 
311 static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres)
312 {
313 	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO);
314 
315 	return (struct ocfs2_mem_dqinfo *)lockres->l_priv;
316 }
317 
318 static inline struct ocfs2_refcount_tree *
319 ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res)
320 {
321 	return container_of(res, struct ocfs2_refcount_tree, rf_lockres);
322 }
323 
324 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
325 {
326 	if (lockres->l_ops->get_osb)
327 		return lockres->l_ops->get_osb(lockres);
328 
329 	return (struct ocfs2_super *)lockres->l_priv;
330 }
331 
332 static int ocfs2_lock_create(struct ocfs2_super *osb,
333 			     struct ocfs2_lock_res *lockres,
334 			     int level,
335 			     u32 dlm_flags);
336 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
337 						     int wanted);
338 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
339 				   struct ocfs2_lock_res *lockres,
340 				   int level, unsigned long caller_ip);
341 static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb,
342 					struct ocfs2_lock_res *lockres,
343 					int level)
344 {
345 	__ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_);
346 }
347 
348 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
349 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
350 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
351 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
352 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
353 					struct ocfs2_lock_res *lockres);
354 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
355 						int convert);
/*
 * Log a DLM failure at ML_ERROR.
 *
 * @_func:    name of the DLM call that failed (printed with %s)
 * @_err:     error code returned by that call (printed with %d)
 * @_lockres: struct ocfs2_lock_res * the call was made on
 *
 * Dentry lock names carry a binary inode number after the printable
 * prefix, so for those only the first OCFS2_DENTRY_LOCK_INO_START - 1
 * characters are printed as text and the inode number is appended in hex.
 *
 * Every argument is parenthesized and @_lockres is evaluated exactly once,
 * so expression arguments (e.g. "res + 1" or "res++") behave as expected.
 */
#define ocfs2_log_dlm_error(_func, _err, _lockres) do {					\
	struct ocfs2_lock_res *__dlm_res = (_lockres);					\
	if (__dlm_res->l_type != OCFS2_LOCK_TYPE_DENTRY)				\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n",	\
		     (_err), (_func), __dlm_res->l_name);				\
	else										\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n",	\
		     (_err), (_func), OCFS2_DENTRY_LOCK_INO_START - 1,			\
		     __dlm_res->l_name,							\
		     (unsigned int)ocfs2_get_dentry_lock_ino(__dlm_res));		\
} while (0)
365 static int ocfs2_downconvert_thread(void *arg);
366 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
367 					struct ocfs2_lock_res *lockres);
368 static int ocfs2_inode_lock_update(struct inode *inode,
369 				  struct buffer_head **bh);
370 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
371 static inline int ocfs2_highest_compat_lock_level(int level);
372 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
373 					      int new_level);
374 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
375 				  struct ocfs2_lock_res *lockres,
376 				  int new_level,
377 				  int lvb,
378 				  unsigned int generation);
379 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
380 				        struct ocfs2_lock_res *lockres);
381 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
382 				struct ocfs2_lock_res *lockres);
383 
384 
385 static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
386 				  u64 blkno,
387 				  u32 generation,
388 				  char *name)
389 {
390 	int len;
391 
392 	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
393 
394 	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
395 		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
396 		       (long long)blkno, generation);
397 
398 	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
399 
400 	mlog(0, "built lock resource with name: %s\n", name);
401 }
402 
403 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
404 
405 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
406 				       struct ocfs2_dlm_debug *dlm_debug)
407 {
408 	mlog(0, "Add tracking for lockres %s\n", res->l_name);
409 
410 	spin_lock(&ocfs2_dlm_tracking_lock);
411 	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
412 	spin_unlock(&ocfs2_dlm_tracking_lock);
413 }
414 
415 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
416 {
417 	spin_lock(&ocfs2_dlm_tracking_lock);
418 	if (!list_empty(&res->l_debug_list))
419 		list_del_init(&res->l_debug_list);
420 	spin_unlock(&ocfs2_dlm_tracking_lock);
421 }
422 
423 #ifdef CONFIG_OCFS2_FS_STATS
424 static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
425 {
426 	res->l_lock_refresh = 0;
427 	res->l_lock_wait = 0;
428 	memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats));
429 	memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats));
430 }
431 
432 static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
433 				    struct ocfs2_mask_waiter *mw, int ret)
434 {
435 	u32 usec;
436 	ktime_t kt;
437 	struct ocfs2_lock_stats *stats;
438 
439 	if (level == LKM_PRMODE)
440 		stats = &res->l_lock_prmode;
441 	else if (level == LKM_EXMODE)
442 		stats = &res->l_lock_exmode;
443 	else
444 		return;
445 
446 	kt = ktime_sub(ktime_get(), mw->mw_lock_start);
447 	usec = ktime_to_us(kt);
448 
449 	stats->ls_gets++;
450 	stats->ls_total += ktime_to_ns(kt);
451 	/* overflow */
452 	if (unlikely(stats->ls_gets == 0)) {
453 		stats->ls_gets++;
454 		stats->ls_total = ktime_to_ns(kt);
455 	}
456 
457 	if (stats->ls_max < usec)
458 		stats->ls_max = usec;
459 
460 	if (ret)
461 		stats->ls_fail++;
462 
463 	stats->ls_last = ktime_to_us(ktime_get_real());
464 }
465 
466 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
467 {
468 	lockres->l_lock_refresh++;
469 }
470 
471 static inline void ocfs2_track_lock_wait(struct ocfs2_lock_res *lockres)
472 {
473 	struct ocfs2_mask_waiter *mw;
474 
475 	if (list_empty(&lockres->l_mask_waiters)) {
476 		lockres->l_lock_wait = 0;
477 		return;
478 	}
479 
480 	mw = list_first_entry(&lockres->l_mask_waiters,
481 				struct ocfs2_mask_waiter, mw_item);
482 	lockres->l_lock_wait =
483 			ktime_to_us(ktime_mono_to_real(mw->mw_lock_start));
484 }
485 
486 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
487 {
488 	mw->mw_lock_start = ktime_get();
489 }
490 #else
/* Lock statistics are compiled out: every hook below does nothing. */
static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) { }

static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
					   int level,
					   struct ocfs2_mask_waiter *mw,
					   int ret) { }

static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) { }

static inline void ocfs2_track_lock_wait(struct ocfs2_lock_res *lockres) { }

static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) { }
507 #endif
508 
509 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
510 				       struct ocfs2_lock_res *res,
511 				       enum ocfs2_lock_type type,
512 				       struct ocfs2_lock_res_ops *ops,
513 				       void *priv)
514 {
515 	res->l_type          = type;
516 	res->l_ops           = ops;
517 	res->l_priv          = priv;
518 
519 	res->l_level         = DLM_LOCK_IV;
520 	res->l_requested     = DLM_LOCK_IV;
521 	res->l_blocking      = DLM_LOCK_IV;
522 	res->l_action        = OCFS2_AST_INVALID;
523 	res->l_unlock_action = OCFS2_UNLOCK_INVALID;
524 
525 	res->l_flags         = OCFS2_LOCK_INITIALIZED;
526 
527 	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
528 
529 	ocfs2_init_lock_stats(res);
530 #ifdef CONFIG_DEBUG_LOCK_ALLOC
531 	if (type != OCFS2_LOCK_TYPE_OPEN)
532 		lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type],
533 				 &lockdep_keys[type], 0);
534 	else
535 		res->l_lockdep_map.key = NULL;
536 #endif
537 }
538 
539 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
540 {
541 	/* This also clears out the lock status block */
542 	memset(res, 0, sizeof(struct ocfs2_lock_res));
543 	spin_lock_init(&res->l_lock);
544 	init_waitqueue_head(&res->l_event);
545 	INIT_LIST_HEAD(&res->l_blocked_list);
546 	INIT_LIST_HEAD(&res->l_mask_waiters);
547 	INIT_LIST_HEAD(&res->l_holders);
548 }
549 
550 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
551 			       enum ocfs2_lock_type type,
552 			       unsigned int generation,
553 			       struct inode *inode)
554 {
555 	struct ocfs2_lock_res_ops *ops;
556 
557 	switch(type) {
558 		case OCFS2_LOCK_TYPE_RW:
559 			ops = &ocfs2_inode_rw_lops;
560 			break;
561 		case OCFS2_LOCK_TYPE_META:
562 			ops = &ocfs2_inode_inode_lops;
563 			break;
564 		case OCFS2_LOCK_TYPE_OPEN:
565 			ops = &ocfs2_inode_open_lops;
566 			break;
567 		default:
568 			mlog_bug_on_msg(1, "type: %d\n", type);
569 			ops = NULL; /* thanks, gcc */
570 			break;
571 	}
572 
573 	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
574 			      generation, res->l_name);
575 	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
576 }
577 
578 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
579 {
580 	struct inode *inode = ocfs2_lock_res_inode(lockres);
581 
582 	return OCFS2_SB(inode->i_sb);
583 }
584 
585 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres)
586 {
587 	struct ocfs2_mem_dqinfo *info = lockres->l_priv;
588 
589 	return OCFS2_SB(info->dqi_gi.dqi_sb);
590 }
591 
592 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
593 {
594 	struct ocfs2_file_private *fp = lockres->l_priv;
595 
596 	return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
597 }
598 
599 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
600 {
601 	__be64 inode_blkno_be;
602 
603 	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
604 	       sizeof(__be64));
605 
606 	return be64_to_cpu(inode_blkno_be);
607 }
608 
609 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
610 {
611 	struct ocfs2_dentry_lock *dl = lockres->l_priv;
612 
613 	return OCFS2_SB(dl->dl_inode->i_sb);
614 }
615 
616 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
617 				u64 parent, struct inode *inode)
618 {
619 	int len;
620 	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
621 	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
622 	struct ocfs2_lock_res *lockres = &dl->dl_lockres;
623 
624 	ocfs2_lock_res_init_once(lockres);
625 
626 	/*
627 	 * Unfortunately, the standard lock naming scheme won't work
628 	 * here because we have two 16 byte values to use. Instead,
629 	 * we'll stuff the inode number as a binary value. We still
630 	 * want error prints to show something without garbling the
631 	 * display, so drop a null byte in there before the inode
632 	 * number. A future version of OCFS2 will likely use all
633 	 * binary lock names. The stringified names have been a
634 	 * tremendous aid in debugging, but now that the debugfs
635 	 * interface exists, we can mangle things there if need be.
636 	 *
637 	 * NOTE: We also drop the standard "pad" value (the total lock
638 	 * name size stays the same though - the last part is all
639 	 * zeros due to the memset in ocfs2_lock_res_init_once()
640 	 */
641 	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
642 		       "%c%016llx",
643 		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
644 		       (long long)parent);
645 
646 	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
647 
648 	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
649 	       sizeof(__be64));
650 
651 	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
652 				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
653 				   dl);
654 }
655 
656 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
657 				      struct ocfs2_super *osb)
658 {
659 	/* Superblock lockres doesn't come from a slab so we call init
660 	 * once on it manually.  */
661 	ocfs2_lock_res_init_once(res);
662 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
663 			      0, res->l_name);
664 	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
665 				   &ocfs2_super_lops, osb);
666 }
667 
668 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
669 				       struct ocfs2_super *osb)
670 {
671 	/* Rename lockres doesn't come from a slab so we call init
672 	 * once on it manually.  */
673 	ocfs2_lock_res_init_once(res);
674 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
675 	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
676 				   &ocfs2_rename_lops, osb);
677 }
678 
679 static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
680 					 struct ocfs2_super *osb)
681 {
682 	/* nfs_sync lockres doesn't come from a slab so we call init
683 	 * once on it manually.  */
684 	ocfs2_lock_res_init_once(res);
685 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name);
686 	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC,
687 				   &ocfs2_nfs_sync_lops, osb);
688 }
689 
690 static void ocfs2_nfs_sync_lock_init(struct ocfs2_super *osb)
691 {
692 	ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb);
693 	init_rwsem(&osb->nfs_sync_rwlock);
694 }
695 
696 void ocfs2_trim_fs_lock_res_init(struct ocfs2_super *osb)
697 {
698 	struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;
699 
700 	/* Only one trimfs thread are allowed to work at the same time. */
701 	mutex_lock(&osb->obs_trim_fs_mutex);
702 
703 	ocfs2_lock_res_init_once(lockres);
704 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_TRIM_FS, 0, 0, lockres->l_name);
705 	ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_TRIM_FS,
706 				   &ocfs2_trim_fs_lops, osb);
707 }
708 
709 void ocfs2_trim_fs_lock_res_uninit(struct ocfs2_super *osb)
710 {
711 	struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;
712 
713 	ocfs2_simple_drop_lockres(osb, lockres);
714 	ocfs2_lock_res_free(lockres);
715 
716 	mutex_unlock(&osb->obs_trim_fs_mutex);
717 }
718 
719 static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
720 					    struct ocfs2_super *osb)
721 {
722 	ocfs2_lock_res_init_once(res);
723 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
724 	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
725 				   &ocfs2_orphan_scan_lops, osb);
726 }
727 
728 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
729 			      struct ocfs2_file_private *fp)
730 {
731 	struct inode *inode = fp->fp_file->f_mapping->host;
732 	struct ocfs2_inode_info *oi = OCFS2_I(inode);
733 
734 	ocfs2_lock_res_init_once(lockres);
735 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
736 			      inode->i_generation, lockres->l_name);
737 	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
738 				   OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
739 				   fp);
740 	lockres->l_flags |= OCFS2_LOCK_NOCACHE;
741 }
742 
743 void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres,
744 			       struct ocfs2_mem_dqinfo *info)
745 {
746 	ocfs2_lock_res_init_once(lockres);
747 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type,
748 			      0, lockres->l_name);
749 	ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres,
750 				   OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops,
751 				   info);
752 }
753 
754 void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres,
755 				  struct ocfs2_super *osb, u64 ref_blkno,
756 				  unsigned int generation)
757 {
758 	ocfs2_lock_res_init_once(lockres);
759 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno,
760 			      generation, lockres->l_name);
761 	ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT,
762 				   &ocfs2_refcount_block_lops, osb);
763 }
764 
765 void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
766 {
767 	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
768 		return;
769 
770 	ocfs2_remove_lockres_tracking(res);
771 
772 	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
773 			"Lockres %s is on the blocked list\n",
774 			res->l_name);
775 	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
776 			"Lockres %s has mask waiters pending\n",
777 			res->l_name);
778 	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
779 			"Lockres %s is locked\n",
780 			res->l_name);
781 	mlog_bug_on_msg(res->l_ro_holders,
782 			"Lockres %s has %u ro holders\n",
783 			res->l_name, res->l_ro_holders);
784 	mlog_bug_on_msg(res->l_ex_holders,
785 			"Lockres %s has %u ex holders\n",
786 			res->l_name, res->l_ex_holders);
787 
788 	/* Need to clear out the lock status block for the dlm */
789 	memset(&res->l_lksb, 0, sizeof(res->l_lksb));
790 
791 	res->l_flags = 0UL;
792 }
793 
794 /*
795  * Keep a list of processes who have interest in a lockres.
796  * Note: this is now only uesed for check recursive cluster locking.
797  */
798 static inline void ocfs2_add_holder(struct ocfs2_lock_res *lockres,
799 				   struct ocfs2_lock_holder *oh)
800 {
801 	INIT_LIST_HEAD(&oh->oh_list);
802 	oh->oh_owner_pid = get_pid(task_pid(current));
803 
804 	spin_lock(&lockres->l_lock);
805 	list_add_tail(&oh->oh_list, &lockres->l_holders);
806 	spin_unlock(&lockres->l_lock);
807 }
808 
809 static struct ocfs2_lock_holder *
810 ocfs2_pid_holder(struct ocfs2_lock_res *lockres,
811 		struct pid *pid)
812 {
813 	struct ocfs2_lock_holder *oh;
814 
815 	spin_lock(&lockres->l_lock);
816 	list_for_each_entry(oh, &lockres->l_holders, oh_list) {
817 		if (oh->oh_owner_pid == pid) {
818 			spin_unlock(&lockres->l_lock);
819 			return oh;
820 		}
821 	}
822 	spin_unlock(&lockres->l_lock);
823 	return NULL;
824 }
825 
826 static inline void ocfs2_remove_holder(struct ocfs2_lock_res *lockres,
827 				       struct ocfs2_lock_holder *oh)
828 {
829 	spin_lock(&lockres->l_lock);
830 	list_del(&oh->oh_list);
831 	spin_unlock(&lockres->l_lock);
832 
833 	put_pid(oh->oh_owner_pid);
834 }
835 
836 
837 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
838 				     int level)
839 {
840 	BUG_ON(!lockres);
841 
842 	switch(level) {
843 	case DLM_LOCK_EX:
844 		lockres->l_ex_holders++;
845 		break;
846 	case DLM_LOCK_PR:
847 		lockres->l_ro_holders++;
848 		break;
849 	default:
850 		BUG();
851 	}
852 }
853 
854 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
855 				     int level)
856 {
857 	BUG_ON(!lockres);
858 
859 	switch(level) {
860 	case DLM_LOCK_EX:
861 		BUG_ON(!lockres->l_ex_holders);
862 		lockres->l_ex_holders--;
863 		break;
864 	case DLM_LOCK_PR:
865 		BUG_ON(!lockres->l_ro_holders);
866 		lockres->l_ro_holders--;
867 		break;
868 	default:
869 		BUG();
870 	}
871 }
872 
873 /* WARNING: This function lives in a world where the only three lock
874  * levels are EX, PR, and NL. It *will* have to be adjusted when more
875  * lock types are added. */
876 static inline int ocfs2_highest_compat_lock_level(int level)
877 {
878 	int new_level = DLM_LOCK_EX;
879 
880 	if (level == DLM_LOCK_EX)
881 		new_level = DLM_LOCK_NL;
882 	else if (level == DLM_LOCK_PR)
883 		new_level = DLM_LOCK_PR;
884 	return new_level;
885 }
886 
887 static void lockres_set_flags(struct ocfs2_lock_res *lockres,
888 			      unsigned long newflags)
889 {
890 	struct ocfs2_mask_waiter *mw, *tmp;
891 
892  	assert_spin_locked(&lockres->l_lock);
893 
894 	lockres->l_flags = newflags;
895 
896 	list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
897 		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
898 			continue;
899 
900 		list_del_init(&mw->mw_item);
901 		mw->mw_status = 0;
902 		complete(&mw->mw_complete);
903 		ocfs2_track_lock_wait(lockres);
904 	}
905 }
906 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
907 {
908 	lockres_set_flags(lockres, lockres->l_flags | or);
909 }
910 static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
911 				unsigned long clear)
912 {
913 	lockres_set_flags(lockres, lockres->l_flags & ~clear);
914 }
915 
916 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
917 {
918 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
919 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
920 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
921 	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
922 
923 	lockres->l_level = lockres->l_requested;
924 	if (lockres->l_level <=
925 	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
926 		lockres->l_blocking = DLM_LOCK_NL;
927 		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
928 	}
929 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
930 }
931 
932 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
933 {
934 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
935 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
936 
937 	/* Convert from RO to EX doesn't really need anything as our
938 	 * information is already up to data. Convert from NL to
939 	 * *anything* however should mark ourselves as needing an
940 	 * update */
941 	if (lockres->l_level == DLM_LOCK_NL &&
942 	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
943 		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
944 
945 	lockres->l_level = lockres->l_requested;
946 
947 	/*
948 	 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
949 	 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from
950 	 * downconverting the lock before the upconvert has fully completed.
951 	 * Do not prevent the dc thread from downconverting if NONBLOCK lock
952 	 * had already returned.
953 	 */
954 	if (!(lockres->l_flags & OCFS2_LOCK_NONBLOCK_FINISHED))
955 		lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
956 	else
957 		lockres_clear_flags(lockres, OCFS2_LOCK_NONBLOCK_FINISHED);
958 
959 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
960 }
961 
962 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
963 {
964 	BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
965 	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
966 
967 	if (lockres->l_requested > DLM_LOCK_NL &&
968 	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
969 	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
970 		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
971 
972 	lockres->l_level = lockres->l_requested;
973 	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
974 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
975 }
976 
977 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
978 				     int level)
979 {
980 	int needs_downconvert = 0;
981 
982 	assert_spin_locked(&lockres->l_lock);
983 
984 	if (level > lockres->l_blocking) {
985 		/* only schedule a downconvert if we haven't already scheduled
986 		 * one that goes low enough to satisfy the level we're
987 		 * blocking.  this also catches the case where we get
988 		 * duplicate BASTs */
989 		if (ocfs2_highest_compat_lock_level(level) <
990 		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
991 			needs_downconvert = 1;
992 
993 		lockres->l_blocking = level;
994 	}
995 
996 	mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n",
997 	     lockres->l_name, level, lockres->l_level, lockres->l_blocking,
998 	     needs_downconvert);
999 
1000 	if (needs_downconvert)
1001 		lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
1002 	mlog(0, "needs_downconvert = %d\n", needs_downconvert);
1003 	return needs_downconvert;
1004 }
1005 
1006 /*
1007  * OCFS2_LOCK_PENDING and l_pending_gen.
1008  *
1009  * Why does OCFS2_LOCK_PENDING exist?  To close a race between setting
1010  * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock().  See ocfs2_unblock_lock()
1011  * for more details on the race.
1012  *
1013  * OCFS2_LOCK_PENDING closes the race quite nicely.  However, it introduces
1014  * a race on itself.  In o2dlm, we can get the ast before ocfs2_dlm_lock()
1015  * returns.  The ast clears OCFS2_LOCK_BUSY, and must therefore clear
1016  * OCFS2_LOCK_PENDING at the same time.  When ocfs2_dlm_lock() returns,
1017  * the caller is going to try to clear PENDING again.  If nothing else is
1018  * happening, __lockres_clear_pending() sees PENDING is unset and does
1019  * nothing.
1020  *
1021  * But what if another path (eg downconvert thread) has just started a
1022  * new locking action?  The other path has re-set PENDING.  Our path
1023  * cannot clear PENDING, because that will re-open the original race
1024  * window.
1025  *
1026  * [Example]
1027  *
1028  * ocfs2_meta_lock()
1029  *  ocfs2_cluster_lock()
1030  *   set BUSY
1031  *   set PENDING
1032  *   drop l_lock
1033  *   ocfs2_dlm_lock()
1034  *    ocfs2_locking_ast()		ocfs2_downconvert_thread()
1035  *     clear PENDING			 ocfs2_unblock_lock()
1036  *					  take_l_lock
1037  *					  !BUSY
1038  *					  ocfs2_prepare_downconvert()
1039  *					   set BUSY
1040  *					   set PENDING
1041  *					  drop l_lock
1042  *   take l_lock
1043  *   clear PENDING
1044  *   drop l_lock
1045  *			<window>
1046  *					  ocfs2_dlm_lock()
1047  *
1048  * So as you can see, we now have a window where l_lock is not held,
1049  * PENDING is not set, and ocfs2_dlm_lock() has not been called.
1050  *
1051  * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
1052  * set by ocfs2_prepare_downconvert().  That wasn't nice.
1053  *
1054  * To solve this we introduce l_pending_gen.  A call to
1055  * lockres_clear_pending() will only do so when it is passed a generation
1056  * number that matches the lockres.  lockres_set_pending() will return the
1057  * current generation number.  When ocfs2_cluster_lock() goes to clear
1058  * PENDING, it passes the generation it got from set_pending().  In our
1059  * example above, the generation numbers will *not* match.  Thus,
1060  * ocfs2_cluster_lock() will not clear the PENDING set by
1061  * ocfs2_prepare_downconvert().
1062  */
1063 
1064 /* Unlocked version for ocfs2_locking_ast() */
1065 static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
1066 				    unsigned int generation,
1067 				    struct ocfs2_super *osb)
1068 {
1069 	assert_spin_locked(&lockres->l_lock);
1070 
1071 	/*
1072 	 * The ast and locking functions can race us here.  The winner
1073 	 * will clear pending, the loser will not.
1074 	 */
1075 	if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
1076 	    (lockres->l_pending_gen != generation))
1077 		return;
1078 
1079 	lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
1080 	lockres->l_pending_gen++;
1081 
1082 	/*
1083 	 * The downconvert thread may have skipped us because we
1084 	 * were PENDING.  Wake it up.
1085 	 */
1086 	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
1087 		ocfs2_wake_downconvert_thread(osb);
1088 }
1089 
1090 /* Locked version for callers of ocfs2_dlm_lock() */
1091 static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
1092 				  unsigned int generation,
1093 				  struct ocfs2_super *osb)
1094 {
1095 	unsigned long flags;
1096 
1097 	spin_lock_irqsave(&lockres->l_lock, flags);
1098 	__lockres_clear_pending(lockres, generation, osb);
1099 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1100 }
1101 
1102 static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
1103 {
1104 	assert_spin_locked(&lockres->l_lock);
1105 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
1106 
1107 	lockres_or_flags(lockres, OCFS2_LOCK_PENDING);
1108 
1109 	return lockres->l_pending_gen;
1110 }
1111 
1112 static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
1113 {
1114 	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
1115 	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1116 	int needs_downconvert;
1117 	unsigned long flags;
1118 
1119 	BUG_ON(level <= DLM_LOCK_NL);
1120 
1121 	mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, "
1122 	     "type %s\n", lockres->l_name, level, lockres->l_level,
1123 	     ocfs2_lock_type_string(lockres->l_type));
1124 
1125 	/*
1126 	 * We can skip the bast for locks which don't enable caching -
1127 	 * they'll be dropped at the earliest possible time anyway.
1128 	 */
1129 	if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
1130 		return;
1131 
1132 	spin_lock_irqsave(&lockres->l_lock, flags);
1133 	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
1134 	if (needs_downconvert)
1135 		ocfs2_schedule_blocked_lock(osb, lockres);
1136 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1137 
1138 	wake_up(&lockres->l_event);
1139 
1140 	ocfs2_wake_downconvert_thread(osb);
1141 }
1142 
1143 static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
1144 {
1145 	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
1146 	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1147 	unsigned long flags;
1148 	int status;
1149 
1150 	spin_lock_irqsave(&lockres->l_lock, flags);
1151 
1152 	status = ocfs2_dlm_lock_status(&lockres->l_lksb);
1153 
1154 	if (status == -EAGAIN) {
1155 		lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
1156 		goto out;
1157 	}
1158 
1159 	if (status) {
1160 		mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
1161 		     lockres->l_name, status);
1162 		spin_unlock_irqrestore(&lockres->l_lock, flags);
1163 		return;
1164 	}
1165 
1166 	mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, "
1167 	     "level %d => %d\n", lockres->l_name, lockres->l_action,
1168 	     lockres->l_unlock_action, lockres->l_level, lockres->l_requested);
1169 
1170 	switch(lockres->l_action) {
1171 	case OCFS2_AST_ATTACH:
1172 		ocfs2_generic_handle_attach_action(lockres);
1173 		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
1174 		break;
1175 	case OCFS2_AST_CONVERT:
1176 		ocfs2_generic_handle_convert_action(lockres);
1177 		break;
1178 	case OCFS2_AST_DOWNCONVERT:
1179 		ocfs2_generic_handle_downconvert_action(lockres);
1180 		break;
1181 	default:
1182 		mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, "
1183 		     "flags 0x%lx, unlock: %u\n",
1184 		     lockres->l_name, lockres->l_action, lockres->l_flags,
1185 		     lockres->l_unlock_action);
1186 		BUG();
1187 	}
1188 out:
1189 	/* set it to something invalid so if we get called again we
1190 	 * can catch it. */
1191 	lockres->l_action = OCFS2_AST_INVALID;
1192 
1193 	/* Did we try to cancel this lock?  Clear that state */
1194 	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
1195 		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
1196 
1197 	/*
1198 	 * We may have beaten the locking functions here.  We certainly
1199 	 * know that dlm_lock() has been called :-)
1200 	 * Because we can't have two lock calls in flight at once, we
1201 	 * can use lockres->l_pending_gen.
1202 	 */
1203 	__lockres_clear_pending(lockres, lockres->l_pending_gen,  osb);
1204 
1205 	wake_up(&lockres->l_event);
1206 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1207 }
1208 
1209 static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
1210 {
1211 	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
1212 	unsigned long flags;
1213 
1214 	mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n",
1215 	     lockres->l_name, lockres->l_unlock_action);
1216 
1217 	spin_lock_irqsave(&lockres->l_lock, flags);
1218 	if (error) {
1219 		mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
1220 		     "unlock_action %d\n", error, lockres->l_name,
1221 		     lockres->l_unlock_action);
1222 		spin_unlock_irqrestore(&lockres->l_lock, flags);
1223 		return;
1224 	}
1225 
1226 	switch(lockres->l_unlock_action) {
1227 	case OCFS2_UNLOCK_CANCEL_CONVERT:
1228 		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
1229 		lockres->l_action = OCFS2_AST_INVALID;
1230 		/* Downconvert thread may have requeued this lock, we
1231 		 * need to wake it. */
1232 		if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
1233 			ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
1234 		break;
1235 	case OCFS2_UNLOCK_DROP_LOCK:
1236 		lockres->l_level = DLM_LOCK_IV;
1237 		break;
1238 	default:
1239 		BUG();
1240 	}
1241 
1242 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
1243 	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
1244 	wake_up(&lockres->l_event);
1245 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1246 }
1247 
1248 /*
1249  * This is the filesystem locking protocol.  It provides the lock handling
1250  * hooks for the underlying DLM.  It has a maximum version number.
1251  * The version number allows interoperability with systems running at
1252  * the same major number and an equal or smaller minor number.
1253  *
1254  * Whenever the filesystem does new things with locks (adds or removes a
1255  * lock, orders them differently, does different things underneath a lock),
1256  * the version must be changed.  The protocol is negotiated when joining
1257  * the dlm domain.  A node may join the domain if its major version is
1258  * identical to all other nodes and its minor version is greater than
1259  * or equal to all other nodes.  When its minor version is greater than
1260  * the other nodes, it will run at the minor version specified by the
1261  * other nodes.
1262  *
1263  * If a locking change is made that will not be compatible with older
1264  * versions, the major number must be increased and the minor version set
1265  * to zero.  If a change merely adds a behavior that can be disabled when
1266  * speaking to older versions, the minor version must be increased.  If a
1267  * change adds a fully backwards compatible change (eg, LVB changes that
1268  * are just ignored by older versions), the version does not need to be
1269  * updated.
1270  */
1271 static struct ocfs2_locking_protocol lproto = {
1272 	.lp_max_version = {
1273 		.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
1274 		.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
1275 	},
1276 	.lp_lock_ast		= ocfs2_locking_ast,
1277 	.lp_blocking_ast	= ocfs2_blocking_ast,
1278 	.lp_unlock_ast		= ocfs2_unlock_ast,
1279 };
1280 
1281 void ocfs2_set_locking_protocol(void)
1282 {
1283 	ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
1284 }
1285 
1286 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
1287 						int convert)
1288 {
1289 	unsigned long flags;
1290 
1291 	spin_lock_irqsave(&lockres->l_lock, flags);
1292 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
1293 	lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
1294 	if (convert)
1295 		lockres->l_action = OCFS2_AST_INVALID;
1296 	else
1297 		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
1298 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1299 
1300 	wake_up(&lockres->l_event);
1301 }
1302 
1303 /* Note: If we detect another process working on the lock (i.e.,
1304  * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
1305  * to do the right thing in that case.
1306  */
1307 static int ocfs2_lock_create(struct ocfs2_super *osb,
1308 			     struct ocfs2_lock_res *lockres,
1309 			     int level,
1310 			     u32 dlm_flags)
1311 {
1312 	int ret = 0;
1313 	unsigned long flags;
1314 	unsigned int gen;
1315 
1316 	mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
1317 	     dlm_flags);
1318 
1319 	spin_lock_irqsave(&lockres->l_lock, flags);
1320 	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
1321 	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
1322 		spin_unlock_irqrestore(&lockres->l_lock, flags);
1323 		goto bail;
1324 	}
1325 
1326 	lockres->l_action = OCFS2_AST_ATTACH;
1327 	lockres->l_requested = level;
1328 	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1329 	gen = lockres_set_pending(lockres);
1330 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1331 
1332 	ret = ocfs2_dlm_lock(osb->cconn,
1333 			     level,
1334 			     &lockres->l_lksb,
1335 			     dlm_flags,
1336 			     lockres->l_name,
1337 			     OCFS2_LOCK_ID_MAX_LEN - 1);
1338 	lockres_clear_pending(lockres, gen, osb);
1339 	if (ret) {
1340 		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
1341 		ocfs2_recover_from_dlm_error(lockres, 1);
1342 	}
1343 
1344 	mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);
1345 
1346 bail:
1347 	return ret;
1348 }
1349 
1350 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
1351 					int flag)
1352 {
1353 	unsigned long flags;
1354 	int ret;
1355 
1356 	spin_lock_irqsave(&lockres->l_lock, flags);
1357 	ret = lockres->l_flags & flag;
1358 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1359 
1360 	return ret;
1361 }
1362 
1363 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
1364 
1365 {
1366 	wait_event(lockres->l_event,
1367 		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
1368 }
1369 
1370 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
1371 
1372 {
1373 	wait_event(lockres->l_event,
1374 		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
1375 }
1376 
1377 /* predict what lock level we'll be dropping down to on behalf
1378  * of another node, and return true if the currently wanted
1379  * level will be compatible with it. */
1380 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
1381 						     int wanted)
1382 {
1383 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
1384 
1385 	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
1386 }
1387 
1388 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
1389 {
1390 	INIT_LIST_HEAD(&mw->mw_item);
1391 	init_completion(&mw->mw_complete);
1392 	ocfs2_init_start_time(mw);
1393 }
1394 
1395 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
1396 {
1397 	wait_for_completion(&mw->mw_complete);
1398 	/* Re-arm the completion in case we want to wait on it again */
1399 	reinit_completion(&mw->mw_complete);
1400 	return mw->mw_status;
1401 }
1402 
1403 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
1404 				    struct ocfs2_mask_waiter *mw,
1405 				    unsigned long mask,
1406 				    unsigned long goal)
1407 {
1408 	BUG_ON(!list_empty(&mw->mw_item));
1409 
1410 	assert_spin_locked(&lockres->l_lock);
1411 
1412 	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
1413 	mw->mw_mask = mask;
1414 	mw->mw_goal = goal;
1415 	ocfs2_track_lock_wait(lockres);
1416 }
1417 
1418 /* returns 0 if the mw that was removed was already satisfied, -EBUSY
1419  * if the mask still hadn't reached its goal */
1420 static int __lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
1421 				      struct ocfs2_mask_waiter *mw)
1422 {
1423 	int ret = 0;
1424 
1425 	assert_spin_locked(&lockres->l_lock);
1426 	if (!list_empty(&mw->mw_item)) {
1427 		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
1428 			ret = -EBUSY;
1429 
1430 		list_del_init(&mw->mw_item);
1431 		init_completion(&mw->mw_complete);
1432 		ocfs2_track_lock_wait(lockres);
1433 	}
1434 
1435 	return ret;
1436 }
1437 
1438 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
1439 				      struct ocfs2_mask_waiter *mw)
1440 {
1441 	unsigned long flags;
1442 	int ret = 0;
1443 
1444 	spin_lock_irqsave(&lockres->l_lock, flags);
1445 	ret = __lockres_remove_mask_waiter(lockres, mw);
1446 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1447 
1448 	return ret;
1449 
1450 }
1451 
1452 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
1453 					     struct ocfs2_lock_res *lockres)
1454 {
1455 	int ret;
1456 
1457 	ret = wait_for_completion_interruptible(&mw->mw_complete);
1458 	if (ret)
1459 		lockres_remove_mask_waiter(lockres, mw);
1460 	else
1461 		ret = mw->mw_status;
1462 	/* Re-arm the completion in case we want to wait on it again */
1463 	reinit_completion(&mw->mw_complete);
1464 	return ret;
1465 }
1466 
/*
 * Take a cluster lock on @lockres at @level, upconverting through the dlm
 * when the currently granted level is not sufficient.
 *
 * @osb:	superblock info; supplies the cluster connection and is used
 *		to wake the downconvert thread
 * @lockres:	lock resource; must have OCFS2_LOCK_INITIALIZED set
 * @level:	level wanted by the caller
 * @lkm_flags:	dlm flags; DLM_LKF_VALBLK is added for lock types that use
 *		an LVB, DLM_LKF_CONVERT is set or cleared here as needed
 * @arg_flags:	OCFS2_LOCK_NONBLOCK makes the function back off with
 *		-EAGAIN instead of waiting on BUSY/BLOCKED
 * @l_subclass:	only used for the lockdep annotation
 * @caller_ip:	only used for the lockdep annotation
 *
 * The function loops via the "again" label: each pass re-takes l_lock and
 * re-evaluates the lockres state, either queueing a mask waiter, issuing a
 * dlm request, or taking a holder reference.
 *
 * Returns 0 once a holder reference has been taken, -EINVAL for an
 * uninitialized lockres, -ERESTARTSYS when a signal is pending while
 * signals are still being caught, -EAGAIN when a NOQUEUE/NONBLOCK request
 * cannot be satisfied, or the error returned by ocfs2_dlm_lock().
 */
static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres,
				int level,
				u32 lkm_flags,
				int arg_flags,
				int l_subclass,
				unsigned long caller_ip)
{
	struct ocfs2_mask_waiter mw;
	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
	unsigned long flags;
	unsigned int gen;
	int noqueue_attempted = 0;
	int dlm_locked = 0;
	int kick_dc = 0;

	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) {
		mlog_errno(-EINVAL);
		return -EINVAL;
	}

	ocfs2_init_mask_waiter(&mw);

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= DLM_LKF_VALBLK;

again:
	wait = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);

	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto unlock;
	}

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags "
			"0x%lx\n", lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		wait = 1;
		goto unlock;
	}

	if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) {
		/*
		 * We've upconverted. If the lock now has a level we can
		 * work with, we take it. If, however, the lock is not at the
		 * required level, we go thru the full cycle. One way this could
		 * happen is if a process requesting an upconvert to PR is
		 * closely followed by another requesting upconvert to an EX.
		 * If the process requesting EX lands here, we want it to
		 * continue attempting to upconvert and let the process
		 * requesting PR take the lock.
		 * If multiple processes request upconvert to PR, the first one
		 * here will take the lock. The others will have to go thru the
		 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending
		 * downconvert request.
		 */
		if (level <= lockres->l_level)
			goto update_holders;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* is the lock is currently blocked on behalf of
		 * another node */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		/* A NOQUEUE request gets exactly one trip into the dlm. */
		if (noqueue_attempted > 0) {
			ret = -EAGAIN;
			goto unlock;
		}
		if (lkm_flags & DLM_LKF_NOQUEUE)
			noqueue_attempted = 1;

		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		/* First request on this lockres attaches, later ones convert. */
		if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
			lockres->l_action = OCFS2_AST_ATTACH;
			lkm_flags &= ~DLM_LKF_CONVERT;
		} else {
			lockres->l_action = OCFS2_AST_CONVERT;
			lkm_flags |= DLM_LKF_CONVERT;
		}

		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		/* Remember the generation so only our own PENDING is cleared. */
		gen = lockres_set_pending(lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == DLM_LOCK_IV);
		BUG_ON(level == DLM_LOCK_NL);

		mlog(ML_BASTS, "lockres %s, convert from %d to %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		ret = ocfs2_dlm_lock(osb->cconn,
				     level,
				     &lockres->l_lksb,
				     lkm_flags,
				     lockres->l_name,
				     OCFS2_LOCK_ID_MAX_LEN - 1);
		lockres_clear_pending(lockres, gen, osb);
		if (ret) {
			/* -EAGAIN on a NOQUEUE request is expected; don't log it. */
			if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
			    (ret != -EAGAIN)) {
				ocfs2_log_dlm_error("ocfs2_dlm_lock",
						    ret, lockres);
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			/* l_lock is not held here, so skip the unlock label. */
			goto out;
		}
		dlm_locked = 1;

		mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

update_holders:
	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);

	/* ocfs2_unblock_lock requeues on seeing OCFS2_LOCK_UPCONVERT_FINISHING */
	kick_dc = (lockres->l_flags & OCFS2_LOCK_BLOCKED);

	spin_unlock_irqrestore(&lockres->l_lock, flags);
	if (kick_dc)
		ocfs2_wake_downconvert_thread(osb);
out:
	/*
	 * This is helping work around a lock inversion between the page lock
	 * and dlm locks.  One path holds the page lock while calling aops
	 * which block acquiring dlm locks.  The voting thread holds dlm
	 * locks while acquiring page locks while down converting data locks.
	 * This block is helping an aop path notice the inversion and back
	 * off to unlock its page lock before trying the dlm lock again.
	 */
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		spin_lock_irqsave(&lockres->l_lock, flags);
		if (__lockres_remove_mask_waiter(lockres, &mw)) {
			/*
			 * Goal not reached yet: give up.  If we already went
			 * into the dlm, leave a marker for the convert AST.
			 */
			if (dlm_locked)
				lockres_or_flags(lockres,
					OCFS2_LOCK_NONBLOCK_FINISHED);
			spin_unlock_irqrestore(&lockres->l_lock, flags);
			ret = -EAGAIN;
		} else {
			/* Goal already reached (or waiter gone): retry now. */
			spin_unlock_irqrestore(&lockres->l_lock, flags);
			goto again;
		}
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}
	ocfs2_update_lock_stats(lockres, level, &mw, ret);

#ifdef CONFIG_DEBUG_LOCK_ALLOC
	/* Tell lockdep about the acquisition, as a read lock for PR. */
	if (!ret && lockres->l_lockdep_map.key != NULL) {
		if (level == DLM_LOCK_PR)
			rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
				caller_ip);
		else
			rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
				caller_ip);
	}
#endif
	return ret;
}
1669 
1670 static inline int ocfs2_cluster_lock(struct ocfs2_super *osb,
1671 				     struct ocfs2_lock_res *lockres,
1672 				     int level,
1673 				     u32 lkm_flags,
1674 				     int arg_flags)
1675 {
1676 	return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags,
1677 				    0, _RET_IP_);
1678 }
1679 
1680 
1681 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
1682 				   struct ocfs2_lock_res *lockres,
1683 				   int level,
1684 				   unsigned long caller_ip)
1685 {
1686 	unsigned long flags;
1687 
1688 	spin_lock_irqsave(&lockres->l_lock, flags);
1689 	ocfs2_dec_holders(lockres, level);
1690 	ocfs2_downconvert_on_unlock(osb, lockres);
1691 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1692 #ifdef CONFIG_DEBUG_LOCK_ALLOC
1693 	if (lockres->l_lockdep_map.key != NULL)
1694 		rwsem_release(&lockres->l_lockdep_map, caller_ip);
1695 #endif
1696 }
1697 
1698 static int ocfs2_create_new_lock(struct ocfs2_super *osb,
1699 				 struct ocfs2_lock_res *lockres,
1700 				 int ex,
1701 				 int local)
1702 {
1703 	int level =  ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1704 	unsigned long flags;
1705 	u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;
1706 
1707 	spin_lock_irqsave(&lockres->l_lock, flags);
1708 	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1709 	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1710 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1711 
1712 	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1713 }
1714 
1715 /* Grants us an EX lock on the data and metadata resources, skipping
1716  * the normal cluster directory lookup. Use this ONLY on newly created
1717  * inodes which other nodes can't possibly see, and which haven't been
1718  * hashed in the inode hash yet. This can give us a good performance
1719  * increase as it'll skip the network broadcast normally associated
1720  * with creating a new lock resource. */
1721 int ocfs2_create_new_inode_locks(struct inode *inode)
1722 {
1723 	int ret;
1724 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1725 
1726 	BUG_ON(!ocfs2_inode_is_new(inode));
1727 
1728 	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1729 
1730 	/* NOTE: That we don't increment any of the holder counts, nor
1731 	 * do we add anything to a journal handle. Since this is
1732 	 * supposed to be a new inode which the cluster doesn't know
1733 	 * about yet, there is no need to.  As far as the LVB handling
1734 	 * is concerned, this is basically like acquiring an EX lock
1735 	 * on a resource which has an invalid one -- we'll set it
1736 	 * valid when we release the EX. */
1737 
1738 	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1739 	if (ret) {
1740 		mlog_errno(ret);
1741 		goto bail;
1742 	}
1743 
1744 	/*
1745 	 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they
1746 	 * don't use a generation in their lock names.
1747 	 */
1748 	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
1749 	if (ret) {
1750 		mlog_errno(ret);
1751 		goto bail;
1752 	}
1753 
1754 	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
1755 	if (ret)
1756 		mlog_errno(ret);
1757 
1758 bail:
1759 	return ret;
1760 }
1761 
1762 int ocfs2_rw_lock(struct inode *inode, int write)
1763 {
1764 	int status, level;
1765 	struct ocfs2_lock_res *lockres;
1766 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1767 
1768 	mlog(0, "inode %llu take %s RW lock\n",
1769 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1770 	     write ? "EXMODE" : "PRMODE");
1771 
1772 	if (ocfs2_mount_local(osb))
1773 		return 0;
1774 
1775 	lockres = &OCFS2_I(inode)->ip_rw_lockres;
1776 
1777 	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1778 
1779 	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
1780 	if (status < 0)
1781 		mlog_errno(status);
1782 
1783 	return status;
1784 }
1785 
1786 int ocfs2_try_rw_lock(struct inode *inode, int write)
1787 {
1788 	int status, level;
1789 	struct ocfs2_lock_res *lockres;
1790 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1791 
1792 	mlog(0, "inode %llu try to take %s RW lock\n",
1793 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1794 	     write ? "EXMODE" : "PRMODE");
1795 
1796 	if (ocfs2_mount_local(osb))
1797 		return 0;
1798 
1799 	lockres = &OCFS2_I(inode)->ip_rw_lockres;
1800 
1801 	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1802 
1803 	status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0);
1804 	return status;
1805 }
1806 
1807 void ocfs2_rw_unlock(struct inode *inode, int write)
1808 {
1809 	int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1810 	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1811 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1812 
1813 	mlog(0, "inode %llu drop %s RW lock\n",
1814 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1815 	     write ? "EXMODE" : "PRMODE");
1816 
1817 	if (!ocfs2_mount_local(osb))
1818 		ocfs2_cluster_unlock(osb, lockres, level);
1819 }
1820 
1821 /*
1822  * ocfs2_open_lock always get PR mode lock.
1823  */
1824 int ocfs2_open_lock(struct inode *inode)
1825 {
1826 	int status = 0;
1827 	struct ocfs2_lock_res *lockres;
1828 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1829 
1830 	mlog(0, "inode %llu take PRMODE open lock\n",
1831 	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
1832 
1833 	if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb))
1834 		goto out;
1835 
1836 	lockres = &OCFS2_I(inode)->ip_open_lockres;
1837 
1838 	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_PR, 0, 0);
1839 	if (status < 0)
1840 		mlog_errno(status);
1841 
1842 out:
1843 	return status;
1844 }
1845 
1846 int ocfs2_try_open_lock(struct inode *inode, int write)
1847 {
1848 	int status = 0, level;
1849 	struct ocfs2_lock_res *lockres;
1850 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1851 
1852 	mlog(0, "inode %llu try to take %s open lock\n",
1853 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1854 	     write ? "EXMODE" : "PRMODE");
1855 
1856 	if (ocfs2_is_hard_readonly(osb)) {
1857 		if (write)
1858 			status = -EROFS;
1859 		goto out;
1860 	}
1861 
1862 	if (ocfs2_mount_local(osb))
1863 		goto out;
1864 
1865 	lockres = &OCFS2_I(inode)->ip_open_lockres;
1866 
1867 	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1868 
1869 	/*
1870 	 * The file system may already holding a PRMODE/EXMODE open lock.
1871 	 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on
1872 	 * other nodes and the -EAGAIN will indicate to the caller that
1873 	 * this inode is still in use.
1874 	 */
1875 	status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0);
1876 
1877 out:
1878 	return status;
1879 }
1880 
1881 /*
1882  * ocfs2_open_unlock unlock PR and EX mode open locks.
1883  */
1884 void ocfs2_open_unlock(struct inode *inode)
1885 {
1886 	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
1887 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1888 
1889 	mlog(0, "inode %llu drop open lock\n",
1890 	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
1891 
1892 	if (ocfs2_mount_local(osb))
1893 		goto out;
1894 
1895 	if(lockres->l_ro_holders)
1896 		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_PR);
1897 	if(lockres->l_ex_holders)
1898 		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
1899 
1900 out:
1901 	return;
1902 }
1903 
1904 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
1905 				     int level)
1906 {
1907 	int ret;
1908 	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1909 	unsigned long flags;
1910 	struct ocfs2_mask_waiter mw;
1911 
1912 	ocfs2_init_mask_waiter(&mw);
1913 
1914 retry_cancel:
1915 	spin_lock_irqsave(&lockres->l_lock, flags);
1916 	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
1917 		ret = ocfs2_prepare_cancel_convert(osb, lockres);
1918 		if (ret) {
1919 			spin_unlock_irqrestore(&lockres->l_lock, flags);
1920 			ret = ocfs2_cancel_convert(osb, lockres);
1921 			if (ret < 0) {
1922 				mlog_errno(ret);
1923 				goto out;
1924 			}
1925 			goto retry_cancel;
1926 		}
1927 		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1928 		spin_unlock_irqrestore(&lockres->l_lock, flags);
1929 
1930 		ocfs2_wait_for_mask(&mw);
1931 		goto retry_cancel;
1932 	}
1933 
1934 	ret = -ERESTARTSYS;
1935 	/*
1936 	 * We may still have gotten the lock, in which case there's no
1937 	 * point to restarting the syscall.
1938 	 */
1939 	if (lockres->l_level == level)
1940 		ret = 0;
1941 
1942 	mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
1943 	     lockres->l_flags, lockres->l_level, lockres->l_action);
1944 
1945 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1946 
1947 out:
1948 	return ret;
1949 }
1950 
/*
 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
 * flock() calls. The locking approach this requires is sufficiently
 * different from all other cluster lock types that we implement a
 * separate path to the "low-level" dlm calls. In particular:
 *
 * - No optimization of lock levels is done - we take exactly
 *   what's been requested.
 *
 * - No lock caching is employed. We immediately downconvert to
 *   no-lock at unlock time. (This also means flock locks never go on
 *   the blocking list.)
 *
 * - Since userspace can trivially deadlock itself with flock, we make
 *   sure to allow cancellation of a misbehaving application's flock()
 *   request.
 *
 * - Access to any flock lockres doesn't require concurrency, so we
 *   can simplify the code by requiring the caller to guarantee
 *   serialization of dlmglue flock calls.
 *
 * @file:    open file whose private data carries the flock lockres
 * @ex:      non-zero requests DLM_LOCK_EX, zero requests DLM_LOCK_PR
 * @trylock: non-zero adds DLM_LKF_NOQUEUE to the convert request
 *
 * Returns 0 on success, -EINVAL if the lockres is already busy or
 * held above NL or if the dlm call fails, -EAGAIN when a trylock
 * could not be granted, or the result of ocfs2_flock_handle_signal()
 * when the wait was interrupted.
 */
int ocfs2_file_lock(struct file *file, int ex, int trylock)
{
	int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0;
	unsigned long flags;
	struct ocfs2_file_private *fp = file->private_data;
	struct ocfs2_lock_res *lockres = &fp->fp_flock;
	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
	struct ocfs2_mask_waiter mw;

	ocfs2_init_mask_waiter(&mw);

	/* Refuse to start if a request is in flight or a level is held. */
	if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
	    (lockres->l_level > DLM_LOCK_NL)) {
		mlog(ML_ERROR,
		     "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
		     "level: %u\n", lockres->l_name, lockres->l_flags,
		     lockres->l_level);
		return -EINVAL;
	}

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
		/* Queue the waiter before dropping l_lock to create the lock. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		/*
		 * Get the lock at NLMODE to start - that way we
		 * can cancel the upconvert request if need be.
		 */
		ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
		if (ret < 0) {
			mlog_errno(ret);
			goto out;
		}

		ret = ocfs2_wait_for_mask(&mw);
		if (ret) {
			mlog_errno(ret);
			goto out;
		}
		/* Re-take l_lock so both paths reach the convert setup locked. */
		spin_lock_irqsave(&lockres->l_lock, flags);
	}

	/* Set up the convert to the requested level while holding l_lock. */
	lockres->l_action = OCFS2_AST_CONVERT;
	lkm_flags |= DLM_LKF_CONVERT;
	lockres->l_requested = level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);

	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
			     lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
	if (ret) {
		/* -EAGAIN from a trylock is passed through without logging. */
		if (!trylock || (ret != -EAGAIN)) {
			ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
			ret = -EINVAL;
		}

		ocfs2_recover_from_dlm_error(lockres, 1);
		lockres_remove_mask_waiter(lockres, &mw);
		goto out;
	}

	ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
	if (ret == -ERESTARTSYS) {
		/*
		 * Userspace can cause deadlock itself with
		 * flock(). Current behavior locally is to allow the
		 * deadlock, but abort the system call if a signal is
		 * received. We follow this example, otherwise a
		 * poorly written program could sit in kernel until
		 * reboot.
		 *
		 * Handling this is a bit more complicated for Ocfs2
		 * though. We can't exit this function with an
		 * outstanding lock request, so a cancel convert is
		 * required. We intentionally overwrite 'ret' - if the
		 * cancel fails and the lock was granted, it's easier
		 * to just bubble success back up to the user.
		 */
		ret = ocfs2_flock_handle_signal(lockres, level);
	} else if (!ret && (level > lockres->l_level)) {
		/* Trylock failed asynchronously */
		BUG_ON(!trylock);
		ret = -EAGAIN;
	}

out:

	mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
	     lockres->l_name, ex, trylock, ret);
	return ret;
}
2067 
2068 void ocfs2_file_unlock(struct file *file)
2069 {
2070 	int ret;
2071 	unsigned int gen;
2072 	unsigned long flags;
2073 	struct ocfs2_file_private *fp = file->private_data;
2074 	struct ocfs2_lock_res *lockres = &fp->fp_flock;
2075 	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
2076 	struct ocfs2_mask_waiter mw;
2077 
2078 	ocfs2_init_mask_waiter(&mw);
2079 
2080 	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
2081 		return;
2082 
2083 	if (lockres->l_level == DLM_LOCK_NL)
2084 		return;
2085 
2086 	mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
2087 	     lockres->l_name, lockres->l_flags, lockres->l_level,
2088 	     lockres->l_action);
2089 
2090 	spin_lock_irqsave(&lockres->l_lock, flags);
2091 	/*
2092 	 * Fake a blocking ast for the downconvert code.
2093 	 */
2094 	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
2095 	lockres->l_blocking = DLM_LOCK_EX;
2096 
2097 	gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL);
2098 	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
2099 	spin_unlock_irqrestore(&lockres->l_lock, flags);
2100 
2101 	ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen);
2102 	if (ret) {
2103 		mlog_errno(ret);
2104 		return;
2105 	}
2106 
2107 	ret = ocfs2_wait_for_mask(&mw);
2108 	if (ret)
2109 		mlog_errno(ret);
2110 }
2111 
2112 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
2113 					struct ocfs2_lock_res *lockres)
2114 {
2115 	int kick = 0;
2116 
2117 	/* If we know that another node is waiting on our lock, kick
2118 	 * the downconvert thread * pre-emptively when we reach a release
2119 	 * condition. */
2120 	if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
2121 		switch(lockres->l_blocking) {
2122 		case DLM_LOCK_EX:
2123 			if (!lockres->l_ex_holders && !lockres->l_ro_holders)
2124 				kick = 1;
2125 			break;
2126 		case DLM_LOCK_PR:
2127 			if (!lockres->l_ex_holders)
2128 				kick = 1;
2129 			break;
2130 		default:
2131 			BUG();
2132 		}
2133 	}
2134 
2135 	if (kick)
2136 		ocfs2_wake_downconvert_thread(osb);
2137 }
2138 
2139 #define OCFS2_SEC_BITS   34
2140 #define OCFS2_SEC_SHIFT  (64 - OCFS2_SEC_BITS)
2141 #define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
2142 
2143 /* LVB only has room for 64 bits of time here so we pack it for
2144  * now. */
2145 static u64 ocfs2_pack_timespec(struct timespec64 *spec)
2146 {
2147 	u64 res;
2148 	u64 sec = clamp_t(time64_t, spec->tv_sec, 0, 0x3ffffffffull);
2149 	u32 nsec = spec->tv_nsec;
2150 
2151 	res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
2152 
2153 	return res;
2154 }
2155 
2156 /* Call this with the lockres locked. I am reasonably sure we don't
2157  * need ip_lock in this function as anyone who would be changing those
2158  * values is supposed to be blocked in ocfs2_inode_lock right now. */
2159 static void __ocfs2_stuff_meta_lvb(struct inode *inode)
2160 {
2161 	struct ocfs2_inode_info *oi = OCFS2_I(inode);
2162 	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2163 	struct ocfs2_meta_lvb *lvb;
2164 
2165 	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2166 
2167 	/*
2168 	 * Invalidate the LVB of a deleted inode - this way other
2169 	 * nodes are forced to go to disk and discover the new inode
2170 	 * status.
2171 	 */
2172 	if (oi->ip_flags & OCFS2_INODE_DELETED) {
2173 		lvb->lvb_version = 0;
2174 		goto out;
2175 	}
2176 
2177 	lvb->lvb_version   = OCFS2_LVB_VERSION;
2178 	lvb->lvb_isize	   = cpu_to_be64(i_size_read(inode));
2179 	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
2180 	lvb->lvb_iuid      = cpu_to_be32(i_uid_read(inode));
2181 	lvb->lvb_igid      = cpu_to_be32(i_gid_read(inode));
2182 	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
2183 	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
2184 	lvb->lvb_iatime_packed  =
2185 		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
2186 	lvb->lvb_ictime_packed =
2187 		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
2188 	lvb->lvb_imtime_packed =
2189 		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
2190 	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
2191 	lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
2192 	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
2193 
2194 out:
2195 	mlog_meta_lvb(0, lockres);
2196 }
2197 
2198 static void ocfs2_unpack_timespec(struct timespec64 *spec,
2199 				  u64 packed_time)
2200 {
2201 	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
2202 	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
2203 }
2204 
2205 static int ocfs2_refresh_inode_from_lvb(struct inode *inode)
2206 {
2207 	struct ocfs2_inode_info *oi = OCFS2_I(inode);
2208 	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2209 	struct ocfs2_meta_lvb *lvb;
2210 
2211 	mlog_meta_lvb(0, lockres);
2212 
2213 	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2214 	if (inode_wrong_type(inode, be16_to_cpu(lvb->lvb_imode)))
2215 		return -ESTALE;
2216 
2217 	/* We're safe here without the lockres lock... */
2218 	spin_lock(&oi->ip_lock);
2219 	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
2220 	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
2221 
2222 	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
2223 	oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
2224 	ocfs2_set_inode_flags(inode);
2225 
2226 	/* fast-symlinks are a special case */
2227 	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
2228 		inode->i_blocks = 0;
2229 	else
2230 		inode->i_blocks = ocfs2_inode_sector_count(inode);
2231 
2232 	i_uid_write(inode, be32_to_cpu(lvb->lvb_iuid));
2233 	i_gid_write(inode, be32_to_cpu(lvb->lvb_igid));
2234 	inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
2235 	set_nlink(inode, be16_to_cpu(lvb->lvb_inlink));
2236 	ocfs2_unpack_timespec(&inode->i_atime,
2237 			      be64_to_cpu(lvb->lvb_iatime_packed));
2238 	ocfs2_unpack_timespec(&inode->i_mtime,
2239 			      be64_to_cpu(lvb->lvb_imtime_packed));
2240 	ocfs2_unpack_timespec(&inode->i_ctime,
2241 			      be64_to_cpu(lvb->lvb_ictime_packed));
2242 	spin_unlock(&oi->ip_lock);
2243 	return 0;
2244 }
2245 
2246 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
2247 					      struct ocfs2_lock_res *lockres)
2248 {
2249 	struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2250 
2251 	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)
2252 	    && lvb->lvb_version == OCFS2_LVB_VERSION
2253 	    && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
2254 		return 1;
2255 	return 0;
2256 }
2257 
2258 /* Determine whether a lock resource needs to be refreshed, and
2259  * arbitrate who gets to refresh it.
2260  *
2261  *   0 means no refresh needed.
2262  *
2263  *   > 0 means you need to refresh this and you MUST call
2264  *   ocfs2_complete_lock_res_refresh afterwards. */
2265 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
2266 {
2267 	unsigned long flags;
2268 	int status = 0;
2269 
2270 refresh_check:
2271 	spin_lock_irqsave(&lockres->l_lock, flags);
2272 	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
2273 		spin_unlock_irqrestore(&lockres->l_lock, flags);
2274 		goto bail;
2275 	}
2276 
2277 	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2278 		spin_unlock_irqrestore(&lockres->l_lock, flags);
2279 
2280 		ocfs2_wait_on_refreshing_lock(lockres);
2281 		goto refresh_check;
2282 	}
2283 
2284 	/* Ok, I'll be the one to refresh this lock. */
2285 	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
2286 	spin_unlock_irqrestore(&lockres->l_lock, flags);
2287 
2288 	status = 1;
2289 bail:
2290 	mlog(0, "status %d\n", status);
2291 	return status;
2292 }
2293 
2294 /* If status is non zero, I'll mark it as not being in refresh
2295  * anymroe, but i won't clear the needs refresh flag. */
2296 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
2297 						   int status)
2298 {
2299 	unsigned long flags;
2300 
2301 	spin_lock_irqsave(&lockres->l_lock, flags);
2302 	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
2303 	if (!status)
2304 		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
2305 	spin_unlock_irqrestore(&lockres->l_lock, flags);
2306 
2307 	wake_up(&lockres->l_event);
2308 }
2309 
2310 /* may or may not return a bh if it went to disk. */
2311 static int ocfs2_inode_lock_update(struct inode *inode,
2312 				  struct buffer_head **bh)
2313 {
2314 	int status = 0;
2315 	struct ocfs2_inode_info *oi = OCFS2_I(inode);
2316 	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2317 	struct ocfs2_dinode *fe;
2318 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2319 
2320 	if (ocfs2_mount_local(osb))
2321 		goto bail;
2322 
2323 	spin_lock(&oi->ip_lock);
2324 	if (oi->ip_flags & OCFS2_INODE_DELETED) {
2325 		mlog(0, "Orphaned inode %llu was deleted while we "
2326 		     "were waiting on a lock. ip_flags = 0x%x\n",
2327 		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
2328 		spin_unlock(&oi->ip_lock);
2329 		status = -ENOENT;
2330 		goto bail;
2331 	}
2332 	spin_unlock(&oi->ip_lock);
2333 
2334 	if (!ocfs2_should_refresh_lock_res(lockres))
2335 		goto bail;
2336 
2337 	/* This will discard any caching information we might have had
2338 	 * for the inode metadata. */
2339 	ocfs2_metadata_cache_purge(INODE_CACHE(inode));
2340 
2341 	ocfs2_extent_map_trunc(inode, 0);
2342 
2343 	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
2344 		mlog(0, "Trusting LVB on inode %llu\n",
2345 		     (unsigned long long)oi->ip_blkno);
2346 		status = ocfs2_refresh_inode_from_lvb(inode);
2347 		goto bail_refresh;
2348 	} else {
2349 		/* Boo, we have to go to disk. */
2350 		/* read bh, cast, ocfs2_refresh_inode */
2351 		status = ocfs2_read_inode_block(inode, bh);
2352 		if (status < 0) {
2353 			mlog_errno(status);
2354 			goto bail_refresh;
2355 		}
2356 		fe = (struct ocfs2_dinode *) (*bh)->b_data;
2357 		if (inode_wrong_type(inode, le16_to_cpu(fe->i_mode))) {
2358 			status = -ESTALE;
2359 			goto bail_refresh;
2360 		}
2361 
2362 		/* This is a good chance to make sure we're not
2363 		 * locking an invalid object.  ocfs2_read_inode_block()
2364 		 * already checked that the inode block is sane.
2365 		 *
2366 		 * We bug on a stale inode here because we checked
2367 		 * above whether it was wiped from disk. The wiping
2368 		 * node provides a guarantee that we receive that
2369 		 * message and can mark the inode before dropping any
2370 		 * locks associated with it. */
2371 		mlog_bug_on_msg(inode->i_generation !=
2372 				le32_to_cpu(fe->i_generation),
2373 				"Invalid dinode %llu disk generation: %u "
2374 				"inode->i_generation: %u\n",
2375 				(unsigned long long)oi->ip_blkno,
2376 				le32_to_cpu(fe->i_generation),
2377 				inode->i_generation);
2378 		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
2379 				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
2380 				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
2381 				(unsigned long long)oi->ip_blkno,
2382 				(unsigned long long)le64_to_cpu(fe->i_dtime),
2383 				le32_to_cpu(fe->i_flags));
2384 
2385 		ocfs2_refresh_inode(inode, fe);
2386 		ocfs2_track_lock_refresh(lockres);
2387 	}
2388 
2389 	status = 0;
2390 bail_refresh:
2391 	ocfs2_complete_lock_res_refresh(lockres, status);
2392 bail:
2393 	return status;
2394 }
2395 
/*
 * Hand the caller an inode buffer head in *ret_bh. If @passed_bh is
 * non-NULL it is reused with an extra reference taken via get_bh();
 * otherwise the inode block is read. Returns 0 or the read error.
 */
static int ocfs2_assign_bh(struct inode *inode,
			   struct buffer_head **ret_bh,
			   struct buffer_head *passed_bh)
{
	int ret;

	if (!passed_bh) {
		ret = ocfs2_read_inode_block(inode, ret_bh);
		if (ret < 0)
			mlog_errno(ret);
		return ret;
	}

	/* Ok, the update went to disk for us, use the returned bh. */
	get_bh(passed_bh);
	*ret_bh = passed_bh;

	return 0;
}
2417 
/*
 * Take the inode metadata (META) cluster lock and bring the in-memory
 * inode up to date.
 *
 * @inode:     inode to lock
 * @ret_bh:    optional; if non-NULL, receives a referenced buffer head
 *             for the inode block on success
 * @ex:        non-zero requests DLM_LOCK_EX, zero requests DLM_LOCK_PR
 * @arg_flags: OCFS2_META_LOCK_* flags (RECOVERY, NOQUEUE and GETBH are
 *             examined here; the whole value is also passed down to
 *             __ocfs2_cluster_lock())
 * @subclass:  passed through to __ocfs2_cluster_lock()
 *
 * Returns 0 on success or a negative error. On error, any buffer head
 * already stored in *ret_bh is released and the cluster lock, if it
 * had been acquired, is dropped again.
 */
int ocfs2_inode_lock_full_nested(struct inode *inode,
				 struct buffer_head **ret_bh,
				 int ex,
				 int arg_flags,
				 int subclass)
{
	int status, level, acquired;
	u32 dlm_flags;
	struct ocfs2_lock_res *lockres = NULL;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
	struct buffer_head *local_bh = NULL;

	mlog(0, "inode %llu, take %s META lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     ex ? "EXMODE" : "PRMODE");

	status = 0;
	acquired = 0;
	/* We'll allow faking a readonly metadata lock for
	 * rodevices. */
	if (ocfs2_is_hard_readonly(osb)) {
		/*
		 * NOTE(review): when ret_bh is non-NULL, the getbh path
		 * below overwrites this -EROFS with the result of
		 * ocfs2_assign_bh() - confirm whether that is intended.
		 */
		if (ex)
			status = -EROFS;
		goto getbh;
	}

	/* GETBH requests and local mounts skip the cluster lock itself. */
	if ((arg_flags & OCFS2_META_LOCK_GETBH) ||
	    ocfs2_mount_local(osb))
		goto update;

	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
		ocfs2_wait_for_recovery(osb);

	lockres = &OCFS2_I(inode)->ip_inode_lockres;
	level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	dlm_flags = 0;
	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
		dlm_flags |= DLM_LKF_NOQUEUE;

	status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags,
				      arg_flags, subclass, _RET_IP_);
	if (status < 0) {
		/* -EAGAIN is not logged as an error. */
		if (status != -EAGAIN)
			mlog_errno(status);
		goto bail;
	}

	/* Notify the error cleanup path to drop the cluster lock. */
	acquired = 1;

	/* We wait twice because a node may have died while we were in
	 * the lower dlm layers. The second time though, we've
	 * committed to owning this lock so we don't allow signals to
	 * abort the operation. */
	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
		ocfs2_wait_for_recovery(osb);

update:
	/*
	 * We only see this flag if we're being called from
	 * ocfs2_read_locked_inode(). It means we're locking an inode
	 * which hasn't been populated yet, so clear the refresh flag
	 * and let the caller handle it.
	 */
	if (inode->i_state & I_NEW) {
		status = 0;
		/* lockres is still NULL if the cluster lock was skipped. */
		if (lockres)
			ocfs2_complete_lock_res_refresh(lockres, 0);
		goto bail;
	}

	/* This is fun. The caller may want a bh back, or it may
	 * not. ocfs2_inode_lock_update definitely wants one in, but
	 * may or may not read one, depending on what's in the
	 * LVB. The result of all of this is that we've *only* gone to
	 * disk if we have to, so the complexity is worthwhile. */
	status = ocfs2_inode_lock_update(inode, &local_bh);
	if (status < 0) {
		/* -ENOENT is not logged as an error. */
		if (status != -ENOENT)
			mlog_errno(status);
		goto bail;
	}
getbh:
	if (ret_bh) {
		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
		if (status < 0) {
			mlog_errno(status);
			goto bail;
		}
	}

bail:
	if (status < 0) {
		/* Undo anything handed out or acquired before the failure. */
		if (ret_bh && (*ret_bh)) {
			brelse(*ret_bh);
			*ret_bh = NULL;
		}
		if (acquired)
			ocfs2_inode_unlock(inode, ex);
	}

	/* Drop our own reference to the bh read by the update step, if any. */
	brelse(local_bh);
	return status;
}
2526 
2527 /*
2528  * This is working around a lock inversion between tasks acquiring DLM
2529  * locks while holding a page lock and the downconvert thread which
2530  * blocks dlm lock acquiry while acquiring page locks.
2531  *
2532  * ** These _with_page variantes are only intended to be called from aop
2533  * methods that hold page locks and return a very specific *positive* error
2534  * code that aop methods pass up to the VFS -- test for errors with != 0. **
2535  *
2536  * The DLM is called such that it returns -EAGAIN if it would have
2537  * blocked waiting for the downconvert thread.  In that case we unlock
2538  * our page so the downconvert thread can make progress.  Once we've
2539  * done this we have to return AOP_TRUNCATED_PAGE so the aop method
2540  * that called us can bubble that back up into the VFS who will then
2541  * immediately retry the aop call.
2542  */
2543 int ocfs2_inode_lock_with_page(struct inode *inode,
2544 			      struct buffer_head **ret_bh,
2545 			      int ex,
2546 			      struct page *page)
2547 {
2548 	int ret;
2549 
2550 	ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
2551 	if (ret == -EAGAIN) {
2552 		unlock_page(page);
2553 		/*
2554 		 * If we can't get inode lock immediately, we should not return
2555 		 * directly here, since this will lead to a softlockup problem.
2556 		 * The method is to get a blocking lock and immediately unlock
2557 		 * before returning, this can avoid CPU resource waste due to
2558 		 * lots of retries, and benefits fairness in getting lock.
2559 		 */
2560 		if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
2561 			ocfs2_inode_unlock(inode, ex);
2562 		ret = AOP_TRUNCATED_PAGE;
2563 	}
2564 
2565 	return ret;
2566 }
2567 
2568 int ocfs2_inode_lock_atime(struct inode *inode,
2569 			  struct vfsmount *vfsmnt,
2570 			  int *level, int wait)
2571 {
2572 	int ret;
2573 
2574 	if (wait)
2575 		ret = ocfs2_inode_lock(inode, NULL, 0);
2576 	else
2577 		ret = ocfs2_try_inode_lock(inode, NULL, 0);
2578 
2579 	if (ret < 0) {
2580 		if (ret != -EAGAIN)
2581 			mlog_errno(ret);
2582 		return ret;
2583 	}
2584 
2585 	/*
2586 	 * If we should update atime, we will get EX lock,
2587 	 * otherwise we just get PR lock.
2588 	 */
2589 	if (ocfs2_should_update_atime(inode, vfsmnt)) {
2590 		struct buffer_head *bh = NULL;
2591 
2592 		ocfs2_inode_unlock(inode, 0);
2593 		if (wait)
2594 			ret = ocfs2_inode_lock(inode, &bh, 1);
2595 		else
2596 			ret = ocfs2_try_inode_lock(inode, &bh, 1);
2597 
2598 		if (ret < 0) {
2599 			if (ret != -EAGAIN)
2600 				mlog_errno(ret);
2601 			return ret;
2602 		}
2603 		*level = 1;
2604 		if (ocfs2_should_update_atime(inode, vfsmnt))
2605 			ocfs2_update_inode_atime(inode, bh);
2606 		brelse(bh);
2607 	} else
2608 		*level = 0;
2609 
2610 	return ret;
2611 }
2612 
2613 void ocfs2_inode_unlock(struct inode *inode,
2614 		       int ex)
2615 {
2616 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2617 	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
2618 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2619 
2620 	mlog(0, "inode %llu drop %s META lock\n",
2621 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
2622 	     ex ? "EXMODE" : "PRMODE");
2623 
2624 	if (!ocfs2_is_hard_readonly(osb) &&
2625 	    !ocfs2_mount_local(osb))
2626 		ocfs2_cluster_unlock(osb, lockres, level);
2627 }
2628 
2629 /*
2630  * This _tracker variantes are introduced to deal with the recursive cluster
2631  * locking issue. The idea is to keep track of a lock holder on the stack of
2632  * the current process. If there's a lock holder on the stack, we know the
2633  * task context is already protected by cluster locking. Currently, they're
2634  * used in some VFS entry routines.
2635  *
2636  * return < 0 on error, return == 0 if there's no lock holder on the stack
2637  * before this call, return == 1 if this call would be a recursive locking.
2638  * return == -1 if this lock attempt will cause an upgrade which is forbidden.
2639  *
2640  * When taking lock levels into account,we face some different situations.
2641  *
2642  * 1. no lock is held
2643  *    In this case, just lock the inode as requested and return 0
2644  *
2645  * 2. We are holding a lock
2646  *    For this situation, things diverges into several cases
2647  *
2648  *    wanted     holding	     what to do
2649  *    ex		ex	    see 2.1 below
2650  *    ex		pr	    see 2.2 below
2651  *    pr		ex	    see 2.1 below
2652  *    pr		pr	    see 2.1 below
2653  *
2654  *    2.1 lock level that is been held is compatible
2655  *    with the wanted level, so no lock action will be tacken.
2656  *
2657  *    2.2 Otherwise, an upgrade is needed, but it is forbidden.
2658  *
2659  * Reason why upgrade within a process is forbidden is that
2660  * lock upgrade may cause dead lock. The following illustrates
2661  * how it happens.
2662  *
2663  *         thread on node1                             thread on node2
2664  * ocfs2_inode_lock_tracker(ex=0)
2665  *
2666  *                                <======   ocfs2_inode_lock_tracker(ex=1)
2667  *
2668  * ocfs2_inode_lock_tracker(ex=1)
2669  */
2670 int ocfs2_inode_lock_tracker(struct inode *inode,
2671 			     struct buffer_head **ret_bh,
2672 			     int ex,
2673 			     struct ocfs2_lock_holder *oh)
2674 {
2675 	int status = 0;
2676 	struct ocfs2_lock_res *lockres;
2677 	struct ocfs2_lock_holder *tmp_oh;
2678 	struct pid *pid = task_pid(current);
2679 
2680 
2681 	lockres = &OCFS2_I(inode)->ip_inode_lockres;
2682 	tmp_oh = ocfs2_pid_holder(lockres, pid);
2683 
2684 	if (!tmp_oh) {
2685 		/*
2686 		 * This corresponds to the case 1.
2687 		 * We haven't got any lock before.
2688 		 */
2689 		status = ocfs2_inode_lock_full(inode, ret_bh, ex, 0);
2690 		if (status < 0) {
2691 			if (status != -ENOENT)
2692 				mlog_errno(status);
2693 			return status;
2694 		}
2695 
2696 		oh->oh_ex = ex;
2697 		ocfs2_add_holder(lockres, oh);
2698 		return 0;
2699 	}
2700 
2701 	if (unlikely(ex && !tmp_oh->oh_ex)) {
2702 		/*
2703 		 * case 2.2 upgrade may cause dead lock, forbid it.
2704 		 */
2705 		mlog(ML_ERROR, "Recursive locking is not permitted to "
2706 		     "upgrade to EX level from PR level.\n");
2707 		dump_stack();
2708 		return -EINVAL;
2709 	}
2710 
2711 	/*
2712 	 *  case 2.1 OCFS2_META_LOCK_GETBH flag make ocfs2_inode_lock_full.
2713 	 *  ignore the lock level and just update it.
2714 	 */
2715 	if (ret_bh) {
2716 		status = ocfs2_inode_lock_full(inode, ret_bh, ex,
2717 					       OCFS2_META_LOCK_GETBH);
2718 		if (status < 0) {
2719 			if (status != -ENOENT)
2720 				mlog_errno(status);
2721 			return status;
2722 		}
2723 	}
2724 	return tmp_oh ? 1 : 0;
2725 }
2726 
2727 void ocfs2_inode_unlock_tracker(struct inode *inode,
2728 				int ex,
2729 				struct ocfs2_lock_holder *oh,
2730 				int had_lock)
2731 {
2732 	struct ocfs2_lock_res *lockres;
2733 
2734 	lockres = &OCFS2_I(inode)->ip_inode_lockres;
2735 	/* had_lock means that the currect process already takes the cluster
2736 	 * lock previously.
2737 	 * If had_lock is 1, we have nothing to do here.
2738 	 * If had_lock is 0, we will release the lock.
2739 	 */
2740 	if (!had_lock) {
2741 		ocfs2_inode_unlock(inode, oh->oh_ex);
2742 		ocfs2_remove_holder(lockres, oh);
2743 	}
2744 }
2745 
2746 int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno)
2747 {
2748 	struct ocfs2_lock_res *lockres;
2749 	struct ocfs2_orphan_scan_lvb *lvb;
2750 	int status = 0;
2751 
2752 	if (ocfs2_is_hard_readonly(osb))
2753 		return -EROFS;
2754 
2755 	if (ocfs2_mount_local(osb))
2756 		return 0;
2757 
2758 	lockres = &osb->osb_orphan_scan.os_lockres;
2759 	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2760 	if (status < 0)
2761 		return status;
2762 
2763 	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2764 	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
2765 	    lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION)
2766 		*seqno = be32_to_cpu(lvb->lvb_os_seqno);
2767 	else
2768 		*seqno = osb->osb_orphan_scan.os_seqno + 1;
2769 
2770 	return status;
2771 }
2772 
2773 void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno)
2774 {
2775 	struct ocfs2_lock_res *lockres;
2776 	struct ocfs2_orphan_scan_lvb *lvb;
2777 
2778 	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) {
2779 		lockres = &osb->osb_orphan_scan.os_lockres;
2780 		lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2781 		lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
2782 		lvb->lvb_os_seqno = cpu_to_be32(seqno);
2783 		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2784 	}
2785 }
2786 
2787 int ocfs2_super_lock(struct ocfs2_super *osb,
2788 		     int ex)
2789 {
2790 	int status = 0;
2791 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2792 	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2793 
2794 	if (ocfs2_is_hard_readonly(osb))
2795 		return -EROFS;
2796 
2797 	if (ocfs2_mount_local(osb))
2798 		goto bail;
2799 
2800 	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
2801 	if (status < 0) {
2802 		mlog_errno(status);
2803 		goto bail;
2804 	}
2805 
2806 	/* The super block lock path is really in the best position to
2807 	 * know when resources covered by the lock need to be
2808 	 * refreshed, so we do it here. Of course, making sense of
2809 	 * everything is up to the caller :) */
2810 	status = ocfs2_should_refresh_lock_res(lockres);
2811 	if (status) {
2812 		status = ocfs2_refresh_slot_info(osb);
2813 
2814 		ocfs2_complete_lock_res_refresh(lockres, status);
2815 
2816 		if (status < 0) {
2817 			ocfs2_cluster_unlock(osb, lockres, level);
2818 			mlog_errno(status);
2819 		}
2820 		ocfs2_track_lock_refresh(lockres);
2821 	}
2822 bail:
2823 	return status;
2824 }
2825 
2826 void ocfs2_super_unlock(struct ocfs2_super *osb,
2827 			int ex)
2828 {
2829 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2830 	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2831 
2832 	if (!ocfs2_mount_local(osb))
2833 		ocfs2_cluster_unlock(osb, lockres, level);
2834 }
2835 
2836 int ocfs2_rename_lock(struct ocfs2_super *osb)
2837 {
2838 	int status;
2839 	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2840 
2841 	if (ocfs2_is_hard_readonly(osb))
2842 		return -EROFS;
2843 
2844 	if (ocfs2_mount_local(osb))
2845 		return 0;
2846 
2847 	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2848 	if (status < 0)
2849 		mlog_errno(status);
2850 
2851 	return status;
2852 }
2853 
2854 void ocfs2_rename_unlock(struct ocfs2_super *osb)
2855 {
2856 	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2857 
2858 	if (!ocfs2_mount_local(osb))
2859 		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2860 }
2861 
2862 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex)
2863 {
2864 	int status;
2865 	struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2866 
2867 	if (ocfs2_is_hard_readonly(osb))
2868 		return -EROFS;
2869 
2870 	if (ex)
2871 		down_write(&osb->nfs_sync_rwlock);
2872 	else
2873 		down_read(&osb->nfs_sync_rwlock);
2874 
2875 	if (ocfs2_mount_local(osb))
2876 		return 0;
2877 
2878 	status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE,
2879 				    0, 0);
2880 	if (status < 0) {
2881 		mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status);
2882 
2883 		if (ex)
2884 			up_write(&osb->nfs_sync_rwlock);
2885 		else
2886 			up_read(&osb->nfs_sync_rwlock);
2887 	}
2888 
2889 	return status;
2890 }
2891 
2892 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex)
2893 {
2894 	struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2895 
2896 	if (!ocfs2_mount_local(osb))
2897 		ocfs2_cluster_unlock(osb, lockres,
2898 				     ex ? LKM_EXMODE : LKM_PRMODE);
2899 	if (ex)
2900 		up_write(&osb->nfs_sync_rwlock);
2901 	else
2902 		up_read(&osb->nfs_sync_rwlock);
2903 }
2904 
2905 int ocfs2_trim_fs_lock(struct ocfs2_super *osb,
2906 		       struct ocfs2_trim_fs_info *info, int trylock)
2907 {
2908 	int status;
2909 	struct ocfs2_trim_fs_lvb *lvb;
2910 	struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;
2911 
2912 	if (info)
2913 		info->tf_valid = 0;
2914 
2915 	if (ocfs2_is_hard_readonly(osb))
2916 		return -EROFS;
2917 
2918 	if (ocfs2_mount_local(osb))
2919 		return 0;
2920 
2921 	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX,
2922 				    trylock ? DLM_LKF_NOQUEUE : 0, 0);
2923 	if (status < 0) {
2924 		if (status != -EAGAIN)
2925 			mlog_errno(status);
2926 		return status;
2927 	}
2928 
2929 	if (info) {
2930 		lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2931 		if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
2932 		    lvb->lvb_version == OCFS2_TRIMFS_LVB_VERSION) {
2933 			info->tf_valid = 1;
2934 			info->tf_success = lvb->lvb_success;
2935 			info->tf_nodenum = be32_to_cpu(lvb->lvb_nodenum);
2936 			info->tf_start = be64_to_cpu(lvb->lvb_start);
2937 			info->tf_len = be64_to_cpu(lvb->lvb_len);
2938 			info->tf_minlen = be64_to_cpu(lvb->lvb_minlen);
2939 			info->tf_trimlen = be64_to_cpu(lvb->lvb_trimlen);
2940 		}
2941 	}
2942 
2943 	return status;
2944 }
2945 
2946 void ocfs2_trim_fs_unlock(struct ocfs2_super *osb,
2947 			  struct ocfs2_trim_fs_info *info)
2948 {
2949 	struct ocfs2_trim_fs_lvb *lvb;
2950 	struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;
2951 
2952 	if (ocfs2_mount_local(osb))
2953 		return;
2954 
2955 	if (info) {
2956 		lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2957 		lvb->lvb_version = OCFS2_TRIMFS_LVB_VERSION;
2958 		lvb->lvb_success = info->tf_success;
2959 		lvb->lvb_nodenum = cpu_to_be32(info->tf_nodenum);
2960 		lvb->lvb_start = cpu_to_be64(info->tf_start);
2961 		lvb->lvb_len = cpu_to_be64(info->tf_len);
2962 		lvb->lvb_minlen = cpu_to_be64(info->tf_minlen);
2963 		lvb->lvb_trimlen = cpu_to_be64(info->tf_trimlen);
2964 	}
2965 
2966 	ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2967 }
2968 
2969 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2970 {
2971 	int ret;
2972 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2973 	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2974 	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2975 
2976 	BUG_ON(!dl);
2977 
2978 	if (ocfs2_is_hard_readonly(osb)) {
2979 		if (ex)
2980 			return -EROFS;
2981 		return 0;
2982 	}
2983 
2984 	if (ocfs2_mount_local(osb))
2985 		return 0;
2986 
2987 	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2988 	if (ret < 0)
2989 		mlog_errno(ret);
2990 
2991 	return ret;
2992 }
2993 
2994 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2995 {
2996 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2997 	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2998 	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2999 
3000 	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
3001 		ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
3002 }
3003 
3004 /* Reference counting of the dlm debug structure. We want this because
3005  * open references on the debug inodes can live on after a mount, so
3006  * we can't rely on the ocfs2_super to always exist. */
3007 static void ocfs2_dlm_debug_free(struct kref *kref)
3008 {
3009 	struct ocfs2_dlm_debug *dlm_debug;
3010 
3011 	dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
3012 
3013 	kfree(dlm_debug);
3014 }
3015 
3016 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
3017 {
3018 	if (dlm_debug)
3019 		kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
3020 }
3021 
3022 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
3023 {
3024 	kref_get(&debug->d_refcnt);
3025 }
3026 
3027 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
3028 {
3029 	struct ocfs2_dlm_debug *dlm_debug;
3030 
3031 	dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
3032 	if (!dlm_debug) {
3033 		mlog_errno(-ENOMEM);
3034 		goto out;
3035 	}
3036 
3037 	kref_init(&dlm_debug->d_refcnt);
3038 	INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
3039 	dlm_debug->d_filter_secs = 0;
3040 out:
3041 	return dlm_debug;
3042 }
3043 
/* Per-open state of the "locking_state" seq_file.
 * Access to this is arbitrated for us via seq_file->sem. */
struct ocfs2_dlm_seq_priv {
	/* Debug state whose d_lockres_tracking list is being walked. */
	struct ocfs2_dlm_debug *p_dlm_debug;
	/* Dummy lockres linked into the tracking list to remember the
	 * iteration position; recognised by its NULL l_ops. */
	struct ocfs2_lock_res p_iter_res;
	/* Copy of the current lockres, taken under
	 * ocfs2_dlm_tracking_lock and handed to ->show(). */
	struct ocfs2_lock_res p_tmp_res;
};
3050 
3051 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
3052 						 struct ocfs2_dlm_seq_priv *priv)
3053 {
3054 	struct ocfs2_lock_res *iter, *ret = NULL;
3055 	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
3056 
3057 	assert_spin_locked(&ocfs2_dlm_tracking_lock);
3058 
3059 	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
3060 		/* discover the head of the list */
3061 		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
3062 			mlog(0, "End of list found, %p\n", ret);
3063 			break;
3064 		}
3065 
3066 		/* We track our "dummy" iteration lockres' by a NULL
3067 		 * l_ops field. */
3068 		if (iter->l_ops != NULL) {
3069 			ret = iter;
3070 			break;
3071 		}
3072 	}
3073 
3074 	return ret;
3075 }
3076 
3077 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
3078 {
3079 	struct ocfs2_dlm_seq_priv *priv = m->private;
3080 	struct ocfs2_lock_res *iter;
3081 
3082 	spin_lock(&ocfs2_dlm_tracking_lock);
3083 	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
3084 	if (iter) {
3085 		/* Since lockres' have the lifetime of their container
3086 		 * (which can be inodes, ocfs2_supers, etc) we want to
3087 		 * copy this out to a temporary lockres while still
3088 		 * under the spinlock. Obviously after this we can't
3089 		 * trust any pointers on the copy returned, but that's
3090 		 * ok as the information we want isn't typically held
3091 		 * in them. */
3092 		priv->p_tmp_res = *iter;
3093 		iter = &priv->p_tmp_res;
3094 	}
3095 	spin_unlock(&ocfs2_dlm_tracking_lock);
3096 
3097 	return iter;
3098 }
3099 
/* seq_file ->stop(): nothing to undo, since ->start() and ->next() both
 * drop ocfs2_dlm_tracking_lock before they return. */
static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
{
}
3103 
3104 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
3105 {
3106 	struct ocfs2_dlm_seq_priv *priv = m->private;
3107 	struct ocfs2_lock_res *iter = v;
3108 	struct ocfs2_lock_res *dummy = &priv->p_iter_res;
3109 
3110 	spin_lock(&ocfs2_dlm_tracking_lock);
3111 	iter = ocfs2_dlm_next_res(iter, priv);
3112 	list_del_init(&dummy->l_debug_list);
3113 	if (iter) {
3114 		list_add(&dummy->l_debug_list, &iter->l_debug_list);
3115 		priv->p_tmp_res = *iter;
3116 		iter = &priv->p_tmp_res;
3117 	}
3118 	spin_unlock(&ocfs2_dlm_tracking_lock);
3119 
3120 	return iter;
3121 }
3122 
3123 /*
3124  * Version is used by debugfs.ocfs2 to determine the format being used
3125  *
3126  * New in version 2
3127  *	- Lock stats printed
3128  * New in version 3
3129  *	- Max time in lock stats is in usecs (instead of nsecs)
3130  * New in version 4
3131  *	- Add last pr/ex unlock times and first lock wait time in usecs
3132  */
3133 #define OCFS2_DLM_DEBUG_STR_VERSION 4
3134 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
3135 {
3136 	int i;
3137 	char *lvb;
3138 	struct ocfs2_lock_res *lockres = v;
3139 #ifdef CONFIG_OCFS2_FS_STATS
3140 	u64 now, last;
3141 	struct ocfs2_dlm_debug *dlm_debug =
3142 			((struct ocfs2_dlm_seq_priv *)m->private)->p_dlm_debug;
3143 #endif
3144 
3145 	if (!lockres)
3146 		return -EINVAL;
3147 
3148 #ifdef CONFIG_OCFS2_FS_STATS
3149 	if (!lockres->l_lock_wait && dlm_debug->d_filter_secs) {
3150 		now = ktime_to_us(ktime_get_real());
3151 		if (lockres->l_lock_prmode.ls_last >
3152 		    lockres->l_lock_exmode.ls_last)
3153 			last = lockres->l_lock_prmode.ls_last;
3154 		else
3155 			last = lockres->l_lock_exmode.ls_last;
3156 		/*
3157 		 * Use d_filter_secs field to filter lock resources dump,
3158 		 * the default d_filter_secs(0) value filters nothing,
3159 		 * otherwise, only dump the last N seconds active lock
3160 		 * resources.
3161 		 */
3162 		if (div_u64(now - last, 1000000) > dlm_debug->d_filter_secs)
3163 			return 0;
3164 	}
3165 #endif
3166 
3167 	seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
3168 
3169 	if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
3170 		seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
3171 			   lockres->l_name,
3172 			   (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
3173 	else
3174 		seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
3175 
3176 	seq_printf(m, "%d\t"
3177 		   "0x%lx\t"
3178 		   "0x%x\t"
3179 		   "0x%x\t"
3180 		   "%u\t"
3181 		   "%u\t"
3182 		   "%d\t"
3183 		   "%d\t",
3184 		   lockres->l_level,
3185 		   lockres->l_flags,
3186 		   lockres->l_action,
3187 		   lockres->l_unlock_action,
3188 		   lockres->l_ro_holders,
3189 		   lockres->l_ex_holders,
3190 		   lockres->l_requested,
3191 		   lockres->l_blocking);
3192 
3193 	/* Dump the raw LVB */
3194 	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
3195 	for(i = 0; i < DLM_LVB_LEN; i++)
3196 		seq_printf(m, "0x%x\t", lvb[i]);
3197 
3198 #ifdef CONFIG_OCFS2_FS_STATS
3199 # define lock_num_prmode(_l)		((_l)->l_lock_prmode.ls_gets)
3200 # define lock_num_exmode(_l)		((_l)->l_lock_exmode.ls_gets)
3201 # define lock_num_prmode_failed(_l)	((_l)->l_lock_prmode.ls_fail)
3202 # define lock_num_exmode_failed(_l)	((_l)->l_lock_exmode.ls_fail)
3203 # define lock_total_prmode(_l)		((_l)->l_lock_prmode.ls_total)
3204 # define lock_total_exmode(_l)		((_l)->l_lock_exmode.ls_total)
3205 # define lock_max_prmode(_l)		((_l)->l_lock_prmode.ls_max)
3206 # define lock_max_exmode(_l)		((_l)->l_lock_exmode.ls_max)
3207 # define lock_refresh(_l)		((_l)->l_lock_refresh)
3208 # define lock_last_prmode(_l)		((_l)->l_lock_prmode.ls_last)
3209 # define lock_last_exmode(_l)		((_l)->l_lock_exmode.ls_last)
3210 # define lock_wait(_l)			((_l)->l_lock_wait)
3211 #else
3212 # define lock_num_prmode(_l)		(0)
3213 # define lock_num_exmode(_l)		(0)
3214 # define lock_num_prmode_failed(_l)	(0)
3215 # define lock_num_exmode_failed(_l)	(0)
3216 # define lock_total_prmode(_l)		(0ULL)
3217 # define lock_total_exmode(_l)		(0ULL)
3218 # define lock_max_prmode(_l)		(0)
3219 # define lock_max_exmode(_l)		(0)
3220 # define lock_refresh(_l)		(0)
3221 # define lock_last_prmode(_l)		(0ULL)
3222 # define lock_last_exmode(_l)		(0ULL)
3223 # define lock_wait(_l)			(0ULL)
3224 #endif
3225 	/* The following seq_print was added in version 2 of this output */
3226 	seq_printf(m, "%u\t"
3227 		   "%u\t"
3228 		   "%u\t"
3229 		   "%u\t"
3230 		   "%llu\t"
3231 		   "%llu\t"
3232 		   "%u\t"
3233 		   "%u\t"
3234 		   "%u\t"
3235 		   "%llu\t"
3236 		   "%llu\t"
3237 		   "%llu\t",
3238 		   lock_num_prmode(lockres),
3239 		   lock_num_exmode(lockres),
3240 		   lock_num_prmode_failed(lockres),
3241 		   lock_num_exmode_failed(lockres),
3242 		   lock_total_prmode(lockres),
3243 		   lock_total_exmode(lockres),
3244 		   lock_max_prmode(lockres),
3245 		   lock_max_exmode(lockres),
3246 		   lock_refresh(lockres),
3247 		   lock_last_prmode(lockres),
3248 		   lock_last_exmode(lockres),
3249 		   lock_wait(lockres));
3250 
3251 	/* End the line */
3252 	seq_printf(m, "\n");
3253 	return 0;
3254 }
3255 
/* seq_file iterator used to walk the tracked lock resources; installed
 * by ocfs2_dlm_debug_open(). */
static const struct seq_operations ocfs2_dlm_seq_ops = {
	.start =	ocfs2_dlm_seq_start,
	.stop =		ocfs2_dlm_seq_stop,
	.next =		ocfs2_dlm_seq_next,
	.show =		ocfs2_dlm_seq_show,
};
3262 
3263 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
3264 {
3265 	struct seq_file *seq = file->private_data;
3266 	struct ocfs2_dlm_seq_priv *priv = seq->private;
3267 	struct ocfs2_lock_res *res = &priv->p_iter_res;
3268 
3269 	ocfs2_remove_lockres_tracking(res);
3270 	ocfs2_put_dlm_debug(priv->p_dlm_debug);
3271 	return seq_release_private(inode, file);
3272 }
3273 
3274 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
3275 {
3276 	struct ocfs2_dlm_seq_priv *priv;
3277 	struct ocfs2_super *osb;
3278 
3279 	priv = __seq_open_private(file, &ocfs2_dlm_seq_ops, sizeof(*priv));
3280 	if (!priv) {
3281 		mlog_errno(-ENOMEM);
3282 		return -ENOMEM;
3283 	}
3284 
3285 	osb = inode->i_private;
3286 	ocfs2_get_dlm_debug(osb->osb_dlm_debug);
3287 	priv->p_dlm_debug = osb->osb_dlm_debug;
3288 	INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
3289 
3290 	ocfs2_add_lockres_tracking(&priv->p_iter_res,
3291 				   priv->p_dlm_debug);
3292 
3293 	return 0;
3294 }
3295 
/* File operations of the "locking_state" debugfs file created in
 * ocfs2_dlm_init_debug(); reads go through the seq_file helpers. */
static const struct file_operations ocfs2_dlm_debug_fops = {
	.open =		ocfs2_dlm_debug_open,
	.release =	ocfs2_dlm_debug_release,
	.read =		seq_read,
	.llseek =	seq_lseek,
};
3302 
3303 static void ocfs2_dlm_init_debug(struct ocfs2_super *osb)
3304 {
3305 	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
3306 
3307 	debugfs_create_file("locking_state", S_IFREG|S_IRUSR,
3308 			    osb->osb_debug_root, osb, &ocfs2_dlm_debug_fops);
3309 
3310 	debugfs_create_u32("locking_filter", 0600, osb->osb_debug_root,
3311 			   &dlm_debug->d_filter_secs);
3312 	ocfs2_get_dlm_debug(dlm_debug);
3313 }
3314 
3315 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
3316 {
3317 	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
3318 
3319 	if (dlm_debug)
3320 		ocfs2_put_dlm_debug(dlm_debug);
3321 }
3322 
3323 int ocfs2_dlm_init(struct ocfs2_super *osb)
3324 {
3325 	int status = 0;
3326 	struct ocfs2_cluster_connection *conn = NULL;
3327 
3328 	if (ocfs2_mount_local(osb)) {
3329 		osb->node_num = 0;
3330 		goto local;
3331 	}
3332 
3333 	ocfs2_dlm_init_debug(osb);
3334 
3335 	/* launch downconvert thread */
3336 	osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc-%s",
3337 			osb->uuid_str);
3338 	if (IS_ERR(osb->dc_task)) {
3339 		status = PTR_ERR(osb->dc_task);
3340 		osb->dc_task = NULL;
3341 		mlog_errno(status);
3342 		goto bail;
3343 	}
3344 
3345 	/* for now, uuid == domain */
3346 	status = ocfs2_cluster_connect(osb->osb_cluster_stack,
3347 				       osb->osb_cluster_name,
3348 				       strlen(osb->osb_cluster_name),
3349 				       osb->uuid_str,
3350 				       strlen(osb->uuid_str),
3351 				       &lproto, ocfs2_do_node_down, osb,
3352 				       &conn);
3353 	if (status) {
3354 		mlog_errno(status);
3355 		goto bail;
3356 	}
3357 
3358 	status = ocfs2_cluster_this_node(conn, &osb->node_num);
3359 	if (status < 0) {
3360 		mlog_errno(status);
3361 		mlog(ML_ERROR,
3362 		     "could not find this host's node number\n");
3363 		ocfs2_cluster_disconnect(conn, 0);
3364 		goto bail;
3365 	}
3366 
3367 local:
3368 	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
3369 	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
3370 	ocfs2_nfs_sync_lock_init(osb);
3371 	ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb);
3372 
3373 	osb->cconn = conn;
3374 bail:
3375 	if (status < 0) {
3376 		ocfs2_dlm_shutdown_debug(osb);
3377 		if (osb->dc_task)
3378 			kthread_stop(osb->dc_task);
3379 	}
3380 
3381 	return status;
3382 }
3383 
/*
 * Tear down the DLM state of @osb: drop the osb-level locks, stop the
 * downconvert thread if one is running, free the super, rename, nfs
 * sync and orphan scan lock resources, disconnect from the cluster
 * (passing @hangup_pending through) and drop the dlm debug reference.
 */
void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
			int hangup_pending)
{
	ocfs2_drop_osb_locks(osb);

	/*
	 * Now that we have dropped all locks and ocfs2_dismount_volume()
	 * has disabled recovery, the DLM won't be talking to us.  It's
	 * safe to tear things down before disconnecting the cluster.
	 */

	if (osb->dc_task) {
		kthread_stop(osb->dc_task);
		/* Clear the pointer so the thread is not stopped twice. */
		osb->dc_task = NULL;
	}

	ocfs2_lock_res_free(&osb->osb_super_lockres);
	ocfs2_lock_res_free(&osb->osb_rename_lockres);
	ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres);
	ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres);

	ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
	osb->cconn = NULL;

	ocfs2_dlm_shutdown_debug(osb);
}
3410 
/*
 * Drop the DLM lock behind @lockres as part of tearing the lockres down.
 *
 * The lockres must already be marked OCFS2_LOCK_FREEING; anything else
 * is treated as a bug.  The function waits for any in-flight operation
 * (OCFS2_LOCK_BUSY) to finish, lets LVB-using lock types store their LVB
 * when the lock is attached, held at DLM_LOCK_EX and not in need of a
 * refresh, then issues ocfs2_dlm_unlock() and waits for that to
 * complete.
 *
 * A lockres that was never initialised or is not attached is left
 * alone.  A failing ocfs2_dlm_unlock() is fatal (BUG()).
 *
 * Always returns 0.
 */
static int ocfs2_drop_lock(struct ocfs2_super *osb,
			   struct ocfs2_lock_res *lockres)
{
	int ret;
	unsigned long flags;
	u32 lkm_flags = 0;

	/* We didn't get anywhere near actually using this lockres. */
	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
		goto out;

	/* LVB-using lock types pass DLM_LKF_VALBLK to the unlock below. */
	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= DLM_LKF_VALBLK;

	spin_lock_irqsave(&lockres->l_lock, flags);

	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
			"lockres %s, flags 0x%lx\n",
			lockres->l_name, lockres->l_flags);

	/* The wait may sleep, so l_lock is dropped around it and the flag
	 * is rechecked once the lock is held again. */
	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
		     "%u, unlock_action = %u\n",
		     lockres->l_name, lockres->l_flags, lockres->l_action,
		     lockres->l_unlock_action);

		spin_unlock_irqrestore(&lockres->l_lock, flags);

		/* XXX: Today we just wait on any busy
		 * locks... Perhaps we need to cancel converts in the
		 * future? */
		ocfs2_wait_on_busy_lock(lockres);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
		if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
		    lockres->l_level == DLM_LOCK_EX &&
		    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
			lockres->l_ops->set_lvb(lockres);
	}

	/* The loop above exits with l_lock held and BUSY clear, so this
	 * is purely a defensive check. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY)
		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
		     lockres->l_name);
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);

	/* Nothing to unlock if no DLM lock is attached. */
	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto out;
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);

	/* make sure we never get here while waiting for an ast to
	 * fire. */
	BUG_ON(lockres->l_action != OCFS2_AST_INVALID);

	/* is this necessary? */
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog(0, "lock %s\n", lockres->l_name);

	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags);
	if (ret) {
		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
		ocfs2_dlm_dump_lksb(&lockres->l_lksb);
		BUG();
	}
	mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n",
	     lockres->l_name);

	/* BUSY was set above; wait for the unlock to complete. */
	ocfs2_wait_on_busy_lock(lockres);
out:
	return 0;
}
3492 
3493 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3494 				       struct ocfs2_lock_res *lockres);
3495 
/* Mark the lockres as being dropped. It will no longer be
 * queued if blocking, but we still may have to wait on it
 * being dequeued from the downconvert thread before we can consider
 * it safe to drop.
 *
 * When called from the downconvert thread itself for a queued lockres,
 * the lockres is taken off the blocked list and processed directly
 * instead of being waited for.  In every other case the function
 * returns only once OCFS2_LOCK_QUEUED is clear.
 *
 * You can *not* attempt to call cluster_lock on this lockres anymore. */
void ocfs2_mark_lockres_freeing(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_mask_waiter mw;
	unsigned long flags, flags2;

	ocfs2_init_mask_waiter(&mw);

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres->l_flags |= OCFS2_LOCK_FREEING;
	if (lockres->l_flags & OCFS2_LOCK_QUEUED && current == osb->dc_task) {
		/*
		 * We know the downconvert is queued but not in progress
		 * because we are the downconvert thread and processing
		 * different lock. So we can just remove the lock from the
		 * queue. This is not only an optimization but also a way
		 * to avoid the following deadlock:
		 *   ocfs2_dentry_post_unlock()
		 *     ocfs2_dentry_lock_put()
		 *       ocfs2_drop_dentry_lock()
		 *         iput()
		 *           ocfs2_evict_inode()
		 *             ocfs2_clear_inode()
		 *               ocfs2_mark_lockres_freeing()
		 *                 ... blocks waiting for OCFS2_LOCK_QUEUED
		 *                 since we are the downconvert thread which
		 *                 should clear the flag.
		 */
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		/* l_lock is released before dc_task_lock is taken, so the
		 * two spinlocks are never held together here. */
		spin_lock_irqsave(&osb->dc_task_lock, flags2);
		list_del_init(&lockres->l_blocked_list);
		osb->blocked_lock_count--;
		spin_unlock_irqrestore(&osb->dc_task_lock, flags2);
		/*
		 * Warn if we recurse into another post_unlock call.  Strictly
		 * speaking it isn't a problem but we need to be careful if
		 * that happens (stack overflow, deadlocks, ...) so warn if
		 * ocfs2 grows a path for which this can happen.
		 */
		WARN_ON_ONCE(lockres->l_ops->post_unlock);
		/* Since the lock is freeing we don't do much in the fn below */
		ocfs2_process_blocked_lock(osb, lockres);
		return;
	}
	/* Not the downconvert thread: sleep until the lockres has been
	 * dequeued.  l_lock is dropped around each wait and the flag is
	 * rechecked once it is held again. */
	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		mlog(0, "Waiting on lockres %s\n", lockres->l_name);

		status = ocfs2_wait_for_mask(&mw);
		if (status)
			mlog_errno(status);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}
3561 
/*
 * Convenience wrapper: mark @lockres as freeing and then drop its DLM
 * lock.  A non-zero result from ocfs2_drop_lock() is only logged.
 */
void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
			       struct ocfs2_lock_res *lockres)
{
	int err;

	ocfs2_mark_lockres_freeing(osb, lockres);

	err = ocfs2_drop_lock(osb, lockres);
	if (err)
		mlog_errno(err);
}
3572 
3573 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
3574 {
3575 	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
3576 	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
3577 	ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
3578 	ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
3579 }
3580 
3581 int ocfs2_drop_inode_locks(struct inode *inode)
3582 {
3583 	int status, err;
3584 
3585 	/* No need to call ocfs2_mark_lockres_freeing here -
3586 	 * ocfs2_clear_inode has done it for us. */
3587 
3588 	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3589 			      &OCFS2_I(inode)->ip_open_lockres);
3590 	if (err < 0)
3591 		mlog_errno(err);
3592 
3593 	status = err;
3594 
3595 	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3596 			      &OCFS2_I(inode)->ip_inode_lockres);
3597 	if (err < 0)
3598 		mlog_errno(err);
3599 	if (err < 0 && !status)
3600 		status = err;
3601 
3602 	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3603 			      &OCFS2_I(inode)->ip_rw_lockres);
3604 	if (err < 0)
3605 		mlog_errno(err);
3606 	if (err < 0 && !status)
3607 		status = err;
3608 
3609 	return status;
3610 }
3611 
/*
 * Record in @lockres that a downconvert to @new_level is about to be
 * issued: sets l_action to OCFS2_AST_DOWNCONVERT, l_requested to
 * @new_level and the OCFS2_LOCK_BUSY flag.
 *
 * The caller must hold lockres->l_lock.  It is a bug to call this when
 * nothing is being blocked (l_blocking <= DLM_LOCK_NL) or when the
 * current level is not strictly above @new_level.
 *
 * Returns the value of lockres_set_pending() - presumably the pending
 * generation later handed to lockres_clear_pending(); confirm against
 * that helper.
 */
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
					      int new_level)
{
	assert_spin_locked(&lockres->l_lock);

	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);

	/* A "downconvert" that does not lower the level is a logic
	 * error: dump the whole lockres state before dying. */
	if (lockres->l_level <= new_level) {
		mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, "
		     "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, "
		     "block %d, pgen %d\n", lockres->l_name, lockres->l_level,
		     new_level, list_empty(&lockres->l_blocked_list),
		     list_empty(&lockres->l_mask_waiters), lockres->l_type,
		     lockres->l_flags, lockres->l_ro_holders,
		     lockres->l_ex_holders, lockres->l_action,
		     lockres->l_unlock_action, lockres->l_requested,
		     lockres->l_blocking, lockres->l_pending_gen);
		BUG();
	}

	mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n",
	     lockres->l_name, lockres->l_level, new_level, lockres->l_blocking);

	lockres->l_action = OCFS2_AST_DOWNCONVERT;
	lockres->l_requested = new_level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
	return lockres_set_pending(lockres);
}
3640 
3641 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
3642 				  struct ocfs2_lock_res *lockres,
3643 				  int new_level,
3644 				  int lvb,
3645 				  unsigned int generation)
3646 {
3647 	int ret;
3648 	u32 dlm_flags = DLM_LKF_CONVERT;
3649 
3650 	mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name,
3651 	     lockres->l_level, new_level);
3652 
3653 	/*
3654 	 * On DLM_LKF_VALBLK, fsdlm behaves differently with o2cb. It always
3655 	 * expects DLM_LKF_VALBLK being set if the LKB has LVB, so that
3656 	 * we can recover correctly from node failure. Otherwise, we may get
3657 	 * invalid LVB in LKB, but without DLM_SBF_VALNOTVALID being set.
3658 	 */
3659 	if (ocfs2_userspace_stack(osb) &&
3660 	    lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
3661 		lvb = 1;
3662 
3663 	if (lvb)
3664 		dlm_flags |= DLM_LKF_VALBLK;
3665 
3666 	ret = ocfs2_dlm_lock(osb->cconn,
3667 			     new_level,
3668 			     &lockres->l_lksb,
3669 			     dlm_flags,
3670 			     lockres->l_name,
3671 			     OCFS2_LOCK_ID_MAX_LEN - 1);
3672 	lockres_clear_pending(lockres, generation, osb);
3673 	if (ret) {
3674 		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
3675 		ocfs2_recover_from_dlm_error(lockres, 1);
3676 		goto bail;
3677 	}
3678 
3679 	ret = 0;
3680 bail:
3681 	return ret;
3682 }
3683 
/*
 * Decide whether an in-flight convert on @lockres needs to be cancelled
 * and, if so, mark the lockres accordingly by setting l_unlock_action
 * to OCFS2_UNLOCK_CANCEL_CONVERT.
 *
 * The caller must hold lockres->l_lock.  It is a bug to get here with
 * an l_action other than OCFS2_AST_CONVERT / OCFS2_AST_DOWNCONVERT or
 * without OCFS2_LOCK_BUSY set, unless a cancel is already under way.
 *
 * returns 1 when the caller should unlock and call ocfs2_dlm_unlock,
 * 0 when a cancel is already in progress.
 */
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
				        struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);

	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		/* If we're already trying to cancel a lock conversion
		 * then just drop the spinlock and allow the caller to
		 * requeue this lock. */
		mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name);
		return 0;
	}

	/* were we in a convert when we got the bast fire? */
	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
	/* set things up for the unlockast to know to just
	 * clear out the ast_action and unset busy, etc. */
	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;

	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
			"lock %s, invalid flags: 0x%lx\n",
			lockres->l_name, lockres->l_flags);

	mlog(ML_BASTS, "lockres %s\n", lockres->l_name);

	return 1;
}
3713 
3714 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
3715 				struct ocfs2_lock_res *lockres)
3716 {
3717 	int ret;
3718 
3719 	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
3720 			       DLM_LKF_CANCEL);
3721 	if (ret) {
3722 		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3723 		ocfs2_recover_from_dlm_error(lockres, 0);
3724 	}
3725 
3726 	mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3727 
3728 	return ret;
3729 }
3730 
3731 static int ocfs2_unblock_lock(struct ocfs2_super *osb,
3732 			      struct ocfs2_lock_res *lockres,
3733 			      struct ocfs2_unblock_ctl *ctl)
3734 {
3735 	unsigned long flags;
3736 	int blocking;
3737 	int new_level;
3738 	int level;
3739 	int ret = 0;
3740 	int set_lvb = 0;
3741 	unsigned int gen;
3742 
3743 	spin_lock_irqsave(&lockres->l_lock, flags);
3744 
3745 recheck:
3746 	/*
3747 	 * Is it still blocking? If not, we have no more work to do.
3748 	 */
3749 	if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
3750 		BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
3751 		spin_unlock_irqrestore(&lockres->l_lock, flags);
3752 		ret = 0;
3753 		goto leave;
3754 	}
3755 
3756 	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
3757 		/* XXX
3758 		 * This is a *big* race.  The OCFS2_LOCK_PENDING flag
3759 		 * exists entirely for one reason - another thread has set
3760 		 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
3761 		 *
3762 		 * If we do ocfs2_cancel_convert() before the other thread
3763 		 * calls dlm_lock(), our cancel will do nothing.  We will
3764 		 * get no ast, and we will have no way of knowing the
3765 		 * cancel failed.  Meanwhile, the other thread will call
3766 		 * into dlm_lock() and wait...forever.
3767 		 *
3768 		 * Why forever?  Because another node has asked for the
3769 		 * lock first; that's why we're here in unblock_lock().
3770 		 *
3771 		 * The solution is OCFS2_LOCK_PENDING.  When PENDING is
3772 		 * set, we just requeue the unblock.  Only when the other
3773 		 * thread has called dlm_lock() and cleared PENDING will
3774 		 * we then cancel their request.
3775 		 *
		 * All callers of dlm_lock() must set OCFS2_LOCK_PENDING
		 * at the same time they set OCFS2_LOCK_BUSY.  They must
		 * clear OCFS2_LOCK_PENDING after dlm_lock() returns.
3779 		 */
3780 		if (lockres->l_flags & OCFS2_LOCK_PENDING) {
3781 			mlog(ML_BASTS, "lockres %s, ReQ: Pending\n",
3782 			     lockres->l_name);
3783 			goto leave_requeue;
3784 		}
3785 
3786 		ctl->requeue = 1;
3787 		ret = ocfs2_prepare_cancel_convert(osb, lockres);
3788 		spin_unlock_irqrestore(&lockres->l_lock, flags);
3789 		if (ret) {
3790 			ret = ocfs2_cancel_convert(osb, lockres);
3791 			if (ret < 0)
3792 				mlog_errno(ret);
3793 		}
3794 		goto leave;
3795 	}
3796 
3797 	/*
3798 	 * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is
3799 	 * set when the ast is received for an upconvert just before the
3800 	 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast
3801 	 * on the heels of the ast, we want to delay the downconvert just
3802 	 * enough to allow the up requestor to do its task. Because this
3803 	 * lock is in the blocked queue, the lock will be downconverted
3804 	 * as soon as the requestor is done with the lock.
3805 	 */
3806 	if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING)
3807 		goto leave_requeue;
3808 
3809 	/*
3810 	 * How can we block and yet be at NL?  We were trying to upconvert
3811 	 * from NL and got canceled.  The code comes back here, and now
3812 	 * we notice and clear BLOCKING.
3813 	 */
3814 	if (lockres->l_level == DLM_LOCK_NL) {
3815 		BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders);
3816 		mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name);
3817 		lockres->l_blocking = DLM_LOCK_NL;
3818 		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
3819 		spin_unlock_irqrestore(&lockres->l_lock, flags);
3820 		goto leave;
3821 	}
3822 
3823 	/* if we're blocking an exclusive and we have *any* holders,
3824 	 * then requeue. */
3825 	if ((lockres->l_blocking == DLM_LOCK_EX)
3826 	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
3827 		mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n",
3828 		     lockres->l_name, lockres->l_ex_holders,
3829 		     lockres->l_ro_holders);
3830 		goto leave_requeue;
3831 	}
3832 
3833 	/* If it's a PR we're blocking, then only
3834 	 * requeue if we've got any EX holders */
3835 	if (lockres->l_blocking == DLM_LOCK_PR &&
3836 	    lockres->l_ex_holders) {
3837 		mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n",
3838 		     lockres->l_name, lockres->l_ex_holders);
3839 		goto leave_requeue;
3840 	}
3841 
3842 	/*
3843 	 * Can we get a lock in this state if the holder counts are
3844 	 * zero? The meta data unblock code used to check this.
3845 	 */
3846 	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
3847 	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) {
3848 		mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n",
3849 		     lockres->l_name);
3850 		goto leave_requeue;
3851 	}
3852 
3853 	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
3854 
3855 	if (lockres->l_ops->check_downconvert
3856 	    && !lockres->l_ops->check_downconvert(lockres, new_level)) {
3857 		mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n",
3858 		     lockres->l_name);
3859 		goto leave_requeue;
3860 	}
3861 
3862 	/* If we get here, then we know that there are no more
3863 	 * incompatible holders (and anyone asking for an incompatible
3864 	 * lock is blocked). We can now downconvert the lock */
3865 	if (!lockres->l_ops->downconvert_worker)
3866 		goto downconvert;
3867 
3868 	/* Some lockres types want to do a bit of work before
3869 	 * downconverting a lock. Allow that here. The worker function
3870 	 * may sleep, so we save off a copy of what we're blocking as
3871 	 * it may change while we're not holding the spin lock. */
3872 	blocking = lockres->l_blocking;
3873 	level = lockres->l_level;
3874 	spin_unlock_irqrestore(&lockres->l_lock, flags);
3875 
3876 	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
3877 
3878 	if (ctl->unblock_action == UNBLOCK_STOP_POST) {
3879 		mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n",
3880 		     lockres->l_name);
3881 		goto leave;
3882 	}
3883 
3884 	spin_lock_irqsave(&lockres->l_lock, flags);
3885 	if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) {
3886 		/* If this changed underneath us, then we can't drop
3887 		 * it just yet. */
3888 		mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, "
3889 		     "Recheck\n", lockres->l_name, blocking,
3890 		     lockres->l_blocking, level, lockres->l_level);
3891 		goto recheck;
3892 	}
3893 
3894 downconvert:
3895 	ctl->requeue = 0;
3896 
3897 	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3898 		if (lockres->l_level == DLM_LOCK_EX)
3899 			set_lvb = 1;
3900 
3901 		/*
3902 		 * We only set the lvb if the lock has been fully
3903 		 * refreshed - otherwise we risk setting stale
3904 		 * data. Otherwise, there's no need to actually clear
3905 		 * out the lvb here as it's value is still valid.
3906 		 */
3907 		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3908 			lockres->l_ops->set_lvb(lockres);
3909 	}
3910 
3911 	gen = ocfs2_prepare_downconvert(lockres, new_level);
3912 	spin_unlock_irqrestore(&lockres->l_lock, flags);
3913 	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
3914 				     gen);
3915 
3916 leave:
3917 	if (ret)
3918 		mlog_errno(ret);
3919 	return ret;
3920 
3921 leave_requeue:
3922 	spin_unlock_irqrestore(&lockres->l_lock, flags);
3923 	ctl->requeue = 1;
3924 
3925 	return 0;
3926 }
3927 
3928 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3929 				     int blocking)
3930 {
3931 	struct inode *inode;
3932 	struct address_space *mapping;
3933 	struct ocfs2_inode_info *oi;
3934 
3935        	inode = ocfs2_lock_res_inode(lockres);
3936 	mapping = inode->i_mapping;
3937 
3938 	if (S_ISDIR(inode->i_mode)) {
3939 		oi = OCFS2_I(inode);
3940 		oi->ip_dir_lock_gen++;
3941 		mlog(0, "generation: %u\n", oi->ip_dir_lock_gen);
3942 		goto out;
3943 	}
3944 
3945 	if (!S_ISREG(inode->i_mode))
3946 		goto out;
3947 
3948 	/*
3949 	 * We need this before the filemap_fdatawrite() so that it can
3950 	 * transfer the dirty bit from the PTE to the
3951 	 * page. Unfortunately this means that even for EX->PR
3952 	 * downconverts, we'll lose our mappings and have to build
3953 	 * them up again.
3954 	 */
3955 	unmap_mapping_range(mapping, 0, 0, 0);
3956 
3957 	if (filemap_fdatawrite(mapping)) {
3958 		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
3959 		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
3960 	}
3961 	sync_mapping_buffers(mapping);
3962 	if (blocking == DLM_LOCK_EX) {
3963 		truncate_inode_pages(mapping, 0);
3964 	} else {
3965 		/* We only need to wait on the I/O if we're not also
3966 		 * truncating pages because truncate_inode_pages waits
3967 		 * for us above. We don't truncate pages if we're
3968 		 * blocking anything < EXMODE because we want to keep
3969 		 * them around in that case. */
3970 		filemap_fdatawait(mapping);
3971 	}
3972 
3973 	forget_all_cached_acls(inode);
3974 
3975 out:
3976 	return UNBLOCK_CONTINUE;
3977 }
3978 
3979 static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci,
3980 				 struct ocfs2_lock_res *lockres,
3981 				 int new_level)
3982 {
3983 	int checkpointed = ocfs2_ci_fully_checkpointed(ci);
3984 
3985 	BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
3986 	BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
3987 
3988 	if (checkpointed)
3989 		return 1;
3990 
3991 	ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci)));
3992 	return 0;
3993 }
3994 
/*
 * ->check_downconvert style hook for inode locks: defers to
 * ocfs2_ci_checkpointed() using the inode's metadata cache.
 */
static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level)
{
	struct ocfs2_caching_info *ci;

	ci = INODE_CACHE(ocfs2_lock_res_inode(lockres));

	return ocfs2_ci_checkpointed(ci, lockres, new_level);
}
4002 
/*
 * Fill the lock value block of an inode lock by handing the inode
 * behind @lockres to __ocfs2_stuff_meta_lvb().
 */
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
{
	__ocfs2_stuff_meta_lvb(ocfs2_lock_res_inode(lockres));
}
4009 
/*
 * Drops the final reference on our dentry lock. For now this runs in
 * the downconvert thread, though the dlmglue API could be simplified in
 * the future by pushing these drops off to the ocfs2_wq instead.
 */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres)
{
	ocfs2_dentry_lock_put(osb, ocfs2_lock_res_dl(lockres));
}
4021 
4022 /*
4023  * d_delete() matching dentries before the lock downconvert.
4024  *
4025  * At this point, any process waiting to destroy the
4026  * dentry_lock due to last ref count is stopped by the
4027  * OCFS2_LOCK_QUEUED flag.
4028  *
4029  * We have two potential problems
4030  *
4031  * 1) If we do the last reference drop on our dentry_lock (via dput)
4032  *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
4033  *    the downconvert to finish. Instead we take an elevated
4034  *    reference and push the drop until after we've completed our
4035  *    unblock processing.
4036  *
4037  * 2) There might be another process with a final reference,
4038  *    waiting on us to finish processing. If this is the case, we
4039  *    detect it and exit out - there's no more dentries anyway.
4040  */
4041 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
4042 				       int blocking)
4043 {
4044 	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
4045 	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
4046 	struct dentry *dentry;
4047 	unsigned long flags;
4048 	int extra_ref = 0;
4049 
4050 	/*
4051 	 * This node is blocking another node from getting a read
4052 	 * lock. This happens when we've renamed within a
4053 	 * directory. We've forced the other nodes to d_delete(), but
4054 	 * we never actually dropped our lock because it's still
4055 	 * valid. The downconvert code will retain a PR for this node,
4056 	 * so there's no further work to do.
4057 	 */
4058 	if (blocking == DLM_LOCK_PR)
4059 		return UNBLOCK_CONTINUE;
4060 
4061 	/*
4062 	 * Mark this inode as potentially orphaned. The code in
4063 	 * ocfs2_delete_inode() will figure out whether it actually
4064 	 * needs to be freed or not.
4065 	 */
4066 	spin_lock(&oi->ip_lock);
4067 	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
4068 	spin_unlock(&oi->ip_lock);
4069 
4070 	/*
4071 	 * Yuck. We need to make sure however that the check of
4072 	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
4073 	 * respect to a reference decrement or the setting of that
4074 	 * flag.
4075 	 */
4076 	spin_lock_irqsave(&lockres->l_lock, flags);
4077 	spin_lock(&dentry_attach_lock);
4078 	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
4079 	    && dl->dl_count) {
4080 		dl->dl_count++;
4081 		extra_ref = 1;
4082 	}
4083 	spin_unlock(&dentry_attach_lock);
4084 	spin_unlock_irqrestore(&lockres->l_lock, flags);
4085 
4086 	mlog(0, "extra_ref = %d\n", extra_ref);
4087 
4088 	/*
4089 	 * We have a process waiting on us in ocfs2_dentry_iput(),
4090 	 * which means we can't have any more outstanding
4091 	 * aliases. There's no need to do any more work.
4092 	 */
4093 	if (!extra_ref)
4094 		return UNBLOCK_CONTINUE;
4095 
4096 	spin_lock(&dentry_attach_lock);
4097 	while (1) {
4098 		dentry = ocfs2_find_local_alias(dl->dl_inode,
4099 						dl->dl_parent_blkno, 1);
4100 		if (!dentry)
4101 			break;
4102 		spin_unlock(&dentry_attach_lock);
4103 
4104 		if (S_ISDIR(dl->dl_inode->i_mode))
4105 			shrink_dcache_parent(dentry);
4106 
4107 		mlog(0, "d_delete(%pd);\n", dentry);
4108 
4109 		/*
4110 		 * The following dcache calls may do an
4111 		 * iput(). Normally we don't want that from the
4112 		 * downconverting thread, but in this case it's ok
4113 		 * because the requesting node already has an
4114 		 * exclusive lock on the inode, so it can't be queued
4115 		 * for a downconvert.
4116 		 */
4117 		d_delete(dentry);
4118 		dput(dentry);
4119 
4120 		spin_lock(&dentry_attach_lock);
4121 	}
4122 	spin_unlock(&dentry_attach_lock);
4123 
4124 	/*
4125 	 * If we are the last holder of this dentry lock, there is no
4126 	 * reason to downconvert so skip straight to the unlock.
4127 	 */
4128 	if (dl->dl_count == 1)
4129 		return UNBLOCK_STOP_POST;
4130 
4131 	return UNBLOCK_CONTINUE_POST;
4132 }
4133 
4134 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
4135 					    int new_level)
4136 {
4137 	struct ocfs2_refcount_tree *tree =
4138 				ocfs2_lock_res_refcount_tree(lockres);
4139 
4140 	return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level);
4141 }
4142 
4143 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
4144 					 int blocking)
4145 {
4146 	struct ocfs2_refcount_tree *tree =
4147 				ocfs2_lock_res_refcount_tree(lockres);
4148 
4149 	ocfs2_metadata_cache_purge(&tree->rf_ci);
4150 
4151 	return UNBLOCK_CONTINUE;
4152 }
4153 
4154 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres)
4155 {
4156 	struct ocfs2_qinfo_lvb *lvb;
4157 	struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres);
4158 	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
4159 					    oinfo->dqi_gi.dqi_type);
4160 
4161 	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
4162 	lvb->lvb_version = OCFS2_QINFO_LVB_VERSION;
4163 	lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace);
4164 	lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace);
4165 	lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms);
4166 	lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks);
4167 	lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk);
4168 	lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry);
4169 }
4170 
4171 void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex)
4172 {
4173 	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
4174 	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
4175 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
4176 
4177 	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
4178 		ocfs2_cluster_unlock(osb, lockres, level);
4179 }
4180 
4181 static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo)
4182 {
4183 	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
4184 					    oinfo->dqi_gi.dqi_type);
4185 	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
4186 	struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
4187 	struct buffer_head *bh = NULL;
4188 	struct ocfs2_global_disk_dqinfo *gdinfo;
4189 	int status = 0;
4190 
4191 	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
4192 	    lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) {
4193 		info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace);
4194 		info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace);
4195 		oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms);
4196 		oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks);
4197 		oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk);
4198 		oinfo->dqi_gi.dqi_free_entry =
4199 					be32_to_cpu(lvb->lvb_free_entry);
4200 	} else {
4201 		status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode,
4202 						     oinfo->dqi_giblk, &bh);
4203 		if (status) {
4204 			mlog_errno(status);
4205 			goto bail;
4206 		}
4207 		gdinfo = (struct ocfs2_global_disk_dqinfo *)
4208 					(bh->b_data + OCFS2_GLOBAL_INFO_OFF);
4209 		info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace);
4210 		info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace);
4211 		oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms);
4212 		oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks);
4213 		oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk);
4214 		oinfo->dqi_gi.dqi_free_entry =
4215 					le32_to_cpu(gdinfo->dqi_free_entry);
4216 		brelse(bh);
4217 		ocfs2_track_lock_refresh(lockres);
4218 	}
4219 
4220 bail:
4221 	return status;
4222 }
4223 
4224 /* Lock quota info, this function expects at least shared lock on the quota file
4225  * so that we can safely refresh quota info from disk. */
4226 int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex)
4227 {
4228 	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
4229 	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
4230 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
4231 	int status = 0;
4232 
4233 	/* On RO devices, locking really isn't needed... */
4234 	if (ocfs2_is_hard_readonly(osb)) {
4235 		if (ex)
4236 			status = -EROFS;
4237 		goto bail;
4238 	}
4239 	if (ocfs2_mount_local(osb))
4240 		goto bail;
4241 
4242 	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
4243 	if (status < 0) {
4244 		mlog_errno(status);
4245 		goto bail;
4246 	}
4247 	if (!ocfs2_should_refresh_lock_res(lockres))
4248 		goto bail;
4249 	/* OK, we have the lock but we need to refresh the quota info */
4250 	status = ocfs2_refresh_qinfo(oinfo);
4251 	if (status)
4252 		ocfs2_qinfo_unlock(oinfo, ex);
4253 	ocfs2_complete_lock_res_refresh(lockres, status);
4254 bail:
4255 	return status;
4256 }
4257 
4258 int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex)
4259 {
4260 	int status;
4261 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
4262 	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
4263 	struct ocfs2_super *osb = lockres->l_priv;
4264 
4265 
4266 	if (ocfs2_is_hard_readonly(osb))
4267 		return -EROFS;
4268 
4269 	if (ocfs2_mount_local(osb))
4270 		return 0;
4271 
4272 	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
4273 	if (status < 0)
4274 		mlog_errno(status);
4275 
4276 	return status;
4277 }
4278 
4279 void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
4280 {
4281 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
4282 	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
4283 	struct ocfs2_super *osb = lockres->l_priv;
4284 
4285 	if (!ocfs2_mount_local(osb))
4286 		ocfs2_cluster_unlock(osb, lockres, level);
4287 }
4288 
4289 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
4290 				       struct ocfs2_lock_res *lockres)
4291 {
4292 	int status;
4293 	struct ocfs2_unblock_ctl ctl = {0, 0,};
4294 	unsigned long flags;
4295 
4296 	/* Our reference to the lockres in this function can be
4297 	 * considered valid until we remove the OCFS2_LOCK_QUEUED
4298 	 * flag. */
4299 
4300 	BUG_ON(!lockres);
4301 	BUG_ON(!lockres->l_ops);
4302 
4303 	mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name);
4304 
4305 	/* Detect whether a lock has been marked as going away while
4306 	 * the downconvert thread was processing other things. A lock can
4307 	 * still be marked with OCFS2_LOCK_FREEING after this check,
4308 	 * but short circuiting here will still save us some
4309 	 * performance. */
4310 	spin_lock_irqsave(&lockres->l_lock, flags);
4311 	if (lockres->l_flags & OCFS2_LOCK_FREEING)
4312 		goto unqueue;
4313 	spin_unlock_irqrestore(&lockres->l_lock, flags);
4314 
4315 	status = ocfs2_unblock_lock(osb, lockres, &ctl);
4316 	if (status < 0)
4317 		mlog_errno(status);
4318 
4319 	spin_lock_irqsave(&lockres->l_lock, flags);
4320 unqueue:
4321 	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
4322 		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
4323 	} else
4324 		ocfs2_schedule_blocked_lock(osb, lockres);
4325 
4326 	mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name,
4327 	     ctl.requeue ? "yes" : "no");
4328 	spin_unlock_irqrestore(&lockres->l_lock, flags);
4329 
4330 	if (ctl.unblock_action != UNBLOCK_CONTINUE
4331 	    && lockres->l_ops->post_unlock)
4332 		lockres->l_ops->post_unlock(osb, lockres);
4333 }
4334 
4335 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
4336 					struct ocfs2_lock_res *lockres)
4337 {
4338 	unsigned long flags;
4339 
4340 	assert_spin_locked(&lockres->l_lock);
4341 
4342 	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
4343 		/* Do not schedule a lock for downconvert when it's on
4344 		 * the way to destruction - any nodes wanting access
4345 		 * to the resource will get it soon. */
4346 		mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n",
4347 		     lockres->l_name, lockres->l_flags);
4348 		return;
4349 	}
4350 
4351 	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
4352 
4353 	spin_lock_irqsave(&osb->dc_task_lock, flags);
4354 	if (list_empty(&lockres->l_blocked_list)) {
4355 		list_add_tail(&lockres->l_blocked_list,
4356 			      &osb->blocked_lock_list);
4357 		osb->blocked_lock_count++;
4358 	}
4359 	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4360 }
4361 
4362 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
4363 {
4364 	unsigned long processed;
4365 	unsigned long flags;
4366 	struct ocfs2_lock_res *lockres;
4367 
4368 	spin_lock_irqsave(&osb->dc_task_lock, flags);
4369 	/* grab this early so we know to try again if a state change and
4370 	 * wake happens part-way through our work  */
4371 	osb->dc_work_sequence = osb->dc_wake_sequence;
4372 
4373 	processed = osb->blocked_lock_count;
4374 	/*
4375 	 * blocked lock processing in this loop might call iput which can
4376 	 * remove items off osb->blocked_lock_list. Downconvert up to
4377 	 * 'processed' number of locks, but stop short if we had some
4378 	 * removed in ocfs2_mark_lockres_freeing when downconverting.
4379 	 */
4380 	while (processed && !list_empty(&osb->blocked_lock_list)) {
4381 		lockres = list_entry(osb->blocked_lock_list.next,
4382 				     struct ocfs2_lock_res, l_blocked_list);
4383 		list_del_init(&lockres->l_blocked_list);
4384 		osb->blocked_lock_count--;
4385 		spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4386 
4387 		BUG_ON(!processed);
4388 		processed--;
4389 
4390 		ocfs2_process_blocked_lock(osb, lockres);
4391 
4392 		spin_lock_irqsave(&osb->dc_task_lock, flags);
4393 	}
4394 	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4395 }
4396 
4397 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
4398 {
4399 	int empty = 0;
4400 	unsigned long flags;
4401 
4402 	spin_lock_irqsave(&osb->dc_task_lock, flags);
4403 	if (list_empty(&osb->blocked_lock_list))
4404 		empty = 1;
4405 
4406 	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4407 	return empty;
4408 }
4409 
4410 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
4411 {
4412 	int should_wake = 0;
4413 	unsigned long flags;
4414 
4415 	spin_lock_irqsave(&osb->dc_task_lock, flags);
4416 	if (osb->dc_work_sequence != osb->dc_wake_sequence)
4417 		should_wake = 1;
4418 	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4419 
4420 	return should_wake;
4421 }
4422 
4423 static int ocfs2_downconvert_thread(void *arg)
4424 {
4425 	struct ocfs2_super *osb = arg;
4426 
4427 	/* only quit once we've been asked to stop and there is no more
4428 	 * work available */
4429 	while (!(kthread_should_stop() &&
4430 		ocfs2_downconvert_thread_lists_empty(osb))) {
4431 
4432 		wait_event_interruptible(osb->dc_event,
4433 					 ocfs2_downconvert_thread_should_wake(osb) ||
4434 					 kthread_should_stop());
4435 
4436 		mlog(0, "downconvert_thread: awoken\n");
4437 
4438 		ocfs2_downconvert_thread_do_work(osb);
4439 	}
4440 
4441 	osb->dc_task = NULL;
4442 	return 0;
4443 }
4444 
4445 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
4446 {
4447 	unsigned long flags;
4448 
4449 	spin_lock_irqsave(&osb->dc_task_lock, flags);
4450 	/* make sure the voting thread gets a swipe at whatever changes
4451 	 * the caller may have made to the voting state */
4452 	osb->dc_wake_sequence++;
4453 	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4454 	wake_up(&osb->dc_event);
4455 }
4456