xref: /openbmc/linux/fs/ocfs2/dlmglue.c (revision fd589a8f)
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmglue.c
5  *
6  * Code which implements an OCFS2 specific interface to our DLM.
7  *
8  * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
24  */
25 
26 #include <linux/types.h>
27 #include <linux/slab.h>
28 #include <linux/highmem.h>
29 #include <linux/mm.h>
30 #include <linux/kthread.h>
31 #include <linux/pagemap.h>
32 #include <linux/debugfs.h>
33 #include <linux/seq_file.h>
34 #include <linux/time.h>
35 #include <linux/quotaops.h>
36 
37 #define MLOG_MASK_PREFIX ML_DLM_GLUE
38 #include <cluster/masklog.h>
39 
40 #include "ocfs2.h"
41 #include "ocfs2_lockingver.h"
42 
43 #include "alloc.h"
44 #include "dcache.h"
45 #include "dlmglue.h"
46 #include "extent_map.h"
47 #include "file.h"
48 #include "heartbeat.h"
49 #include "inode.h"
50 #include "journal.h"
51 #include "stackglue.h"
52 #include "slot_map.h"
53 #include "super.h"
54 #include "uptodate.h"
55 #include "quota.h"
56 
57 #include "buffer_head_io.h"
58 
59 struct ocfs2_mask_waiter {
60 	struct list_head	mw_item;
61 	int			mw_status;
62 	struct completion	mw_complete;
63 	unsigned long		mw_mask;
64 	unsigned long		mw_goal;
65 #ifdef CONFIG_OCFS2_FS_STATS
66 	unsigned long long 	mw_lock_start;
67 #endif
68 };
69 
/* ->get_osb callbacks used by the lock-ops tables further down. */
static struct ocfs2_super *
ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *
ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *
ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *
ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);
74 
75 /*
76  * Return value from ->downconvert_worker functions.
77  *
78  * These control the precise actions of ocfs2_unblock_lock()
79  * and ocfs2_process_blocked_lock()
80  *
81  */
enum ocfs2_unblock_action {
	/* Continue downconvert */
	UNBLOCK_CONTINUE	= 0,
	/* Continue downconvert, fire ->post_unlock callback */
	UNBLOCK_CONTINUE_POST	= 1,
	/* Do not downconvert, fire ->post_unlock() callback. */
	UNBLOCK_STOP_POST	= 2,
};
89 
90 struct ocfs2_unblock_ctl {
91 	int requeue;
92 	enum ocfs2_unblock_action unblock_action;
93 };
94 
/*
 * Lockdep class keys, one per OCFS2 lock type.
 *
 * Kept static so this generically named symbol does not leak into the
 * kernel's global namespace, and guarded by CONFIG_DEBUG_LOCK_ALLOC
 * because the lockdep_init_map() call in ocfs2_lock_res_init_common()
 * is compiled under the same option.
 * NOTE(review): confirm no other file declares this array extern.
 */
#ifdef CONFIG_DEBUG_LOCK_ALLOC
static struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES];
#endif
97 
/* Callbacks for the inode (meta) lock ops. */
static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking);

/* Callbacks for the dentry lock ops. */
static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking);
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);

/* Callback for the quota info lock ops. */
static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);

112 
/* Dump a lockres' meta LVB, tagging the output with the calling function and line. */
#define mlog_meta_lvb(__level, __lockres)				\
	ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__,		\
				 __LINE__, __lockres)
114 
115 /* This aids in debugging situations where a bad LVB might be involved. */
116 static void ocfs2_dump_meta_lvb_info(u64 level,
117 				     const char *function,
118 				     unsigned int line,
119 				     struct ocfs2_lock_res *lockres)
120 {
121 	struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
122 
123 	mlog(level, "LVB information for %s (called from %s:%u):\n",
124 	     lockres->l_name, function, line);
125 	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
126 	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
127 	     be32_to_cpu(lvb->lvb_igeneration));
128 	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
129 	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
130 	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
131 	     be16_to_cpu(lvb->lvb_imode));
132 	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
133 	     "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
134 	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
135 	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
136 	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
137 	     be32_to_cpu(lvb->lvb_iattr));
138 }
139 
140 
141 /*
142  * OCFS2 Lock Resource Operations
143  *
144  * These fine tune the behavior of the generic dlmglue locking infrastructure.
145  *
146  * The most basic of lock types can point ->l_priv to their respective
147  * struct ocfs2_super and allow the default actions to manage things.
148  *
149  * Right now, each lock type also needs to implement an init function,
150  * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
151  * should be called when the lock is no longer needed (i.e., object
152  * destruction time).
153  */
struct ocfs2_lock_res_ops {
	/*
	 * Map a lock resource to its ocfs2_super. Only needed when
	 * ->l_priv does not already point at an ocfs2_super.
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

	/*
	 * Optional. Runs in the downconvert thread after a successful
	 * downconvert. Nothing references the lockres once this has
	 * been called, so the callback may free memory, etc.
	 *
	 * Whether and when it fires is decided by the value returned
	 * from ->downconvert_worker().
	 */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Lets a lock type veto a downconvert. Return 0 to have the
	 * downconvert re-queued for later, nonzero to let it proceed.
	 *
	 * Most lock types do not need this: the default check for
	 * incompatible holders is enough.
	 *
	 * Invoked with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * Fill in the lock value block. Invoked on downconvert and
	 * when a lock is dropped.
	 *
	 * Lock types providing this should also set
	 * LOCK_TYPE_USES_LVB in ->flags.
	 *
	 * Invoked with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *);

	/*
	 * Invoked from the downconvert thread once it has decided
	 * that the lock will be downconverted. No locks are held at
	 * that point, so the callback may do work that schedules
	 * (syncing out data, etc).
	 *
	 * Must return one of the ocfs2_unblock_action values to tell
	 * the thread how to proceed.
	 */
	int (*downconvert_worker)(struct ocfs2_lock_res *, int);

	/*
	 * Bitmask of LOCK_TYPE_* flags describing the requirements of
	 * this lock type. The individual flags are documented below.
	 */
	int flags;
};
212 
/*
 * For lock types that must "refresh" potentially stale data the first
 * time a meaningful level (PRMODE or EXMODE) is obtained. When this is
 * set, the ast function sets OCFS2_LOCK_NEEDS_REFRESH in the lockres'
 * l_flags, and the locking wrapper is expected to clear
 * OCFS2_LOCK_NEEDS_REFRESH again once it has refreshed.
 */
#define LOCK_TYPE_REQUIRES_REFRESH	0x1

/*
 * The lock type uses the lock value block. Such a type must define
 * the ->set_lvb callback.
 */
#define LOCK_TYPE_USES_LVB		0x2
228 
229 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
230 	.get_osb	= ocfs2_get_inode_osb,
231 	.flags		= 0,
232 };
233 
234 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
235 	.get_osb	= ocfs2_get_inode_osb,
236 	.check_downconvert = ocfs2_check_meta_downconvert,
237 	.set_lvb	= ocfs2_set_meta_lvb,
238 	.downconvert_worker = ocfs2_data_convert_worker,
239 	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
240 };
241 
242 static struct ocfs2_lock_res_ops ocfs2_super_lops = {
243 	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
244 };
245 
246 static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
247 	.flags		= 0,
248 };
249 
250 static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
251 	.flags		= 0,
252 };
253 
254 static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
255 	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
256 };
257 
258 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
259 	.get_osb	= ocfs2_get_dentry_osb,
260 	.post_unlock	= ocfs2_dentry_post_unlock,
261 	.downconvert_worker = ocfs2_dentry_convert_worker,
262 	.flags		= 0,
263 };
264 
265 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
266 	.get_osb	= ocfs2_get_inode_osb,
267 	.flags		= 0,
268 };
269 
270 static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
271 	.get_osb	= ocfs2_get_file_osb,
272 	.flags		= 0,
273 };
274 
275 static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = {
276 	.set_lvb	= ocfs2_set_qinfo_lvb,
277 	.get_osb	= ocfs2_get_qinfo_osb,
278 	.flags		= LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB,
279 };
280 
281 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
282 {
283 	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
284 		lockres->l_type == OCFS2_LOCK_TYPE_RW ||
285 		lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
286 }
287 
288 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
289 {
290 	BUG_ON(!ocfs2_is_inode_lock(lockres));
291 
292 	return (struct inode *) lockres->l_priv;
293 }
294 
295 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
296 {
297 	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
298 
299 	return (struct ocfs2_dentry_lock *)lockres->l_priv;
300 }
301 
302 static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres)
303 {
304 	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO);
305 
306 	return (struct ocfs2_mem_dqinfo *)lockres->l_priv;
307 }
308 
309 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
310 {
311 	if (lockres->l_ops->get_osb)
312 		return lockres->l_ops->get_osb(lockres);
313 
314 	return (struct ocfs2_super *)lockres->l_priv;
315 }
316 
317 static int ocfs2_lock_create(struct ocfs2_super *osb,
318 			     struct ocfs2_lock_res *lockres,
319 			     int level,
320 			     u32 dlm_flags);
321 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
322 						     int wanted);
323 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
324 				   struct ocfs2_lock_res *lockres,
325 				   int level, unsigned long caller_ip);
326 static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb,
327 					struct ocfs2_lock_res *lockres,
328 					int level)
329 {
330 	__ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_);
331 }
332 
/* Forward declarations for the generic AST/BAST handling helpers. */
static inline void
ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void
ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void
ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert);
/*
 * Log a DLM error @_err returned by the call named @_func on @_lockres.
 *
 * Dentry lock names embed a binary inode number (see
 * ocfs2_dentry_lock_res_init()), so for those only the leading text
 * part of the name is printed, followed by the inode number in hex.
 *
 * @_lockres is evaluated more than once; do not pass an expression
 * with side effects. Every use is parenthesized so that any pointer
 * expression (e.g. &foo->lockres) expands correctly.
 */
#define ocfs2_log_dlm_error(_func, _err, _lockres) do {					\
	if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY)				\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n",	\
		     _err, _func, (_lockres)->l_name);					\
	else										\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n",	\
		     _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name,	\
		     (unsigned int)ocfs2_get_dentry_lock_ino(_lockres));		\
} while (0)
/* Forward declarations for the downconvert and cancel-convert paths. */
static int ocfs2_downconvert_thread(void *arg);
static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static int ocfs2_inode_lock_update(struct inode *inode,
				   struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
					      int new_level);
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level, int lvb,
				  unsigned int generation);
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres);
368 
369 
370 static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
371 				  u64 blkno,
372 				  u32 generation,
373 				  char *name)
374 {
375 	int len;
376 
377 	mlog_entry_void();
378 
379 	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
380 
381 	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
382 		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
383 		       (long long)blkno, generation);
384 
385 	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
386 
387 	mlog(0, "built lock resource with name: %s\n", name);
388 
389 	mlog_exit_void();
390 }
391 
/* Protects additions to and removals from the lockres debug tracking lists. */
static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
393 
394 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
395 				       struct ocfs2_dlm_debug *dlm_debug)
396 {
397 	mlog(0, "Add tracking for lockres %s\n", res->l_name);
398 
399 	spin_lock(&ocfs2_dlm_tracking_lock);
400 	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
401 	spin_unlock(&ocfs2_dlm_tracking_lock);
402 }
403 
404 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
405 {
406 	spin_lock(&ocfs2_dlm_tracking_lock);
407 	if (!list_empty(&res->l_debug_list))
408 		list_del_init(&res->l_debug_list);
409 	spin_unlock(&ocfs2_dlm_tracking_lock);
410 }
411 
412 #ifdef CONFIG_OCFS2_FS_STATS
413 static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
414 {
415 	res->l_lock_num_prmode = 0;
416 	res->l_lock_num_prmode_failed = 0;
417 	res->l_lock_total_prmode = 0;
418 	res->l_lock_max_prmode = 0;
419 	res->l_lock_num_exmode = 0;
420 	res->l_lock_num_exmode_failed = 0;
421 	res->l_lock_total_exmode = 0;
422 	res->l_lock_max_exmode = 0;
423 	res->l_lock_refresh = 0;
424 }
425 
426 static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
427 				    struct ocfs2_mask_waiter *mw, int ret)
428 {
429 	unsigned long long *num, *sum;
430 	unsigned int *max, *failed;
431 	struct timespec ts = current_kernel_time();
432 	unsigned long long time = timespec_to_ns(&ts) - mw->mw_lock_start;
433 
434 	if (level == LKM_PRMODE) {
435 		num = &res->l_lock_num_prmode;
436 		sum = &res->l_lock_total_prmode;
437 		max = &res->l_lock_max_prmode;
438 		failed = &res->l_lock_num_prmode_failed;
439 	} else if (level == LKM_EXMODE) {
440 		num = &res->l_lock_num_exmode;
441 		sum = &res->l_lock_total_exmode;
442 		max = &res->l_lock_max_exmode;
443 		failed = &res->l_lock_num_exmode_failed;
444 	} else
445 		return;
446 
447 	(*num)++;
448 	(*sum) += time;
449 	if (time > *max)
450 		*max = time;
451 	if (ret)
452 		(*failed)++;
453 }
454 
455 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
456 {
457 	lockres->l_lock_refresh++;
458 }
459 
460 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
461 {
462 	struct timespec ts = current_kernel_time();
463 	mw->mw_lock_start = timespec_to_ns(&ts);
464 }
465 #else
/* No-op stand-ins used when CONFIG_OCFS2_FS_STATS is not set. */
static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
}

static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
					   int level,
					   struct ocfs2_mask_waiter *mw,
					   int ret)
{
}

static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
}

static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
}
479 #endif
480 
481 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
482 				       struct ocfs2_lock_res *res,
483 				       enum ocfs2_lock_type type,
484 				       struct ocfs2_lock_res_ops *ops,
485 				       void *priv)
486 {
487 	res->l_type          = type;
488 	res->l_ops           = ops;
489 	res->l_priv          = priv;
490 
491 	res->l_level         = DLM_LOCK_IV;
492 	res->l_requested     = DLM_LOCK_IV;
493 	res->l_blocking      = DLM_LOCK_IV;
494 	res->l_action        = OCFS2_AST_INVALID;
495 	res->l_unlock_action = OCFS2_UNLOCK_INVALID;
496 
497 	res->l_flags         = OCFS2_LOCK_INITIALIZED;
498 
499 	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
500 
501 	ocfs2_init_lock_stats(res);
502 #ifdef CONFIG_DEBUG_LOCK_ALLOC
503 	if (type != OCFS2_LOCK_TYPE_OPEN)
504 		lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type],
505 				 &lockdep_keys[type], 0);
506 	else
507 		res->l_lockdep_map.key = NULL;
508 #endif
509 }
510 
511 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
512 {
513 	/* This also clears out the lock status block */
514 	memset(res, 0, sizeof(struct ocfs2_lock_res));
515 	spin_lock_init(&res->l_lock);
516 	init_waitqueue_head(&res->l_event);
517 	INIT_LIST_HEAD(&res->l_blocked_list);
518 	INIT_LIST_HEAD(&res->l_mask_waiters);
519 }
520 
521 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
522 			       enum ocfs2_lock_type type,
523 			       unsigned int generation,
524 			       struct inode *inode)
525 {
526 	struct ocfs2_lock_res_ops *ops;
527 
528 	switch(type) {
529 		case OCFS2_LOCK_TYPE_RW:
530 			ops = &ocfs2_inode_rw_lops;
531 			break;
532 		case OCFS2_LOCK_TYPE_META:
533 			ops = &ocfs2_inode_inode_lops;
534 			break;
535 		case OCFS2_LOCK_TYPE_OPEN:
536 			ops = &ocfs2_inode_open_lops;
537 			break;
538 		default:
539 			mlog_bug_on_msg(1, "type: %d\n", type);
540 			ops = NULL; /* thanks, gcc */
541 			break;
542 	};
543 
544 	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
545 			      generation, res->l_name);
546 	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
547 }
548 
549 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
550 {
551 	struct inode *inode = ocfs2_lock_res_inode(lockres);
552 
553 	return OCFS2_SB(inode->i_sb);
554 }
555 
556 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres)
557 {
558 	struct ocfs2_mem_dqinfo *info = lockres->l_priv;
559 
560 	return OCFS2_SB(info->dqi_gi.dqi_sb);
561 }
562 
563 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
564 {
565 	struct ocfs2_file_private *fp = lockres->l_priv;
566 
567 	return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
568 }
569 
570 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
571 {
572 	__be64 inode_blkno_be;
573 
574 	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
575 	       sizeof(__be64));
576 
577 	return be64_to_cpu(inode_blkno_be);
578 }
579 
580 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
581 {
582 	struct ocfs2_dentry_lock *dl = lockres->l_priv;
583 
584 	return OCFS2_SB(dl->dl_inode->i_sb);
585 }
586 
587 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
588 				u64 parent, struct inode *inode)
589 {
590 	int len;
591 	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
592 	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
593 	struct ocfs2_lock_res *lockres = &dl->dl_lockres;
594 
595 	ocfs2_lock_res_init_once(lockres);
596 
597 	/*
598 	 * Unfortunately, the standard lock naming scheme won't work
599 	 * here because we have two 16 byte values to use. Instead,
600 	 * we'll stuff the inode number as a binary value. We still
601 	 * want error prints to show something without garbling the
602 	 * display, so drop a null byte in there before the inode
603 	 * number. A future version of OCFS2 will likely use all
604 	 * binary lock names. The stringified names have been a
605 	 * tremendous aid in debugging, but now that the debugfs
606 	 * interface exists, we can mangle things there if need be.
607 	 *
608 	 * NOTE: We also drop the standard "pad" value (the total lock
609 	 * name size stays the same though - the last part is all
610 	 * zeros due to the memset in ocfs2_lock_res_init_once()
611 	 */
612 	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
613 		       "%c%016llx",
614 		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
615 		       (long long)parent);
616 
617 	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
618 
619 	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
620 	       sizeof(__be64));
621 
622 	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
623 				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
624 				   dl);
625 }
626 
627 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
628 				      struct ocfs2_super *osb)
629 {
630 	/* Superblock lockres doesn't come from a slab so we call init
631 	 * once on it manually.  */
632 	ocfs2_lock_res_init_once(res);
633 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
634 			      0, res->l_name);
635 	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
636 				   &ocfs2_super_lops, osb);
637 }
638 
639 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
640 				       struct ocfs2_super *osb)
641 {
642 	/* Rename lockres doesn't come from a slab so we call init
643 	 * once on it manually.  */
644 	ocfs2_lock_res_init_once(res);
645 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
646 	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
647 				   &ocfs2_rename_lops, osb);
648 }
649 
650 static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
651 					 struct ocfs2_super *osb)
652 {
653 	/* nfs_sync lockres doesn't come from a slab so we call init
654 	 * once on it manually.  */
655 	ocfs2_lock_res_init_once(res);
656 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name);
657 	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC,
658 				   &ocfs2_nfs_sync_lops, osb);
659 }
660 
661 static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
662 					    struct ocfs2_super *osb)
663 {
664 	ocfs2_lock_res_init_once(res);
665 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
666 	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
667 				   &ocfs2_orphan_scan_lops, osb);
668 }
669 
670 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
671 			      struct ocfs2_file_private *fp)
672 {
673 	struct inode *inode = fp->fp_file->f_mapping->host;
674 	struct ocfs2_inode_info *oi = OCFS2_I(inode);
675 
676 	ocfs2_lock_res_init_once(lockres);
677 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
678 			      inode->i_generation, lockres->l_name);
679 	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
680 				   OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
681 				   fp);
682 	lockres->l_flags |= OCFS2_LOCK_NOCACHE;
683 }
684 
685 void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres,
686 			       struct ocfs2_mem_dqinfo *info)
687 {
688 	ocfs2_lock_res_init_once(lockres);
689 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type,
690 			      0, lockres->l_name);
691 	ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres,
692 				   OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops,
693 				   info);
694 }
695 
696 void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
697 {
698 	mlog_entry_void();
699 
700 	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
701 		return;
702 
703 	ocfs2_remove_lockres_tracking(res);
704 
705 	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
706 			"Lockres %s is on the blocked list\n",
707 			res->l_name);
708 	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
709 			"Lockres %s has mask waiters pending\n",
710 			res->l_name);
711 	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
712 			"Lockres %s is locked\n",
713 			res->l_name);
714 	mlog_bug_on_msg(res->l_ro_holders,
715 			"Lockres %s has %u ro holders\n",
716 			res->l_name, res->l_ro_holders);
717 	mlog_bug_on_msg(res->l_ex_holders,
718 			"Lockres %s has %u ex holders\n",
719 			res->l_name, res->l_ex_holders);
720 
721 	/* Need to clear out the lock status block for the dlm */
722 	memset(&res->l_lksb, 0, sizeof(res->l_lksb));
723 
724 	res->l_flags = 0UL;
725 	mlog_exit_void();
726 }
727 
728 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
729 				     int level)
730 {
731 	mlog_entry_void();
732 
733 	BUG_ON(!lockres);
734 
735 	switch(level) {
736 	case DLM_LOCK_EX:
737 		lockres->l_ex_holders++;
738 		break;
739 	case DLM_LOCK_PR:
740 		lockres->l_ro_holders++;
741 		break;
742 	default:
743 		BUG();
744 	}
745 
746 	mlog_exit_void();
747 }
748 
749 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
750 				     int level)
751 {
752 	mlog_entry_void();
753 
754 	BUG_ON(!lockres);
755 
756 	switch(level) {
757 	case DLM_LOCK_EX:
758 		BUG_ON(!lockres->l_ex_holders);
759 		lockres->l_ex_holders--;
760 		break;
761 	case DLM_LOCK_PR:
762 		BUG_ON(!lockres->l_ro_holders);
763 		lockres->l_ro_holders--;
764 		break;
765 	default:
766 		BUG();
767 	}
768 	mlog_exit_void();
769 }
770 
771 /* WARNING: This function lives in a world where the only three lock
772  * levels are EX, PR, and NL. It *will* have to be adjusted when more
773  * lock types are added. */
774 static inline int ocfs2_highest_compat_lock_level(int level)
775 {
776 	int new_level = DLM_LOCK_EX;
777 
778 	if (level == DLM_LOCK_EX)
779 		new_level = DLM_LOCK_NL;
780 	else if (level == DLM_LOCK_PR)
781 		new_level = DLM_LOCK_PR;
782 	return new_level;
783 }
784 
785 static void lockres_set_flags(struct ocfs2_lock_res *lockres,
786 			      unsigned long newflags)
787 {
788 	struct ocfs2_mask_waiter *mw, *tmp;
789 
790  	assert_spin_locked(&lockres->l_lock);
791 
792 	lockres->l_flags = newflags;
793 
794 	list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
795 		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
796 			continue;
797 
798 		list_del_init(&mw->mw_item);
799 		mw->mw_status = 0;
800 		complete(&mw->mw_complete);
801 	}
802 }
803 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
804 {
805 	lockres_set_flags(lockres, lockres->l_flags | or);
806 }
807 static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
808 				unsigned long clear)
809 {
810 	lockres_set_flags(lockres, lockres->l_flags & ~clear);
811 }
812 
813 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
814 {
815 	mlog_entry_void();
816 
817 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
818 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
819 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
820 	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
821 
822 	lockres->l_level = lockres->l_requested;
823 	if (lockres->l_level <=
824 	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
825 		lockres->l_blocking = DLM_LOCK_NL;
826 		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
827 	}
828 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
829 
830 	mlog_exit_void();
831 }
832 
833 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
834 {
835 	mlog_entry_void();
836 
837 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
838 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
839 
840 	/* Convert from RO to EX doesn't really need anything as our
841 	 * information is already up to data. Convert from NL to
842 	 * *anything* however should mark ourselves as needing an
843 	 * update */
844 	if (lockres->l_level == DLM_LOCK_NL &&
845 	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
846 		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
847 
848 	lockres->l_level = lockres->l_requested;
849 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
850 
851 	mlog_exit_void();
852 }
853 
854 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
855 {
856 	mlog_entry_void();
857 
858 	BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
859 	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
860 
861 	if (lockres->l_requested > DLM_LOCK_NL &&
862 	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
863 	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
864 		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
865 
866 	lockres->l_level = lockres->l_requested;
867 	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
868 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
869 
870 	mlog_exit_void();
871 }
872 
873 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
874 				     int level)
875 {
876 	int needs_downconvert = 0;
877 	mlog_entry_void();
878 
879 	assert_spin_locked(&lockres->l_lock);
880 
881 	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
882 
883 	if (level > lockres->l_blocking) {
884 		/* only schedule a downconvert if we haven't already scheduled
885 		 * one that goes low enough to satisfy the level we're
886 		 * blocking.  this also catches the case where we get
887 		 * duplicate BASTs */
888 		if (ocfs2_highest_compat_lock_level(level) <
889 		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
890 			needs_downconvert = 1;
891 
892 		lockres->l_blocking = level;
893 	}
894 
895 	mlog_exit(needs_downconvert);
896 	return needs_downconvert;
897 }
898 
899 /*
900  * OCFS2_LOCK_PENDING and l_pending_gen.
901  *
902  * Why does OCFS2_LOCK_PENDING exist?  To close a race between setting
903  * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock().  See ocfs2_unblock_lock()
904  * for more details on the race.
905  *
906  * OCFS2_LOCK_PENDING closes the race quite nicely.  However, it introduces
907  * a race on itself.  In o2dlm, we can get the ast before ocfs2_dlm_lock()
908  * returns.  The ast clears OCFS2_LOCK_BUSY, and must therefore clear
909  * OCFS2_LOCK_PENDING at the same time.  When ocfs2_dlm_lock() returns,
910  * the caller is going to try to clear PENDING again.  If nothing else is
911  * happening, __lockres_clear_pending() sees PENDING is unset and does
912  * nothing.
913  *
914  * But what if another path (eg downconvert thread) has just started a
915  * new locking action?  The other path has re-set PENDING.  Our path
916  * cannot clear PENDING, because that will re-open the original race
917  * window.
918  *
919  * [Example]
920  *
921  * ocfs2_meta_lock()
922  *  ocfs2_cluster_lock()
923  *   set BUSY
924  *   set PENDING
925  *   drop l_lock
926  *   ocfs2_dlm_lock()
927  *    ocfs2_locking_ast()		ocfs2_downconvert_thread()
928  *     clear PENDING			 ocfs2_unblock_lock()
929  *					  take_l_lock
930  *					  !BUSY
931  *					  ocfs2_prepare_downconvert()
932  *					   set BUSY
933  *					   set PENDING
934  *					  drop l_lock
935  *   take l_lock
936  *   clear PENDING
937  *   drop l_lock
938  *			<window>
939  *					  ocfs2_dlm_lock()
940  *
941  * So as you can see, we now have a window where l_lock is not held,
942  * PENDING is not set, and ocfs2_dlm_lock() has not been called.
943  *
944  * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
945  * set by ocfs2_prepare_downconvert().  That wasn't nice.
946  *
947  * To solve this we introduce l_pending_gen.  A call to
948  * lockres_clear_pending() will only do so when it is passed a generation
949  * number that matches the lockres.  lockres_set_pending() will return the
950  * current generation number.  When ocfs2_cluster_lock() goes to clear
951  * PENDING, it passes the generation it got from set_pending().  In our
952  * example above, the generation numbers will *not* match.  Thus,
953  * ocfs2_cluster_lock() will not clear the PENDING set by
954  * ocfs2_prepare_downconvert().
955  */
956 
957 /* Unlocked version for ocfs2_locking_ast() */
958 static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
959 				    unsigned int generation,
960 				    struct ocfs2_super *osb)
961 {
962 	assert_spin_locked(&lockres->l_lock);
963 
964 	/*
965 	 * The ast and locking functions can race us here.  The winner
966 	 * will clear pending, the loser will not.
967 	 */
968 	if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
969 	    (lockres->l_pending_gen != generation))
970 		return;
971 
972 	lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
973 	lockres->l_pending_gen++;
974 
975 	/*
976 	 * The downconvert thread may have skipped us because we
977 	 * were PENDING.  Wake it up.
978 	 */
979 	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
980 		ocfs2_wake_downconvert_thread(osb);
981 }
982 
983 /* Locked version for callers of ocfs2_dlm_lock() */
984 static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
985 				  unsigned int generation,
986 				  struct ocfs2_super *osb)
987 {
988 	unsigned long flags;
989 
990 	spin_lock_irqsave(&lockres->l_lock, flags);
991 	__lockres_clear_pending(lockres, generation, osb);
992 	spin_unlock_irqrestore(&lockres->l_lock, flags);
993 }
994 
995 static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
996 {
997 	assert_spin_locked(&lockres->l_lock);
998 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
999 
1000 	lockres_or_flags(lockres, OCFS2_LOCK_PENDING);
1001 
1002 	return lockres->l_pending_gen;
1003 }
1004 
1005 
1006 static void ocfs2_blocking_ast(void *opaque, int level)
1007 {
1008 	struct ocfs2_lock_res *lockres = opaque;
1009 	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1010 	int needs_downconvert;
1011 	unsigned long flags;
1012 
1013 	BUG_ON(level <= DLM_LOCK_NL);
1014 
1015 	mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
1016 	     lockres->l_name, level, lockres->l_level,
1017 	     ocfs2_lock_type_string(lockres->l_type));
1018 
1019 	/*
1020 	 * We can skip the bast for locks which don't enable caching -
1021 	 * they'll be dropped at the earliest possible time anyway.
1022 	 */
1023 	if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
1024 		return;
1025 
1026 	spin_lock_irqsave(&lockres->l_lock, flags);
1027 	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
1028 	if (needs_downconvert)
1029 		ocfs2_schedule_blocked_lock(osb, lockres);
1030 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1031 
1032 	wake_up(&lockres->l_event);
1033 
1034 	ocfs2_wake_downconvert_thread(osb);
1035 }
1036 
1037 static void ocfs2_locking_ast(void *opaque)
1038 {
1039 	struct ocfs2_lock_res *lockres = opaque;
1040 	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1041 	unsigned long flags;
1042 	int status;
1043 
1044 	spin_lock_irqsave(&lockres->l_lock, flags);
1045 
1046 	status = ocfs2_dlm_lock_status(&lockres->l_lksb);
1047 
1048 	if (status == -EAGAIN) {
1049 		lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
1050 		goto out;
1051 	}
1052 
1053 	if (status) {
1054 		mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
1055 		     lockres->l_name, status);
1056 		spin_unlock_irqrestore(&lockres->l_lock, flags);
1057 		return;
1058 	}
1059 
1060 	switch(lockres->l_action) {
1061 	case OCFS2_AST_ATTACH:
1062 		ocfs2_generic_handle_attach_action(lockres);
1063 		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
1064 		break;
1065 	case OCFS2_AST_CONVERT:
1066 		ocfs2_generic_handle_convert_action(lockres);
1067 		break;
1068 	case OCFS2_AST_DOWNCONVERT:
1069 		ocfs2_generic_handle_downconvert_action(lockres);
1070 		break;
1071 	default:
1072 		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
1073 		     "lockres flags = 0x%lx, unlock action: %u\n",
1074 		     lockres->l_name, lockres->l_action, lockres->l_flags,
1075 		     lockres->l_unlock_action);
1076 		BUG();
1077 	}
1078 out:
1079 	/* set it to something invalid so if we get called again we
1080 	 * can catch it. */
1081 	lockres->l_action = OCFS2_AST_INVALID;
1082 
1083 	/* Did we try to cancel this lock?  Clear that state */
1084 	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
1085 		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
1086 
1087 	/*
1088 	 * We may have beaten the locking functions here.  We certainly
1089 	 * know that dlm_lock() has been called :-)
1090 	 * Because we can't have two lock calls in flight at once, we
1091 	 * can use lockres->l_pending_gen.
1092 	 */
1093 	__lockres_clear_pending(lockres, lockres->l_pending_gen,  osb);
1094 
1095 	wake_up(&lockres->l_event);
1096 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1097 }
1098 
1099 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
1100 						int convert)
1101 {
1102 	unsigned long flags;
1103 
1104 	mlog_entry_void();
1105 	spin_lock_irqsave(&lockres->l_lock, flags);
1106 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
1107 	if (convert)
1108 		lockres->l_action = OCFS2_AST_INVALID;
1109 	else
1110 		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
1111 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1112 
1113 	wake_up(&lockres->l_event);
1114 	mlog_exit_void();
1115 }
1116 
1117 /* Note: If we detect another process working on the lock (i.e.,
1118  * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
1119  * to do the right thing in that case.
1120  */
1121 static int ocfs2_lock_create(struct ocfs2_super *osb,
1122 			     struct ocfs2_lock_res *lockres,
1123 			     int level,
1124 			     u32 dlm_flags)
1125 {
1126 	int ret = 0;
1127 	unsigned long flags;
1128 	unsigned int gen;
1129 
1130 	mlog_entry_void();
1131 
1132 	mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
1133 	     dlm_flags);
1134 
1135 	spin_lock_irqsave(&lockres->l_lock, flags);
1136 	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
1137 	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
1138 		spin_unlock_irqrestore(&lockres->l_lock, flags);
1139 		goto bail;
1140 	}
1141 
1142 	lockres->l_action = OCFS2_AST_ATTACH;
1143 	lockres->l_requested = level;
1144 	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1145 	gen = lockres_set_pending(lockres);
1146 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1147 
1148 	ret = ocfs2_dlm_lock(osb->cconn,
1149 			     level,
1150 			     &lockres->l_lksb,
1151 			     dlm_flags,
1152 			     lockres->l_name,
1153 			     OCFS2_LOCK_ID_MAX_LEN - 1,
1154 			     lockres);
1155 	lockres_clear_pending(lockres, gen, osb);
1156 	if (ret) {
1157 		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
1158 		ocfs2_recover_from_dlm_error(lockres, 1);
1159 	}
1160 
1161 	mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);
1162 
1163 bail:
1164 	mlog_exit(ret);
1165 	return ret;
1166 }
1167 
1168 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
1169 					int flag)
1170 {
1171 	unsigned long flags;
1172 	int ret;
1173 
1174 	spin_lock_irqsave(&lockres->l_lock, flags);
1175 	ret = lockres->l_flags & flag;
1176 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1177 
1178 	return ret;
1179 }
1180 
1181 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
1182 
1183 {
1184 	wait_event(lockres->l_event,
1185 		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
1186 }
1187 
1188 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
1189 
1190 {
1191 	wait_event(lockres->l_event,
1192 		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
1193 }
1194 
1195 /* predict what lock level we'll be dropping down to on behalf
1196  * of another node, and return true if the currently wanted
1197  * level will be compatible with it. */
1198 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
1199 						     int wanted)
1200 {
1201 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
1202 
1203 	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
1204 }
1205 
1206 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
1207 {
1208 	INIT_LIST_HEAD(&mw->mw_item);
1209 	init_completion(&mw->mw_complete);
1210 	ocfs2_init_start_time(mw);
1211 }
1212 
1213 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
1214 {
1215 	wait_for_completion(&mw->mw_complete);
1216 	/* Re-arm the completion in case we want to wait on it again */
1217 	INIT_COMPLETION(mw->mw_complete);
1218 	return mw->mw_status;
1219 }
1220 
1221 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
1222 				    struct ocfs2_mask_waiter *mw,
1223 				    unsigned long mask,
1224 				    unsigned long goal)
1225 {
1226 	BUG_ON(!list_empty(&mw->mw_item));
1227 
1228 	assert_spin_locked(&lockres->l_lock);
1229 
1230 	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
1231 	mw->mw_mask = mask;
1232 	mw->mw_goal = goal;
1233 }
1234 
1235 /* returns 0 if the mw that was removed was already satisfied, -EBUSY
1236  * if the mask still hadn't reached its goal */
1237 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
1238 				      struct ocfs2_mask_waiter *mw)
1239 {
1240 	unsigned long flags;
1241 	int ret = 0;
1242 
1243 	spin_lock_irqsave(&lockres->l_lock, flags);
1244 	if (!list_empty(&mw->mw_item)) {
1245 		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
1246 			ret = -EBUSY;
1247 
1248 		list_del_init(&mw->mw_item);
1249 		init_completion(&mw->mw_complete);
1250 	}
1251 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1252 
1253 	return ret;
1254 
1255 }
1256 
1257 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
1258 					     struct ocfs2_lock_res *lockres)
1259 {
1260 	int ret;
1261 
1262 	ret = wait_for_completion_interruptible(&mw->mw_complete);
1263 	if (ret)
1264 		lockres_remove_mask_waiter(lockres, mw);
1265 	else
1266 		ret = mw->mw_status;
1267 	/* Re-arm the completion in case we want to wait on it again */
1268 	INIT_COMPLETION(mw->mw_complete);
1269 	return ret;
1270 }
1271 
/*
 * Take a hold on @lockres at @level, asking the DLM for an attach or
 * upconvert first when the currently granted level is too low.
 *
 * @osb:        superblock; supplies the mount options and the cluster
 *              connection used for ocfs2_dlm_lock()
 * @lockres:    lock resource to acquire
 * @level:      level wanted by the caller
 * @lkm_flags:  flags for ocfs2_dlm_lock().  DLM_LKF_VALBLK is added when
 *              the lock type has LOCK_TYPE_USES_LVB, and DLM_LKF_CONVERT
 *              is set or cleared here depending on OCFS2_LOCK_ATTACHED.
 * @arg_flags:  OCFS2_LOCK_NONBLOCK makes the function back off with
 *              -EAGAIN instead of sleeping on a BUSY/BLOCKED lockres;
 *              OCFS2_META_LOCK_NOQUEUE is only forwarded to lockdep as
 *              the "trylock" argument.
 * @l_subclass: lockdep subclass (used under CONFIG_DEBUG_LOCK_ALLOC only)
 * @caller_ip:  acquisition site reported to lockdep
 *
 * Returns 0 once the holder count has been bumped via
 * ocfs2_inc_holders().  Otherwise returns -ERESTARTSYS when a signal is
 * pending (and signals are still being honoured), -EAGAIN when a
 * DLM_LKF_NOQUEUE request did not raise the level or a NONBLOCK caller
 * would have had to wait, the error from ocfs2_dlm_lock(), or a non-zero
 * mask-waiter status.
 */
1272 static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
1273 				struct ocfs2_lock_res *lockres,
1274 				int level,
1275 				u32 lkm_flags,
1276 				int arg_flags,
1277 				int l_subclass,
1278 				unsigned long caller_ip)
1279 {
1280 	struct ocfs2_mask_waiter mw;
1281 	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
1282 	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
1283 	unsigned long flags;
1284 	unsigned int gen;
1285 	int noqueue_attempted = 0;
1286 
1287 	mlog_entry_void();
1288 
1289 	ocfs2_init_mask_waiter(&mw);
1290 
1291 	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
1292 		lkm_flags |= DLM_LKF_VALBLK;
1293 
	/*
	 * Every pass re-evaluates the lockres from scratch under l_lock;
	 * we come back here after each wait and after each dlm request.
	 */
1294 again:
1295 	wait = 0;
1296 
1297 	if (catch_signals && signal_pending(current)) {
1298 		ret = -ERESTARTSYS;
1299 		goto out;
1300 	}
1301 
1302 	spin_lock_irqsave(&lockres->l_lock, flags);
1303 
1304 	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
1305 			"Cluster lock called on freeing lockres %s! flags "
1306 			"0x%lx\n", lockres->l_name, lockres->l_flags);
1307 
1308 	/* We only compare against the currently granted level
1309 	 * here. If the lock is blocked waiting on a downconvert,
1310 	 * we'll get caught below. */
1311 	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
1312 	    level > lockres->l_level) {
1313 		/* is someone sitting in dlm_lock? If so, wait on
1314 		 * them. */
1315 		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1316 		wait = 1;
1317 		goto unlock;
1318 	}
1319 
1320 	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
1321 	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
1322 		/* is the lock is currently blocked on behalf of
1323 		 * another node */
1324 		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
1325 		wait = 1;
1326 		goto unlock;
1327 	}
1328 
1329 	if (level > lockres->l_level) {
		/*
		 * A NOQUEUE request is issued at most once: if we are back
		 * here and the level is still too low, the attempt did not
		 * get us the lock, so report -EAGAIN.
		 */
1330 		if (noqueue_attempted > 0) {
1331 			ret = -EAGAIN;
1332 			goto unlock;
1333 		}
1334 		if (lkm_flags & DLM_LKF_NOQUEUE)
1335 			noqueue_attempted = 1;
1336 
1337 		if (lockres->l_action != OCFS2_AST_INVALID)
1338 			mlog(ML_ERROR, "lockres %s has action %u pending\n",
1339 			     lockres->l_name, lockres->l_action);
1340 
		/* First request on this lockres attaches; later ones convert. */
1341 		if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1342 			lockres->l_action = OCFS2_AST_ATTACH;
1343 			lkm_flags &= ~DLM_LKF_CONVERT;
1344 		} else {
1345 			lockres->l_action = OCFS2_AST_CONVERT;
1346 			lkm_flags |= DLM_LKF_CONVERT;
1347 		}
1348 
1349 		lockres->l_requested = level;
1350 		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1351 		gen = lockres_set_pending(lockres);
1352 		spin_unlock_irqrestore(&lockres->l_lock, flags);
1353 
1354 		BUG_ON(level == DLM_LOCK_IV);
1355 		BUG_ON(level == DLM_LOCK_NL);
1356 
1357 		mlog(0, "lock %s, convert from %d to level = %d\n",
1358 		     lockres->l_name, lockres->l_level, level);
1359 
1360 		/* call dlm_lock to upgrade lock now */
1361 		ret = ocfs2_dlm_lock(osb->cconn,
1362 				     level,
1363 				     &lockres->l_lksb,
1364 				     lkm_flags,
1365 				     lockres->l_name,
1366 				     OCFS2_LOCK_ID_MAX_LEN - 1,
1367 				     lockres);
		/*
		 * Clear PENDING with the generation we were given; if the
		 * ast or another path got there first this is a no-op.
		 */
1368 		lockres_clear_pending(lockres, gen, osb);
1369 		if (ret) {
			/* -EAGAIN from a NOQUEUE request is expected; stay quiet. */
1370 			if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
1371 			    (ret != -EAGAIN)) {
1372 				ocfs2_log_dlm_error("ocfs2_dlm_lock",
1373 						    ret, lockres);
1374 			}
1375 			ocfs2_recover_from_dlm_error(lockres, 1);
1376 			goto out;
1377 		}
1378 
1379 		mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
1380 		     lockres->l_name);
1381 
1382 		/* At this point we've gone inside the dlm and need to
1383 		 * complete our work regardless. */
1384 		catch_signals = 0;
1385 
1386 		/* wait for busy to clear and carry on */
1387 		goto again;
1388 	}
1389 
1390 	/* Ok, if we get here then we're good to go. */
1391 	ocfs2_inc_holders(lockres, level);
1392 
1393 	ret = 0;
1394 unlock:
1395 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1396 out:
1397 	/*
1398 	 * This is helping work around a lock inversion between the page lock
1399 	 * and dlm locks.  One path holds the page lock while calling aops
1400 	 * which block acquiring dlm locks.  The voting thread holds dlm
1401 	 * locks while acquiring page locks while down converting data locks.
1402 	 * This block is helping an aop path notice the inversion and back
1403 	 * off to unlock its page lock before trying the dlm lock again.
1404 	 */
1405 	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
1406 	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
1407 		wait = 0;
		/*
		 * Non-zero means the flags we queued for have not reached
		 * their goal yet: give up.  Zero means the condition has
		 * already been met, so simply retry.
		 */
1408 		if (lockres_remove_mask_waiter(lockres, &mw))
1409 			ret = -EAGAIN;
1410 		else
1411 			goto again;
1412 	}
1413 	if (wait) {
1414 		ret = ocfs2_wait_for_mask(&mw);
1415 		if (ret == 0)
1416 			goto again;
1417 		mlog_errno(ret);
1418 	}
1419 	ocfs2_update_lock_stats(lockres, level, &mw, ret);
1420 
1421 #ifdef CONFIG_DEBUG_LOCK_ALLOC
	/* Tell lockdep about a successful acquisition: PR as read, else write. */
1422 	if (!ret && lockres->l_lockdep_map.key != NULL) {
1423 		if (level == DLM_LOCK_PR)
1424 			rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
1425 				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
1426 				caller_ip);
1427 		else
1428 			rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
1429 				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
1430 				caller_ip);
1431 	}
1432 #endif
1433 	mlog_exit(ret);
1434 	return ret;
1435 }
1436 
1437 static inline int ocfs2_cluster_lock(struct ocfs2_super *osb,
1438 				     struct ocfs2_lock_res *lockres,
1439 				     int level,
1440 				     u32 lkm_flags,
1441 				     int arg_flags)
1442 {
1443 	return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags,
1444 				    0, _RET_IP_);
1445 }
1446 
1447 
1448 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
1449 				   struct ocfs2_lock_res *lockres,
1450 				   int level,
1451 				   unsigned long caller_ip)
1452 {
1453 	unsigned long flags;
1454 
1455 	mlog_entry_void();
1456 	spin_lock_irqsave(&lockres->l_lock, flags);
1457 	ocfs2_dec_holders(lockres, level);
1458 	ocfs2_downconvert_on_unlock(osb, lockres);
1459 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1460 #ifdef CONFIG_DEBUG_LOCK_ALLOC
1461 	if (lockres->l_lockdep_map.key != NULL)
1462 		rwsem_release(&lockres->l_lockdep_map, 1, caller_ip);
1463 #endif
1464 	mlog_exit_void();
1465 }
1466 
1467 static int ocfs2_create_new_lock(struct ocfs2_super *osb,
1468 				 struct ocfs2_lock_res *lockres,
1469 				 int ex,
1470 				 int local)
1471 {
1472 	int level =  ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1473 	unsigned long flags;
1474 	u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;
1475 
1476 	spin_lock_irqsave(&lockres->l_lock, flags);
1477 	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1478 	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1479 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1480 
1481 	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1482 }
1483 
1484 /* Grants us an EX lock on the data and metadata resources, skipping
1485  * the normal cluster directory lookup. Use this ONLY on newly created
1486  * inodes which other nodes can't possibly see, and which haven't been
1487  * hashed in the inode hash yet. This can give us a good performance
1488  * increase as it'll skip the network broadcast normally associated
1489  * with creating a new lock resource. */
1490 int ocfs2_create_new_inode_locks(struct inode *inode)
1491 {
1492 	int ret;
1493 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1494 
1495 	BUG_ON(!inode);
1496 	BUG_ON(!ocfs2_inode_is_new(inode));
1497 
1498 	mlog_entry_void();
1499 
1500 	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1501 
1502 	/* NOTE: That we don't increment any of the holder counts, nor
1503 	 * do we add anything to a journal handle. Since this is
1504 	 * supposed to be a new inode which the cluster doesn't know
1505 	 * about yet, there is no need to.  As far as the LVB handling
1506 	 * is concerned, this is basically like acquiring an EX lock
1507 	 * on a resource which has an invalid one -- we'll set it
1508 	 * valid when we release the EX. */
1509 
1510 	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1511 	if (ret) {
1512 		mlog_errno(ret);
1513 		goto bail;
1514 	}
1515 
1516 	/*
1517 	 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they
1518 	 * don't use a generation in their lock names.
1519 	 */
1520 	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
1521 	if (ret) {
1522 		mlog_errno(ret);
1523 		goto bail;
1524 	}
1525 
1526 	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
1527 	if (ret) {
1528 		mlog_errno(ret);
1529 		goto bail;
1530 	}
1531 
1532 bail:
1533 	mlog_exit(ret);
1534 	return ret;
1535 }
1536 
1537 int ocfs2_rw_lock(struct inode *inode, int write)
1538 {
1539 	int status, level;
1540 	struct ocfs2_lock_res *lockres;
1541 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1542 
1543 	BUG_ON(!inode);
1544 
1545 	mlog_entry_void();
1546 
1547 	mlog(0, "inode %llu take %s RW lock\n",
1548 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1549 	     write ? "EXMODE" : "PRMODE");
1550 
1551 	if (ocfs2_mount_local(osb))
1552 		return 0;
1553 
1554 	lockres = &OCFS2_I(inode)->ip_rw_lockres;
1555 
1556 	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1557 
1558 	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1559 				    0);
1560 	if (status < 0)
1561 		mlog_errno(status);
1562 
1563 	mlog_exit(status);
1564 	return status;
1565 }
1566 
1567 void ocfs2_rw_unlock(struct inode *inode, int write)
1568 {
1569 	int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1570 	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1571 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1572 
1573 	mlog_entry_void();
1574 
1575 	mlog(0, "inode %llu drop %s RW lock\n",
1576 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1577 	     write ? "EXMODE" : "PRMODE");
1578 
1579 	if (!ocfs2_mount_local(osb))
1580 		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1581 
1582 	mlog_exit_void();
1583 }
1584 
1585 /*
1586  * ocfs2_open_lock always get PR mode lock.
1587  */
1588 int ocfs2_open_lock(struct inode *inode)
1589 {
1590 	int status = 0;
1591 	struct ocfs2_lock_res *lockres;
1592 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1593 
1594 	BUG_ON(!inode);
1595 
1596 	mlog_entry_void();
1597 
1598 	mlog(0, "inode %llu take PRMODE open lock\n",
1599 	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
1600 
1601 	if (ocfs2_mount_local(osb))
1602 		goto out;
1603 
1604 	lockres = &OCFS2_I(inode)->ip_open_lockres;
1605 
1606 	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1607 				    DLM_LOCK_PR, 0, 0);
1608 	if (status < 0)
1609 		mlog_errno(status);
1610 
1611 out:
1612 	mlog_exit(status);
1613 	return status;
1614 }
1615 
1616 int ocfs2_try_open_lock(struct inode *inode, int write)
1617 {
1618 	int status = 0, level;
1619 	struct ocfs2_lock_res *lockres;
1620 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1621 
1622 	BUG_ON(!inode);
1623 
1624 	mlog_entry_void();
1625 
1626 	mlog(0, "inode %llu try to take %s open lock\n",
1627 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1628 	     write ? "EXMODE" : "PRMODE");
1629 
1630 	if (ocfs2_mount_local(osb))
1631 		goto out;
1632 
1633 	lockres = &OCFS2_I(inode)->ip_open_lockres;
1634 
1635 	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1636 
1637 	/*
1638 	 * The file system may already holding a PRMODE/EXMODE open lock.
1639 	 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on
1640 	 * other nodes and the -EAGAIN will indicate to the caller that
1641 	 * this inode is still in use.
1642 	 */
1643 	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1644 				    level, DLM_LKF_NOQUEUE, 0);
1645 
1646 out:
1647 	mlog_exit(status);
1648 	return status;
1649 }
1650 
1651 /*
1652  * ocfs2_open_unlock unlock PR and EX mode open locks.
1653  */
1654 void ocfs2_open_unlock(struct inode *inode)
1655 {
1656 	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
1657 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1658 
1659 	mlog_entry_void();
1660 
1661 	mlog(0, "inode %llu drop open lock\n",
1662 	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
1663 
1664 	if (ocfs2_mount_local(osb))
1665 		goto out;
1666 
1667 	if(lockres->l_ro_holders)
1668 		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1669 				     DLM_LOCK_PR);
1670 	if(lockres->l_ex_holders)
1671 		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1672 				     DLM_LOCK_EX);
1673 
1674 out:
1675 	mlog_exit_void();
1676 }
1677 
1678 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
1679 				     int level)
1680 {
1681 	int ret;
1682 	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1683 	unsigned long flags;
1684 	struct ocfs2_mask_waiter mw;
1685 
1686 	ocfs2_init_mask_waiter(&mw);
1687 
1688 retry_cancel:
1689 	spin_lock_irqsave(&lockres->l_lock, flags);
1690 	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
1691 		ret = ocfs2_prepare_cancel_convert(osb, lockres);
1692 		if (ret) {
1693 			spin_unlock_irqrestore(&lockres->l_lock, flags);
1694 			ret = ocfs2_cancel_convert(osb, lockres);
1695 			if (ret < 0) {
1696 				mlog_errno(ret);
1697 				goto out;
1698 			}
1699 			goto retry_cancel;
1700 		}
1701 		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1702 		spin_unlock_irqrestore(&lockres->l_lock, flags);
1703 
1704 		ocfs2_wait_for_mask(&mw);
1705 		goto retry_cancel;
1706 	}
1707 
1708 	ret = -ERESTARTSYS;
1709 	/*
1710 	 * We may still have gotten the lock, in which case there's no
1711 	 * point to restarting the syscall.
1712 	 */
1713 	if (lockres->l_level == level)
1714 		ret = 0;
1715 
1716 	mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
1717 	     lockres->l_flags, lockres->l_level, lockres->l_action);
1718 
1719 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1720 
1721 out:
1722 	return ret;
1723 }
1724 
1725 /*
1726  * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
1727  * flock() calls. The locking approach this requires is sufficiently
1728  * different from all other cluster lock types that we implement a
1729  * separate path to the "low-level" dlm calls. In particular:
1730  *
1731  * - No optimization of lock levels is done - we take at exactly
1732  *   what's been requested.
1733  *
1734  * - No lock caching is employed. We immediately downconvert to
1735  *   no-lock at unlock time. This also means flock locks never go on
1736  *   the blocking list).
1737  *
1738  * - Since userspace can trivially deadlock itself with flock, we make
1739  *   sure to allow cancellation of a misbehaving application's flock()
1740  *   request.
1741  *
1742  * - Access to any flock lockres doesn't require concurrency, so we
1743  *   can simplify the code by requiring the caller to guarantee
1744  *   serialization of dlmglue flock calls.
 *
 * ocfs2_file_lock() arguments and result:
 *
 *   file    - open file; its private data holds the flock lockres
 *   ex      - non-zero requests DLM_LOCK_EX, zero DLM_LOCK_PR
 *   trylock - non-zero adds DLM_LKF_NOQUEUE to the convert request
 *
 * Returns 0 when the lock is held at the requested level.  Returns
 * -EINVAL if the lockres is already busy or held above NL, or if the
 * convert request fails (other than -EAGAIN for a trylock); -EAGAIN if
 * a trylock could not be granted; otherwise whatever the attach, the
 * mask wait or ocfs2_flock_handle_signal() reported.
1745  */
1746 int ocfs2_file_lock(struct file *file, int ex, int trylock)
1747 {
1748 	int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1749 	unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0;
1750 	unsigned long flags;
1751 	struct ocfs2_file_private *fp = file->private_data;
1752 	struct ocfs2_lock_res *lockres = &fp->fp_flock;
1753 	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1754 	struct ocfs2_mask_waiter mw;
1755 
1756 	ocfs2_init_mask_waiter(&mw);
1757 
	/*
	 * Checked without l_lock: callers are required to serialize flock
	 * calls on this lockres (see the comment above the function).
	 */
1758 	if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
1759 	    (lockres->l_level > DLM_LOCK_NL)) {
1760 		mlog(ML_ERROR,
1761 		     "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
1762 		     "level: %u\n", lockres->l_name, lockres->l_flags,
1763 		     lockres->l_level);
1764 		return -EINVAL;
1765 	}
1766 
1767 	spin_lock_irqsave(&lockres->l_lock, flags);
1768 	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
		/* Queue the waiter before attaching so the ast can complete it. */
1769 		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1770 		spin_unlock_irqrestore(&lockres->l_lock, flags);
1771 
1772 		/*
1773 		 * Get the lock at NLMODE to start - that way we
1774 		 * can cancel the upconvert request if need be.
1775 		 */
1776 		ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
1777 		if (ret < 0) {
1778 			mlog_errno(ret);
1779 			goto out;
1780 		}
1781 
1782 		ret = ocfs2_wait_for_mask(&mw);
1783 		if (ret) {
1784 			mlog_errno(ret);
1785 			goto out;
1786 		}
1787 		spin_lock_irqsave(&lockres->l_lock, flags);
1788 	}
1789 
	/* l_lock is held here on both paths; set up the convert request. */
1790 	lockres->l_action = OCFS2_AST_CONVERT;
1791 	lkm_flags |= DLM_LKF_CONVERT;
1792 	lockres->l_requested = level;
1793 	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1794 
1795 	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1796 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1797 
1798 	ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
1799 			     lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
1800 			     lockres);
1801 	if (ret) {
		/* -EAGAIN from a trylock is passed through as-is and not logged. */
1802 		if (!trylock || (ret != -EAGAIN)) {
1803 			ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
1804 			ret = -EINVAL;
1805 		}
1806 
1807 		ocfs2_recover_from_dlm_error(lockres, 1);
1808 		lockres_remove_mask_waiter(lockres, &mw);
1809 		goto out;
1810 	}
1811 
1812 	ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
1813 	if (ret == -ERESTARTSYS) {
1814 		/*
1815 		 * Userspace can cause deadlock itself with
1816 		 * flock(). Current behavior locally is to allow the
1817 		 * deadlock, but abort the system call if a signal is
1818 		 * received. We follow this example, otherwise a
1819 		 * poorly written program could sit in kernel until
1820 		 * reboot.
1821 		 *
1822 		 * Handling this is a bit more complicated for Ocfs2
1823 		 * though. We can't exit this function with an
1824 		 * outstanding lock request, so a cancel convert is
1825 		 * required. We intentionally overwrite 'ret' - if the
1826 		 * cancel fails and the lock was granted, it's easier
1827 		 * to just bubble success back up to the user.
1828 		 */
1829 		ret = ocfs2_flock_handle_signal(lockres, level);
1830 	} else if (!ret && (level > lockres->l_level)) {
1831 		/* Trylock failed asynchronously */
1832 		BUG_ON(!trylock);
1833 		ret = -EAGAIN;
1834 	}
1835 
1836 out:
1837 
1838 	mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
1839 	     lockres->l_name, ex, trylock, ret);
1840 	return ret;
1841 }
1842 
1843 void ocfs2_file_unlock(struct file *file)
1844 {
1845 	int ret;
1846 	unsigned int gen;
1847 	unsigned long flags;
1848 	struct ocfs2_file_private *fp = file->private_data;
1849 	struct ocfs2_lock_res *lockres = &fp->fp_flock;
1850 	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1851 	struct ocfs2_mask_waiter mw;
1852 
1853 	ocfs2_init_mask_waiter(&mw);
1854 
1855 	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
1856 		return;
1857 
1858 	if (lockres->l_level == DLM_LOCK_NL)
1859 		return;
1860 
1861 	mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
1862 	     lockres->l_name, lockres->l_flags, lockres->l_level,
1863 	     lockres->l_action);
1864 
1865 	spin_lock_irqsave(&lockres->l_lock, flags);
1866 	/*
1867 	 * Fake a blocking ast for the downconvert code.
1868 	 */
1869 	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
1870 	lockres->l_blocking = DLM_LOCK_EX;
1871 
1872 	gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL);
1873 	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1874 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1875 
1876 	ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen);
1877 	if (ret) {
1878 		mlog_errno(ret);
1879 		return;
1880 	}
1881 
1882 	ret = ocfs2_wait_for_mask(&mw);
1883 	if (ret)
1884 		mlog_errno(ret);
1885 }
1886 
1887 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
1888 					struct ocfs2_lock_res *lockres)
1889 {
1890 	int kick = 0;
1891 
1892 	mlog_entry_void();
1893 
1894 	/* If we know that another node is waiting on our lock, kick
1895 	 * the downconvert thread * pre-emptively when we reach a release
1896 	 * condition. */
1897 	if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1898 		switch(lockres->l_blocking) {
1899 		case DLM_LOCK_EX:
1900 			if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1901 				kick = 1;
1902 			break;
1903 		case DLM_LOCK_PR:
1904 			if (!lockres->l_ex_holders)
1905 				kick = 1;
1906 			break;
1907 		default:
1908 			BUG();
1909 		}
1910 	}
1911 
1912 	if (kick)
1913 		ocfs2_wake_downconvert_thread(osb);
1914 
1915 	mlog_exit_void();
1916 }
1917 
1918 #define OCFS2_SEC_BITS   34
1919 #define OCFS2_SEC_SHIFT  (64 - 34)
1920 #define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1921 
1922 /* LVB only has room for 64 bits of time here so we pack it for
1923  * now. */
1924 static u64 ocfs2_pack_timespec(struct timespec *spec)
1925 {
1926 	u64 res;
1927 	u64 sec = spec->tv_sec;
1928 	u32 nsec = spec->tv_nsec;
1929 
1930 	res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1931 
1932 	return res;
1933 }
1934 
1935 /* Call this with the lockres locked. I am reasonably sure we don't
1936  * need ip_lock in this function as anyone who would be changing those
1937  * values is supposed to be blocked in ocfs2_inode_lock right now. */
1938 static void __ocfs2_stuff_meta_lvb(struct inode *inode)
1939 {
1940 	struct ocfs2_inode_info *oi = OCFS2_I(inode);
1941 	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
1942 	struct ocfs2_meta_lvb *lvb;
1943 
1944 	mlog_entry_void();
1945 
1946 	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
1947 
1948 	/*
1949 	 * Invalidate the LVB of a deleted inode - this way other
1950 	 * nodes are forced to go to disk and discover the new inode
1951 	 * status.
1952 	 */
1953 	if (oi->ip_flags & OCFS2_INODE_DELETED) {
1954 		lvb->lvb_version = 0;
1955 		goto out;
1956 	}
1957 
1958 	lvb->lvb_version   = OCFS2_LVB_VERSION;
1959 	lvb->lvb_isize	   = cpu_to_be64(i_size_read(inode));
1960 	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
1961 	lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
1962 	lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
1963 	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
1964 	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
1965 	lvb->lvb_iatime_packed  =
1966 		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
1967 	lvb->lvb_ictime_packed =
1968 		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
1969 	lvb->lvb_imtime_packed =
1970 		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
1971 	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
1972 	lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
1973 	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
1974 
1975 out:
1976 	mlog_meta_lvb(0, lockres);
1977 
1978 	mlog_exit_void();
1979 }
1980 
1981 static void ocfs2_unpack_timespec(struct timespec *spec,
1982 				  u64 packed_time)
1983 {
1984 	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
1985 	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
1986 }
1987 
1988 static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
1989 {
1990 	struct ocfs2_inode_info *oi = OCFS2_I(inode);
1991 	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
1992 	struct ocfs2_meta_lvb *lvb;
1993 
1994 	mlog_entry_void();
1995 
1996 	mlog_meta_lvb(0, lockres);
1997 
1998 	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
1999 
2000 	/* We're safe here without the lockres lock... */
2001 	spin_lock(&oi->ip_lock);
2002 	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
2003 	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
2004 
2005 	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
2006 	oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
2007 	ocfs2_set_inode_flags(inode);
2008 
2009 	/* fast-symlinks are a special case */
2010 	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
2011 		inode->i_blocks = 0;
2012 	else
2013 		inode->i_blocks = ocfs2_inode_sector_count(inode);
2014 
2015 	inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
2016 	inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
2017 	inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
2018 	inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
2019 	ocfs2_unpack_timespec(&inode->i_atime,
2020 			      be64_to_cpu(lvb->lvb_iatime_packed));
2021 	ocfs2_unpack_timespec(&inode->i_mtime,
2022 			      be64_to_cpu(lvb->lvb_imtime_packed));
2023 	ocfs2_unpack_timespec(&inode->i_ctime,
2024 			      be64_to_cpu(lvb->lvb_ictime_packed));
2025 	spin_unlock(&oi->ip_lock);
2026 
2027 	mlog_exit_void();
2028 }
2029 
2030 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
2031 					      struct ocfs2_lock_res *lockres)
2032 {
2033 	struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2034 
2035 	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)
2036 	    && lvb->lvb_version == OCFS2_LVB_VERSION
2037 	    && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
2038 		return 1;
2039 	return 0;
2040 }
2041 
2042 /* Determine whether a lock resource needs to be refreshed, and
2043  * arbitrate who gets to refresh it.
2044  *
2045  *   0 means no refresh needed.
2046  *
2047  *   > 0 means you need to refresh this and you MUST call
2048  *   ocfs2_complete_lock_res_refresh afterwards. */
2049 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
2050 {
2051 	unsigned long flags;
2052 	int status = 0;
2053 
2054 	mlog_entry_void();
2055 
2056 refresh_check:
2057 	spin_lock_irqsave(&lockres->l_lock, flags);
2058 	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
2059 		spin_unlock_irqrestore(&lockres->l_lock, flags);
2060 		goto bail;
2061 	}
2062 
2063 	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2064 		spin_unlock_irqrestore(&lockres->l_lock, flags);
2065 
2066 		ocfs2_wait_on_refreshing_lock(lockres);
2067 		goto refresh_check;
2068 	}
2069 
2070 	/* Ok, I'll be the one to refresh this lock. */
2071 	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
2072 	spin_unlock_irqrestore(&lockres->l_lock, flags);
2073 
2074 	status = 1;
2075 bail:
2076 	mlog_exit(status);
2077 	return status;
2078 }
2079 
2080 /* If status is non zero, I'll mark it as not being in refresh
2081  * anymroe, but i won't clear the needs refresh flag. */
2082 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
2083 						   int status)
2084 {
2085 	unsigned long flags;
2086 	mlog_entry_void();
2087 
2088 	spin_lock_irqsave(&lockres->l_lock, flags);
2089 	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
2090 	if (!status)
2091 		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
2092 	spin_unlock_irqrestore(&lockres->l_lock, flags);
2093 
2094 	wake_up(&lockres->l_event);
2095 
2096 	mlog_exit_void();
2097 }
2098 
2099 /* may or may not return a bh if it went to disk. */
2100 static int ocfs2_inode_lock_update(struct inode *inode,
2101 				  struct buffer_head **bh)
2102 {
2103 	int status = 0;
2104 	struct ocfs2_inode_info *oi = OCFS2_I(inode);
2105 	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2106 	struct ocfs2_dinode *fe;
2107 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2108 
2109 	mlog_entry_void();
2110 
2111 	if (ocfs2_mount_local(osb))
2112 		goto bail;
2113 
2114 	spin_lock(&oi->ip_lock);
2115 	if (oi->ip_flags & OCFS2_INODE_DELETED) {
2116 		mlog(0, "Orphaned inode %llu was deleted while we "
2117 		     "were waiting on a lock. ip_flags = 0x%x\n",
2118 		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
2119 		spin_unlock(&oi->ip_lock);
2120 		status = -ENOENT;
2121 		goto bail;
2122 	}
2123 	spin_unlock(&oi->ip_lock);
2124 
2125 	if (!ocfs2_should_refresh_lock_res(lockres))
2126 		goto bail;
2127 
2128 	/* This will discard any caching information we might have had
2129 	 * for the inode metadata. */
2130 	ocfs2_metadata_cache_purge(inode);
2131 
2132 	ocfs2_extent_map_trunc(inode, 0);
2133 
2134 	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
2135 		mlog(0, "Trusting LVB on inode %llu\n",
2136 		     (unsigned long long)oi->ip_blkno);
2137 		ocfs2_refresh_inode_from_lvb(inode);
2138 	} else {
2139 		/* Boo, we have to go to disk. */
2140 		/* read bh, cast, ocfs2_refresh_inode */
2141 		status = ocfs2_read_inode_block(inode, bh);
2142 		if (status < 0) {
2143 			mlog_errno(status);
2144 			goto bail_refresh;
2145 		}
2146 		fe = (struct ocfs2_dinode *) (*bh)->b_data;
2147 
2148 		/* This is a good chance to make sure we're not
2149 		 * locking an invalid object.  ocfs2_read_inode_block()
2150 		 * already checked that the inode block is sane.
2151 		 *
2152 		 * We bug on a stale inode here because we checked
2153 		 * above whether it was wiped from disk. The wiping
2154 		 * node provides a guarantee that we receive that
2155 		 * message and can mark the inode before dropping any
2156 		 * locks associated with it. */
2157 		mlog_bug_on_msg(inode->i_generation !=
2158 				le32_to_cpu(fe->i_generation),
2159 				"Invalid dinode %llu disk generation: %u "
2160 				"inode->i_generation: %u\n",
2161 				(unsigned long long)oi->ip_blkno,
2162 				le32_to_cpu(fe->i_generation),
2163 				inode->i_generation);
2164 		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
2165 				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
2166 				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
2167 				(unsigned long long)oi->ip_blkno,
2168 				(unsigned long long)le64_to_cpu(fe->i_dtime),
2169 				le32_to_cpu(fe->i_flags));
2170 
2171 		ocfs2_refresh_inode(inode, fe);
2172 		ocfs2_track_lock_refresh(lockres);
2173 	}
2174 
2175 	status = 0;
2176 bail_refresh:
2177 	ocfs2_complete_lock_res_refresh(lockres, status);
2178 bail:
2179 	mlog_exit(status);
2180 	return status;
2181 }
2182 
/*
 * Hand the caller a referenced buffer_head for the inode block in
 * *ret_bh. When @passed_bh is non-NULL it is reused with an extra
 * reference; otherwise the inode block is read.
 *
 * Returns 0 on success or the error from ocfs2_read_inode_block().
 */
static int ocfs2_assign_bh(struct inode *inode,
			   struct buffer_head **ret_bh,
			   struct buffer_head *passed_bh)
{
	int status;

	if (!passed_bh) {
		status = ocfs2_read_inode_block(inode, ret_bh);
		if (status < 0)
			mlog_errno(status);

		return status;
	}

	/* The update already went to disk for us, so share its bh. */
	get_bh(passed_bh);
	*ret_bh = passed_bh;

	return 0;
}
2204 
2205 /*
2206  * returns < 0 error if the callback will never be called, otherwise
2207  * the result of the lock will be communicated via the callback.
2208  */
2209 int ocfs2_inode_lock_full_nested(struct inode *inode,
2210 				 struct buffer_head **ret_bh,
2211 				 int ex,
2212 				 int arg_flags,
2213 				 int subclass)
2214 {
2215 	int status, level, acquired;
2216 	u32 dlm_flags;
2217 	struct ocfs2_lock_res *lockres = NULL;
2218 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2219 	struct buffer_head *local_bh = NULL;
2220 
2221 	BUG_ON(!inode);
2222 
2223 	mlog_entry_void();
2224 
2225 	mlog(0, "inode %llu, take %s META lock\n",
2226 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
2227 	     ex ? "EXMODE" : "PRMODE");
2228 
2229 	status = 0;
2230 	acquired = 0;
2231 	/* We'll allow faking a readonly metadata lock for
2232 	 * rodevices. */
2233 	if (ocfs2_is_hard_readonly(osb)) {
2234 		if (ex)
2235 			status = -EROFS;
2236 		goto bail;
2237 	}
2238 
2239 	if (ocfs2_mount_local(osb))
2240 		goto local;
2241 
2242 	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2243 		ocfs2_wait_for_recovery(osb);
2244 
2245 	lockres = &OCFS2_I(inode)->ip_inode_lockres;
2246 	level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2247 	dlm_flags = 0;
2248 	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
2249 		dlm_flags |= DLM_LKF_NOQUEUE;
2250 
2251 	status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags,
2252 				      arg_flags, subclass, _RET_IP_);
2253 	if (status < 0) {
2254 		if (status != -EAGAIN && status != -EIOCBRETRY)
2255 			mlog_errno(status);
2256 		goto bail;
2257 	}
2258 
2259 	/* Notify the error cleanup path to drop the cluster lock. */
2260 	acquired = 1;
2261 
2262 	/* We wait twice because a node may have died while we were in
2263 	 * the lower dlm layers. The second time though, we've
2264 	 * committed to owning this lock so we don't allow signals to
2265 	 * abort the operation. */
2266 	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2267 		ocfs2_wait_for_recovery(osb);
2268 
2269 local:
2270 	/*
2271 	 * We only see this flag if we're being called from
2272 	 * ocfs2_read_locked_inode(). It means we're locking an inode
2273 	 * which hasn't been populated yet, so clear the refresh flag
2274 	 * and let the caller handle it.
2275 	 */
2276 	if (inode->i_state & I_NEW) {
2277 		status = 0;
2278 		if (lockres)
2279 			ocfs2_complete_lock_res_refresh(lockres, 0);
2280 		goto bail;
2281 	}
2282 
2283 	/* This is fun. The caller may want a bh back, or it may
2284 	 * not. ocfs2_inode_lock_update definitely wants one in, but
2285 	 * may or may not read one, depending on what's in the
2286 	 * LVB. The result of all of this is that we've *only* gone to
2287 	 * disk if we have to, so the complexity is worthwhile. */
2288 	status = ocfs2_inode_lock_update(inode, &local_bh);
2289 	if (status < 0) {
2290 		if (status != -ENOENT)
2291 			mlog_errno(status);
2292 		goto bail;
2293 	}
2294 
2295 	if (ret_bh) {
2296 		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
2297 		if (status < 0) {
2298 			mlog_errno(status);
2299 			goto bail;
2300 		}
2301 	}
2302 
2303 bail:
2304 	if (status < 0) {
2305 		if (ret_bh && (*ret_bh)) {
2306 			brelse(*ret_bh);
2307 			*ret_bh = NULL;
2308 		}
2309 		if (acquired)
2310 			ocfs2_inode_unlock(inode, ex);
2311 	}
2312 
2313 	if (local_bh)
2314 		brelse(local_bh);
2315 
2316 	mlog_exit(status);
2317 	return status;
2318 }
2319 
2320 /*
2321  * This is working around a lock inversion between tasks acquiring DLM
2322  * locks while holding a page lock and the downconvert thread which
2323  * blocks dlm lock acquiry while acquiring page locks.
2324  *
2325  * ** These _with_page variantes are only intended to be called from aop
2326  * methods that hold page locks and return a very specific *positive* error
2327  * code that aop methods pass up to the VFS -- test for errors with != 0. **
2328  *
2329  * The DLM is called such that it returns -EAGAIN if it would have
2330  * blocked waiting for the downconvert thread.  In that case we unlock
2331  * our page so the downconvert thread can make progress.  Once we've
2332  * done this we have to return AOP_TRUNCATED_PAGE so the aop method
2333  * that called us can bubble that back up into the VFS who will then
2334  * immediately retry the aop call.
2335  *
2336  * We do a blocking lock and immediate unlock before returning, though, so that
2337  * the lock has a great chance of being cached on this node by the time the VFS
2338  * calls back to retry the aop.    This has a potential to livelock as nodes
2339  * ping locks back and forth, but that's a risk we're willing to take to avoid
2340  * the lock inversion simply.
2341  */
2342 int ocfs2_inode_lock_with_page(struct inode *inode,
2343 			      struct buffer_head **ret_bh,
2344 			      int ex,
2345 			      struct page *page)
2346 {
2347 	int ret;
2348 
2349 	ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
2350 	if (ret == -EAGAIN) {
2351 		unlock_page(page);
2352 		if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
2353 			ocfs2_inode_unlock(inode, ex);
2354 		ret = AOP_TRUNCATED_PAGE;
2355 	}
2356 
2357 	return ret;
2358 }
2359 
2360 int ocfs2_inode_lock_atime(struct inode *inode,
2361 			  struct vfsmount *vfsmnt,
2362 			  int *level)
2363 {
2364 	int ret;
2365 
2366 	mlog_entry_void();
2367 	ret = ocfs2_inode_lock(inode, NULL, 0);
2368 	if (ret < 0) {
2369 		mlog_errno(ret);
2370 		return ret;
2371 	}
2372 
2373 	/*
2374 	 * If we should update atime, we will get EX lock,
2375 	 * otherwise we just get PR lock.
2376 	 */
2377 	if (ocfs2_should_update_atime(inode, vfsmnt)) {
2378 		struct buffer_head *bh = NULL;
2379 
2380 		ocfs2_inode_unlock(inode, 0);
2381 		ret = ocfs2_inode_lock(inode, &bh, 1);
2382 		if (ret < 0) {
2383 			mlog_errno(ret);
2384 			return ret;
2385 		}
2386 		*level = 1;
2387 		if (ocfs2_should_update_atime(inode, vfsmnt))
2388 			ocfs2_update_inode_atime(inode, bh);
2389 		if (bh)
2390 			brelse(bh);
2391 	} else
2392 		*level = 0;
2393 
2394 	mlog_exit(ret);
2395 	return ret;
2396 }
2397 
2398 void ocfs2_inode_unlock(struct inode *inode,
2399 		       int ex)
2400 {
2401 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2402 	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
2403 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2404 
2405 	mlog_entry_void();
2406 
2407 	mlog(0, "inode %llu drop %s META lock\n",
2408 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
2409 	     ex ? "EXMODE" : "PRMODE");
2410 
2411 	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
2412 	    !ocfs2_mount_local(osb))
2413 		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
2414 
2415 	mlog_exit_void();
2416 }
2417 
2418 int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno)
2419 {
2420 	struct ocfs2_lock_res *lockres;
2421 	struct ocfs2_orphan_scan_lvb *lvb;
2422 	int status = 0;
2423 
2424 	if (ocfs2_is_hard_readonly(osb))
2425 		return -EROFS;
2426 
2427 	if (ocfs2_mount_local(osb))
2428 		return 0;
2429 
2430 	lockres = &osb->osb_orphan_scan.os_lockres;
2431 	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2432 	if (status < 0)
2433 		return status;
2434 
2435 	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2436 	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
2437 	    lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION)
2438 		*seqno = be32_to_cpu(lvb->lvb_os_seqno);
2439 	else
2440 		*seqno = osb->osb_orphan_scan.os_seqno + 1;
2441 
2442 	return status;
2443 }
2444 
2445 void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno)
2446 {
2447 	struct ocfs2_lock_res *lockres;
2448 	struct ocfs2_orphan_scan_lvb *lvb;
2449 
2450 	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) {
2451 		lockres = &osb->osb_orphan_scan.os_lockres;
2452 		lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2453 		lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
2454 		lvb->lvb_os_seqno = cpu_to_be32(seqno);
2455 		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2456 	}
2457 }
2458 
2459 int ocfs2_super_lock(struct ocfs2_super *osb,
2460 		     int ex)
2461 {
2462 	int status = 0;
2463 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2464 	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2465 
2466 	mlog_entry_void();
2467 
2468 	if (ocfs2_is_hard_readonly(osb))
2469 		return -EROFS;
2470 
2471 	if (ocfs2_mount_local(osb))
2472 		goto bail;
2473 
2474 	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
2475 	if (status < 0) {
2476 		mlog_errno(status);
2477 		goto bail;
2478 	}
2479 
2480 	/* The super block lock path is really in the best position to
2481 	 * know when resources covered by the lock need to be
2482 	 * refreshed, so we do it here. Of course, making sense of
2483 	 * everything is up to the caller :) */
2484 	status = ocfs2_should_refresh_lock_res(lockres);
2485 	if (status < 0) {
2486 		mlog_errno(status);
2487 		goto bail;
2488 	}
2489 	if (status) {
2490 		status = ocfs2_refresh_slot_info(osb);
2491 
2492 		ocfs2_complete_lock_res_refresh(lockres, status);
2493 
2494 		if (status < 0)
2495 			mlog_errno(status);
2496 		ocfs2_track_lock_refresh(lockres);
2497 	}
2498 bail:
2499 	mlog_exit(status);
2500 	return status;
2501 }
2502 
2503 void ocfs2_super_unlock(struct ocfs2_super *osb,
2504 			int ex)
2505 {
2506 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2507 	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2508 
2509 	if (!ocfs2_mount_local(osb))
2510 		ocfs2_cluster_unlock(osb, lockres, level);
2511 }
2512 
2513 int ocfs2_rename_lock(struct ocfs2_super *osb)
2514 {
2515 	int status;
2516 	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2517 
2518 	if (ocfs2_is_hard_readonly(osb))
2519 		return -EROFS;
2520 
2521 	if (ocfs2_mount_local(osb))
2522 		return 0;
2523 
2524 	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2525 	if (status < 0)
2526 		mlog_errno(status);
2527 
2528 	return status;
2529 }
2530 
2531 void ocfs2_rename_unlock(struct ocfs2_super *osb)
2532 {
2533 	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2534 
2535 	if (!ocfs2_mount_local(osb))
2536 		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2537 }
2538 
2539 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex)
2540 {
2541 	int status;
2542 	struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2543 
2544 	if (ocfs2_is_hard_readonly(osb))
2545 		return -EROFS;
2546 
2547 	if (ocfs2_mount_local(osb))
2548 		return 0;
2549 
2550 	status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE,
2551 				    0, 0);
2552 	if (status < 0)
2553 		mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status);
2554 
2555 	return status;
2556 }
2557 
2558 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex)
2559 {
2560 	struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2561 
2562 	if (!ocfs2_mount_local(osb))
2563 		ocfs2_cluster_unlock(osb, lockres,
2564 				     ex ? LKM_EXMODE : LKM_PRMODE);
2565 }
2566 
2567 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2568 {
2569 	int ret;
2570 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2571 	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2572 	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2573 
2574 	BUG_ON(!dl);
2575 
2576 	if (ocfs2_is_hard_readonly(osb))
2577 		return -EROFS;
2578 
2579 	if (ocfs2_mount_local(osb))
2580 		return 0;
2581 
2582 	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2583 	if (ret < 0)
2584 		mlog_errno(ret);
2585 
2586 	return ret;
2587 }
2588 
2589 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2590 {
2591 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2592 	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2593 	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2594 
2595 	if (!ocfs2_mount_local(osb))
2596 		ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
2597 }
2598 
2599 /* Reference counting of the dlm debug structure. We want this because
2600  * open references on the debug inodes can live on after a mount, so
2601  * we can't rely on the ocfs2_super to always exist. */
2602 static void ocfs2_dlm_debug_free(struct kref *kref)
2603 {
2604 	struct ocfs2_dlm_debug *dlm_debug;
2605 
2606 	dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
2607 
2608 	kfree(dlm_debug);
2609 }
2610 
2611 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
2612 {
2613 	if (dlm_debug)
2614 		kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
2615 }
2616 
2617 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
2618 {
2619 	kref_get(&debug->d_refcnt);
2620 }
2621 
2622 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
2623 {
2624 	struct ocfs2_dlm_debug *dlm_debug;
2625 
2626 	dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
2627 	if (!dlm_debug) {
2628 		mlog_errno(-ENOMEM);
2629 		goto out;
2630 	}
2631 
2632 	kref_init(&dlm_debug->d_refcnt);
2633 	INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
2634 	dlm_debug->d_locking_state = NULL;
2635 out:
2636 	return dlm_debug;
2637 }
2638 
2639 /* Access to this is arbitrated for us via seq_file->sem. */
2640 struct ocfs2_dlm_seq_priv {
2641 	struct ocfs2_dlm_debug *p_dlm_debug;
2642 	struct ocfs2_lock_res p_iter_res;
2643 	struct ocfs2_lock_res p_tmp_res;
2644 };
2645 
2646 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
2647 						 struct ocfs2_dlm_seq_priv *priv)
2648 {
2649 	struct ocfs2_lock_res *iter, *ret = NULL;
2650 	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
2651 
2652 	assert_spin_locked(&ocfs2_dlm_tracking_lock);
2653 
2654 	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
2655 		/* discover the head of the list */
2656 		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
2657 			mlog(0, "End of list found, %p\n", ret);
2658 			break;
2659 		}
2660 
2661 		/* We track our "dummy" iteration lockres' by a NULL
2662 		 * l_ops field. */
2663 		if (iter->l_ops != NULL) {
2664 			ret = iter;
2665 			break;
2666 		}
2667 	}
2668 
2669 	return ret;
2670 }
2671 
2672 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
2673 {
2674 	struct ocfs2_dlm_seq_priv *priv = m->private;
2675 	struct ocfs2_lock_res *iter;
2676 
2677 	spin_lock(&ocfs2_dlm_tracking_lock);
2678 	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
2679 	if (iter) {
2680 		/* Since lockres' have the lifetime of their container
2681 		 * (which can be inodes, ocfs2_supers, etc) we want to
2682 		 * copy this out to a temporary lockres while still
2683 		 * under the spinlock. Obviously after this we can't
2684 		 * trust any pointers on the copy returned, but that's
2685 		 * ok as the information we want isn't typically held
2686 		 * in them. */
2687 		priv->p_tmp_res = *iter;
2688 		iter = &priv->p_tmp_res;
2689 	}
2690 	spin_unlock(&ocfs2_dlm_tracking_lock);
2691 
2692 	return iter;
2693 }
2694 
/* seq_file ->stop: intentionally empty, nothing is done here. */
static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
{
	return;
}
2698 
2699 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
2700 {
2701 	struct ocfs2_dlm_seq_priv *priv = m->private;
2702 	struct ocfs2_lock_res *iter = v;
2703 	struct ocfs2_lock_res *dummy = &priv->p_iter_res;
2704 
2705 	spin_lock(&ocfs2_dlm_tracking_lock);
2706 	iter = ocfs2_dlm_next_res(iter, priv);
2707 	list_del_init(&dummy->l_debug_list);
2708 	if (iter) {
2709 		list_add(&dummy->l_debug_list, &iter->l_debug_list);
2710 		priv->p_tmp_res = *iter;
2711 		iter = &priv->p_tmp_res;
2712 	}
2713 	spin_unlock(&ocfs2_dlm_tracking_lock);
2714 
2715 	return iter;
2716 }
2717 
2718 /* So that debugfs.ocfs2 can determine which format is being used */
2719 #define OCFS2_DLM_DEBUG_STR_VERSION 2
2720 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2721 {
2722 	int i;
2723 	char *lvb;
2724 	struct ocfs2_lock_res *lockres = v;
2725 
2726 	if (!lockres)
2727 		return -EINVAL;
2728 
2729 	seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2730 
2731 	if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2732 		seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2733 			   lockres->l_name,
2734 			   (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2735 	else
2736 		seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2737 
2738 	seq_printf(m, "%d\t"
2739 		   "0x%lx\t"
2740 		   "0x%x\t"
2741 		   "0x%x\t"
2742 		   "%u\t"
2743 		   "%u\t"
2744 		   "%d\t"
2745 		   "%d\t",
2746 		   lockres->l_level,
2747 		   lockres->l_flags,
2748 		   lockres->l_action,
2749 		   lockres->l_unlock_action,
2750 		   lockres->l_ro_holders,
2751 		   lockres->l_ex_holders,
2752 		   lockres->l_requested,
2753 		   lockres->l_blocking);
2754 
2755 	/* Dump the raw LVB */
2756 	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2757 	for(i = 0; i < DLM_LVB_LEN; i++)
2758 		seq_printf(m, "0x%x\t", lvb[i]);
2759 
2760 #ifdef CONFIG_OCFS2_FS_STATS
2761 # define lock_num_prmode(_l)		(_l)->l_lock_num_prmode
2762 # define lock_num_exmode(_l)		(_l)->l_lock_num_exmode
2763 # define lock_num_prmode_failed(_l)	(_l)->l_lock_num_prmode_failed
2764 # define lock_num_exmode_failed(_l)	(_l)->l_lock_num_exmode_failed
2765 # define lock_total_prmode(_l)		(_l)->l_lock_total_prmode
2766 # define lock_total_exmode(_l)		(_l)->l_lock_total_exmode
2767 # define lock_max_prmode(_l)		(_l)->l_lock_max_prmode
2768 # define lock_max_exmode(_l)		(_l)->l_lock_max_exmode
2769 # define lock_refresh(_l)		(_l)->l_lock_refresh
2770 #else
2771 # define lock_num_prmode(_l)		(0ULL)
2772 # define lock_num_exmode(_l)		(0ULL)
2773 # define lock_num_prmode_failed(_l)	(0)
2774 # define lock_num_exmode_failed(_l)	(0)
2775 # define lock_total_prmode(_l)		(0ULL)
2776 # define lock_total_exmode(_l)		(0ULL)
2777 # define lock_max_prmode(_l)		(0)
2778 # define lock_max_exmode(_l)		(0)
2779 # define lock_refresh(_l)		(0)
2780 #endif
2781 	/* The following seq_print was added in version 2 of this output */
2782 	seq_printf(m, "%llu\t"
2783 		   "%llu\t"
2784 		   "%u\t"
2785 		   "%u\t"
2786 		   "%llu\t"
2787 		   "%llu\t"
2788 		   "%u\t"
2789 		   "%u\t"
2790 		   "%u\t",
2791 		   lock_num_prmode(lockres),
2792 		   lock_num_exmode(lockres),
2793 		   lock_num_prmode_failed(lockres),
2794 		   lock_num_exmode_failed(lockres),
2795 		   lock_total_prmode(lockres),
2796 		   lock_total_exmode(lockres),
2797 		   lock_max_prmode(lockres),
2798 		   lock_max_exmode(lockres),
2799 		   lock_refresh(lockres));
2800 
2801 	/* End the line */
2802 	seq_printf(m, "\n");
2803 	return 0;
2804 }
2805 
2806 static const struct seq_operations ocfs2_dlm_seq_ops = {
2807 	.start =	ocfs2_dlm_seq_start,
2808 	.stop =		ocfs2_dlm_seq_stop,
2809 	.next =		ocfs2_dlm_seq_next,
2810 	.show =		ocfs2_dlm_seq_show,
2811 };
2812 
2813 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2814 {
2815 	struct seq_file *seq = (struct seq_file *) file->private_data;
2816 	struct ocfs2_dlm_seq_priv *priv = seq->private;
2817 	struct ocfs2_lock_res *res = &priv->p_iter_res;
2818 
2819 	ocfs2_remove_lockres_tracking(res);
2820 	ocfs2_put_dlm_debug(priv->p_dlm_debug);
2821 	return seq_release_private(inode, file);
2822 }
2823 
2824 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2825 {
2826 	int ret;
2827 	struct ocfs2_dlm_seq_priv *priv;
2828 	struct seq_file *seq;
2829 	struct ocfs2_super *osb;
2830 
2831 	priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2832 	if (!priv) {
2833 		ret = -ENOMEM;
2834 		mlog_errno(ret);
2835 		goto out;
2836 	}
2837 	osb = inode->i_private;
2838 	ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2839 	priv->p_dlm_debug = osb->osb_dlm_debug;
2840 	INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2841 
2842 	ret = seq_open(file, &ocfs2_dlm_seq_ops);
2843 	if (ret) {
2844 		kfree(priv);
2845 		mlog_errno(ret);
2846 		goto out;
2847 	}
2848 
2849 	seq = (struct seq_file *) file->private_data;
2850 	seq->private = priv;
2851 
2852 	ocfs2_add_lockres_tracking(&priv->p_iter_res,
2853 				   priv->p_dlm_debug);
2854 
2855 out:
2856 	return ret;
2857 }
2858 
2859 static const struct file_operations ocfs2_dlm_debug_fops = {
2860 	.open =		ocfs2_dlm_debug_open,
2861 	.release =	ocfs2_dlm_debug_release,
2862 	.read =		seq_read,
2863 	.llseek =	seq_lseek,
2864 };
2865 
2866 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2867 {
2868 	int ret = 0;
2869 	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2870 
2871 	dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2872 							 S_IFREG|S_IRUSR,
2873 							 osb->osb_debug_root,
2874 							 osb,
2875 							 &ocfs2_dlm_debug_fops);
2876 	if (!dlm_debug->d_locking_state) {
2877 		ret = -EINVAL;
2878 		mlog(ML_ERROR,
2879 		     "Unable to create locking state debugfs file.\n");
2880 		goto out;
2881 	}
2882 
2883 	ocfs2_get_dlm_debug(dlm_debug);
2884 out:
2885 	return ret;
2886 }
2887 
2888 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2889 {
2890 	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2891 
2892 	if (dlm_debug) {
2893 		debugfs_remove(dlm_debug->d_locking_state);
2894 		ocfs2_put_dlm_debug(dlm_debug);
2895 	}
2896 }
2897 
2898 int ocfs2_dlm_init(struct ocfs2_super *osb)
2899 {
2900 	int status = 0;
2901 	struct ocfs2_cluster_connection *conn = NULL;
2902 
2903 	mlog_entry_void();
2904 
2905 	if (ocfs2_mount_local(osb)) {
2906 		osb->node_num = 0;
2907 		goto local;
2908 	}
2909 
2910 	status = ocfs2_dlm_init_debug(osb);
2911 	if (status < 0) {
2912 		mlog_errno(status);
2913 		goto bail;
2914 	}
2915 
2916 	/* launch downconvert thread */
2917 	osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc");
2918 	if (IS_ERR(osb->dc_task)) {
2919 		status = PTR_ERR(osb->dc_task);
2920 		osb->dc_task = NULL;
2921 		mlog_errno(status);
2922 		goto bail;
2923 	}
2924 
2925 	/* for now, uuid == domain */
2926 	status = ocfs2_cluster_connect(osb->osb_cluster_stack,
2927 				       osb->uuid_str,
2928 				       strlen(osb->uuid_str),
2929 				       ocfs2_do_node_down, osb,
2930 				       &conn);
2931 	if (status) {
2932 		mlog_errno(status);
2933 		goto bail;
2934 	}
2935 
2936 	status = ocfs2_cluster_this_node(&osb->node_num);
2937 	if (status < 0) {
2938 		mlog_errno(status);
2939 		mlog(ML_ERROR,
2940 		     "could not find this host's node number\n");
2941 		ocfs2_cluster_disconnect(conn, 0);
2942 		goto bail;
2943 	}
2944 
2945 local:
2946 	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2947 	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2948 	ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb);
2949 	ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb);
2950 
2951 	osb->cconn = conn;
2952 
2953 	status = 0;
2954 bail:
2955 	if (status < 0) {
2956 		ocfs2_dlm_shutdown_debug(osb);
2957 		if (osb->dc_task)
2958 			kthread_stop(osb->dc_task);
2959 	}
2960 
2961 	mlog_exit(status);
2962 	return status;
2963 }
2964 
2965 void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
2966 			int hangup_pending)
2967 {
2968 	mlog_entry_void();
2969 
2970 	ocfs2_drop_osb_locks(osb);
2971 
2972 	/*
2973 	 * Now that we have dropped all locks and ocfs2_dismount_volume()
2974 	 * has disabled recovery, the DLM won't be talking to us.  It's
2975 	 * safe to tear things down before disconnecting the cluster.
2976 	 */
2977 
2978 	if (osb->dc_task) {
2979 		kthread_stop(osb->dc_task);
2980 		osb->dc_task = NULL;
2981 	}
2982 
2983 	ocfs2_lock_res_free(&osb->osb_super_lockres);
2984 	ocfs2_lock_res_free(&osb->osb_rename_lockres);
2985 	ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres);
2986 	ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres);
2987 
2988 	ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
2989 	osb->cconn = NULL;
2990 
2991 	ocfs2_dlm_shutdown_debug(osb);
2992 
2993 	mlog_exit_void();
2994 }
2995 
2996 static void ocfs2_unlock_ast(void *opaque, int error)
2997 {
2998 	struct ocfs2_lock_res *lockres = opaque;
2999 	unsigned long flags;
3000 
3001 	mlog_entry_void();
3002 
3003 	mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
3004 	     lockres->l_unlock_action);
3005 
3006 	spin_lock_irqsave(&lockres->l_lock, flags);
3007 	if (error) {
3008 		mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
3009 		     "unlock_action %d\n", error, lockres->l_name,
3010 		     lockres->l_unlock_action);
3011 		spin_unlock_irqrestore(&lockres->l_lock, flags);
3012 		return;
3013 	}
3014 
3015 	switch(lockres->l_unlock_action) {
3016 	case OCFS2_UNLOCK_CANCEL_CONVERT:
3017 		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
3018 		lockres->l_action = OCFS2_AST_INVALID;
3019 		/* Downconvert thread may have requeued this lock, we
3020 		 * need to wake it. */
3021 		if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
3022 			ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
3023 		break;
3024 	case OCFS2_UNLOCK_DROP_LOCK:
3025 		lockres->l_level = DLM_LOCK_IV;
3026 		break;
3027 	default:
3028 		BUG();
3029 	}
3030 
3031 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
3032 	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
3033 	wake_up(&lockres->l_event);
3034 	spin_unlock_irqrestore(&lockres->l_lock, flags);
3035 
3036 	mlog_exit_void();
3037 }
3038 
3039 static int ocfs2_drop_lock(struct ocfs2_super *osb,
3040 			   struct ocfs2_lock_res *lockres)
3041 {
3042 	int ret;
3043 	unsigned long flags;
3044 	u32 lkm_flags = 0;
3045 
3046 	/* We didn't get anywhere near actually using this lockres. */
3047 	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
3048 		goto out;
3049 
3050 	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
3051 		lkm_flags |= DLM_LKF_VALBLK;
3052 
3053 	spin_lock_irqsave(&lockres->l_lock, flags);
3054 
3055 	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
3056 			"lockres %s, flags 0x%lx\n",
3057 			lockres->l_name, lockres->l_flags);
3058 
3059 	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
3060 		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
3061 		     "%u, unlock_action = %u\n",
3062 		     lockres->l_name, lockres->l_flags, lockres->l_action,
3063 		     lockres->l_unlock_action);
3064 
3065 		spin_unlock_irqrestore(&lockres->l_lock, flags);
3066 
3067 		/* XXX: Today we just wait on any busy
3068 		 * locks... Perhaps we need to cancel converts in the
3069 		 * future? */
3070 		ocfs2_wait_on_busy_lock(lockres);
3071 
3072 		spin_lock_irqsave(&lockres->l_lock, flags);
3073 	}
3074 
3075 	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3076 		if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
3077 		    lockres->l_level == DLM_LOCK_EX &&
3078 		    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3079 			lockres->l_ops->set_lvb(lockres);
3080 	}
3081 
3082 	if (lockres->l_flags & OCFS2_LOCK_BUSY)
3083 		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
3084 		     lockres->l_name);
3085 	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
3086 		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
3087 
3088 	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
3089 		spin_unlock_irqrestore(&lockres->l_lock, flags);
3090 		goto out;
3091 	}
3092 
3093 	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
3094 
3095 	/* make sure we never get here while waiting for an ast to
3096 	 * fire. */
3097 	BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
3098 
3099 	/* is this necessary? */
3100 	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3101 	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
3102 	spin_unlock_irqrestore(&lockres->l_lock, flags);
3103 
3104 	mlog(0, "lock %s\n", lockres->l_name);
3105 
3106 	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags,
3107 			       lockres);
3108 	if (ret) {
3109 		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3110 		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
3111 		ocfs2_dlm_dump_lksb(&lockres->l_lksb);
3112 		BUG();
3113 	}
3114 	mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n",
3115 	     lockres->l_name);
3116 
3117 	ocfs2_wait_on_busy_lock(lockres);
3118 out:
3119 	mlog_exit(0);
3120 	return 0;
3121 }
3122 
3123 /* Mark the lockres as being dropped. It will no longer be
3124  * queued if blocking, but we still may have to wait on it
3125  * being dequeued from the downconvert thread before we can consider
3126  * it safe to drop.
3127  *
3128  * You can *not* attempt to call cluster_lock on this lockres anymore. */
3129 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
3130 {
3131 	int status;
3132 	struct ocfs2_mask_waiter mw;
3133 	unsigned long flags;
3134 
3135 	ocfs2_init_mask_waiter(&mw);
3136 
3137 	spin_lock_irqsave(&lockres->l_lock, flags);
3138 	lockres->l_flags |= OCFS2_LOCK_FREEING;
3139 	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
3140 		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
3141 		spin_unlock_irqrestore(&lockres->l_lock, flags);
3142 
3143 		mlog(0, "Waiting on lockres %s\n", lockres->l_name);
3144 
3145 		status = ocfs2_wait_for_mask(&mw);
3146 		if (status)
3147 			mlog_errno(status);
3148 
3149 		spin_lock_irqsave(&lockres->l_lock, flags);
3150 	}
3151 	spin_unlock_irqrestore(&lockres->l_lock, flags);
3152 }
3153 
/*
 * Mark @lockres as freeing and then drop its lock.  A failure reported
 * by ocfs2_drop_lock() is only logged.
 */
void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
			       struct ocfs2_lock_res *lockres)
{
	int err;

	ocfs2_mark_lockres_freeing(lockres);

	err = ocfs2_drop_lock(osb, lockres);
	if (err != 0)
		mlog_errno(err);
}
3164 
3165 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
3166 {
3167 	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
3168 	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
3169 	ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
3170 	ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
3171 }
3172 
3173 int ocfs2_drop_inode_locks(struct inode *inode)
3174 {
3175 	int status, err;
3176 
3177 	mlog_entry_void();
3178 
3179 	/* No need to call ocfs2_mark_lockres_freeing here -
3180 	 * ocfs2_clear_inode has done it for us. */
3181 
3182 	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3183 			      &OCFS2_I(inode)->ip_open_lockres);
3184 	if (err < 0)
3185 		mlog_errno(err);
3186 
3187 	status = err;
3188 
3189 	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3190 			      &OCFS2_I(inode)->ip_inode_lockres);
3191 	if (err < 0)
3192 		mlog_errno(err);
3193 	if (err < 0 && !status)
3194 		status = err;
3195 
3196 	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3197 			      &OCFS2_I(inode)->ip_rw_lockres);
3198 	if (err < 0)
3199 		mlog_errno(err);
3200 	if (err < 0 && !status)
3201 		status = err;
3202 
3203 	mlog_exit(status);
3204 	return status;
3205 }
3206 
3207 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
3208 					      int new_level)
3209 {
3210 	assert_spin_locked(&lockres->l_lock);
3211 
3212 	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
3213 
3214 	if (lockres->l_level <= new_level) {
3215 		mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n",
3216 		     lockres->l_level, new_level);
3217 		BUG();
3218 	}
3219 
3220 	mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
3221 	     lockres->l_name, new_level, lockres->l_blocking);
3222 
3223 	lockres->l_action = OCFS2_AST_DOWNCONVERT;
3224 	lockres->l_requested = new_level;
3225 	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3226 	return lockres_set_pending(lockres);
3227 }
3228 
3229 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
3230 				  struct ocfs2_lock_res *lockres,
3231 				  int new_level,
3232 				  int lvb,
3233 				  unsigned int generation)
3234 {
3235 	int ret;
3236 	u32 dlm_flags = DLM_LKF_CONVERT;
3237 
3238 	mlog_entry_void();
3239 
3240 	if (lvb)
3241 		dlm_flags |= DLM_LKF_VALBLK;
3242 
3243 	ret = ocfs2_dlm_lock(osb->cconn,
3244 			     new_level,
3245 			     &lockres->l_lksb,
3246 			     dlm_flags,
3247 			     lockres->l_name,
3248 			     OCFS2_LOCK_ID_MAX_LEN - 1,
3249 			     lockres);
3250 	lockres_clear_pending(lockres, generation, osb);
3251 	if (ret) {
3252 		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
3253 		ocfs2_recover_from_dlm_error(lockres, 1);
3254 		goto bail;
3255 	}
3256 
3257 	ret = 0;
3258 bail:
3259 	mlog_exit(ret);
3260 	return ret;
3261 }
3262 
3263 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
3264 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
3265 				        struct ocfs2_lock_res *lockres)
3266 {
3267 	assert_spin_locked(&lockres->l_lock);
3268 
3269 	mlog_entry_void();
3270 	mlog(0, "lock %s\n", lockres->l_name);
3271 
3272 	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
3273 		/* If we're already trying to cancel a lock conversion
3274 		 * then just drop the spinlock and allow the caller to
3275 		 * requeue this lock. */
3276 
3277 		mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
3278 		return 0;
3279 	}
3280 
3281 	/* were we in a convert when we got the bast fire? */
3282 	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
3283 	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
3284 	/* set things up for the unlockast to know to just
3285 	 * clear out the ast_action and unset busy, etc. */
3286 	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
3287 
3288 	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
3289 			"lock %s, invalid flags: 0x%lx\n",
3290 			lockres->l_name, lockres->l_flags);
3291 
3292 	return 1;
3293 }
3294 
3295 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
3296 				struct ocfs2_lock_res *lockres)
3297 {
3298 	int ret;
3299 
3300 	mlog_entry_void();
3301 	mlog(0, "lock %s\n", lockres->l_name);
3302 
3303 	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
3304 			       DLM_LKF_CANCEL, lockres);
3305 	if (ret) {
3306 		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3307 		ocfs2_recover_from_dlm_error(lockres, 0);
3308 	}
3309 
3310 	mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name);
3311 
3312 	mlog_exit(ret);
3313 	return ret;
3314 }
3315 
/*
 * Try to stop @lockres from blocking another requester, either by
 * cancelling an in-flight convert or by downconverting the lock.
 *
 * @ctl is filled in for the caller:
 *   ctl->requeue        - set to 1 when the lockres has to be looked at
 *                         again later, 0 once a downconvert was issued.
 *   ctl->unblock_action - result of the lock type's downconvert_worker,
 *                         when there is one.
 *
 * The lockres must have OCFS2_LOCK_BLOCKED set.  l_lock is taken and
 * released internally; it is never held across the downconvert worker or
 * any DLM call.
 *
 * Returns 0 on the requeue paths, otherwise the result of the cancel or
 * downconvert request (0 if the worker asked to stop).
 */
static int ocfs2_unblock_lock(struct ocfs2_super *osb,
			      struct ocfs2_lock_res *lockres,
			      struct ocfs2_unblock_ctl *ctl)
{
	unsigned long flags;
	int blocking;
	int new_level;
	int ret = 0;
	int set_lvb = 0;
	unsigned int gen;

	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

recheck:
	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		/* XXX
		 * This is a *big* race.  The OCFS2_LOCK_PENDING flag
		 * exists entirely for one reason - another thread has set
		 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
		 *
		 * If we do ocfs2_cancel_convert() before the other thread
		 * calls dlm_lock(), our cancel will do nothing.  We will
		 * get no ast, and we will have no way of knowing the
		 * cancel failed.  Meanwhile, the other thread will call
		 * into dlm_lock() and wait...forever.
		 *
		 * Why forever?  Because another node has asked for the
		 * lock first; that's why we're here in unblock_lock().
		 *
		 * The solution is OCFS2_LOCK_PENDING.  When PENDING is
		 * set, we just requeue the unblock.  Only when the other
		 * thread has called dlm_lock() and cleared PENDING will
		 * we then cancel their request.
		 *
		 * All callers of dlm_lock() must set OCFS2_LOCK_PENDING
		 * at the same time they set OCFS2_LOCK_BUSY.  They must
		 * clear OCFS2_LOCK_PENDING after dlm_lock() returns.
		 */
		if (lockres->l_flags & OCFS2_LOCK_PENDING)
			goto leave_requeue;

		/* Whether or not a cancel is issued now, look at this
		 * lockres again later. */
		ctl->requeue = 1;
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		if (ret) {
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0)
				mlog_errno(ret);
		}
		goto leave;
	}

	/* if we're blocking an exclusive and we have *any* holders,
	 * then requeue. */
	if ((lockres->l_blocking == DLM_LOCK_EX)
	    && (lockres->l_ex_holders || lockres->l_ro_holders))
		goto leave_requeue;

	/* If it's a PR we're blocking, then only
	 * requeue if we've got any EX holders */
	if (lockres->l_blocking == DLM_LOCK_PR &&
	    lockres->l_ex_holders)
		goto leave_requeue;

	/*
	 * Can we get a lock in this state if the holder counts are
	 * zero? The meta data unblock code used to check this.
	 */
	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
		goto leave_requeue;

	/* Recomputed on every pass through 'recheck', since l_blocking
	 * may have changed while the spinlock was dropped. */
	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

	/* Give the lock type a chance to veto the downconvert for now. */
	if (lockres->l_ops->check_downconvert
	    && !lockres->l_ops->check_downconvert(lockres, new_level))
		goto leave_requeue;

	/* If we get here, then we know that there are no more
	 * incompatible holders (and anyone asking for an incompatible
	 * lock is blocked). We can now downconvert the lock */
	if (!lockres->l_ops->downconvert_worker)
		goto downconvert;

	/* Some lockres types want to do a bit of work before
	 * downconverting a lock. Allow that here. The worker function
	 * may sleep, so we save off a copy of what we're blocking as
	 * it may change while we're not holding the spin lock. */
	blocking = lockres->l_blocking;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);

	/* The worker asked us not to downconvert; the spinlock is
	 * already dropped, so leave directly. */
	if (ctl->unblock_action == UNBLOCK_STOP_POST)
		goto leave;

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (blocking != lockres->l_blocking) {
		/* If this changed underneath us, then we can't drop
		 * it just yet. */
		goto recheck;
	}

downconvert:
	ctl->requeue = 0;

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
		if (lockres->l_level == DLM_LOCK_EX)
			set_lvb = 1;

		/*
		 * We only set the lvb if the lock has been fully
		 * refreshed - otherwise we risk setting stale
		 * data. Otherwise, there's no need to actually clear
		 * out the lvb here as its value is still valid.
		 */
		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
			lockres->l_ops->set_lvb(lockres);
	}

	/* Mark the lockres busy under the lock, then issue the convert
	 * with the spinlock released. */
	gen = ocfs2_prepare_downconvert(lockres, new_level);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
				     gen);

leave:
	mlog_exit(ret);
	return ret;

leave_requeue:
	/* Reached with l_lock still held. */
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ctl->requeue = 1;

	mlog_exit(0);
	return 0;
}
3456 
3457 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3458 				     int blocking)
3459 {
3460 	struct inode *inode;
3461 	struct address_space *mapping;
3462 
3463        	inode = ocfs2_lock_res_inode(lockres);
3464 	mapping = inode->i_mapping;
3465 
3466 	if (!S_ISREG(inode->i_mode))
3467 		goto out;
3468 
3469 	/*
3470 	 * We need this before the filemap_fdatawrite() so that it can
3471 	 * transfer the dirty bit from the PTE to the
3472 	 * page. Unfortunately this means that even for EX->PR
3473 	 * downconverts, we'll lose our mappings and have to build
3474 	 * them up again.
3475 	 */
3476 	unmap_mapping_range(mapping, 0, 0, 0);
3477 
3478 	if (filemap_fdatawrite(mapping)) {
3479 		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
3480 		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
3481 	}
3482 	sync_mapping_buffers(mapping);
3483 	if (blocking == DLM_LOCK_EX) {
3484 		truncate_inode_pages(mapping, 0);
3485 	} else {
3486 		/* We only need to wait on the I/O if we're not also
3487 		 * truncating pages because truncate_inode_pages waits
3488 		 * for us above. We don't truncate pages if we're
3489 		 * blocking anything < EXMODE because we want to keep
3490 		 * them around in that case. */
3491 		filemap_fdatawait(mapping);
3492 	}
3493 
3494 out:
3495 	return UNBLOCK_CONTINUE;
3496 }
3497 
3498 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
3499 					int new_level)
3500 {
3501 	struct inode *inode = ocfs2_lock_res_inode(lockres);
3502 	int checkpointed = ocfs2_inode_fully_checkpointed(inode);
3503 
3504 	BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
3505 	BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
3506 
3507 	if (checkpointed)
3508 		return 1;
3509 
3510 	ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
3511 	return 0;
3512 }
3513 
/* Fill the meta LVB from the inode that @lockres belongs to. */
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
{
	__ocfs2_stuff_meta_lvb(ocfs2_lock_res_inode(lockres));
}
3520 
/*
 * Performs the final reference drop on our dentry lock.  Today this runs
 * from the downconvert thread; the dlmglue API could be simplified in
 * the future by pushing these drops to the ocfs2_wq instead.
 */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres)
{
	ocfs2_dentry_lock_put(osb, ocfs2_lock_res_dl(lockres));
}
3532 
/*
 * d_delete() matching dentries before the lock downconvert.
 *
 * At this point, any process waiting to destroy the
 * dentry_lock due to last ref count is stopped by the
 * OCFS2_LOCK_QUEUED flag.
 *
 * We have two potential problems
 *
 * 1) If we do the last reference drop on our dentry_lock (via dput)
 *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
 *    the downconvert to finish. Instead we take an elevated
 *    reference and push the drop until after we've completed our
 *    unblock processing.
 *
 * 2) There might be another process with a final reference,
 *    waiting on us to finish processing. If this is the case, we
 *    detect it and exit out - there are no more dentries anyway.
 *
 * Returns:
 *   UNBLOCK_CONTINUE      - @blocking is DLM_LOCK_PR, or no extra
 *                           reference could be taken.
 *   UNBLOCK_STOP_POST     - ours is the only remaining reference on the
 *                           dentry lock after the aliases were deleted.
 *   UNBLOCK_CONTINUE_POST - otherwise.
 */
static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking)
{
	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
	struct dentry *dentry;
	unsigned long flags;
	int extra_ref = 0;

	/*
	 * This node is blocking another node from getting a read
	 * lock. This happens when we've renamed within a
	 * directory. We've forced the other nodes to d_delete(), but
	 * we never actually dropped our lock because it's still
	 * valid. The downconvert code will retain a PR for this node,
	 * so there's no further work to do.
	 */
	if (blocking == DLM_LOCK_PR)
		return UNBLOCK_CONTINUE;

	/*
	 * Mark this inode as potentially orphaned. The code in
	 * ocfs2_delete_inode() will figure out whether it actually
	 * needs to be freed or not.
	 */
	spin_lock(&oi->ip_lock);
	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
	spin_unlock(&oi->ip_lock);

	/*
	 * Yuck. We need to make sure however that the check of
	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
	 * respect to a reference decrement or the setting of that
	 * flag.
	 */
	spin_lock_irqsave(&lockres->l_lock, flags);
	spin_lock(&dentry_attach_lock);
	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
	    && dl->dl_count) {
		dl->dl_count++;
		extra_ref = 1;
	}
	spin_unlock(&dentry_attach_lock);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog(0, "extra_ref = %d\n", extra_ref);

	/*
	 * We have a process waiting on us in ocfs2_dentry_iput(),
	 * which means we can't have any more outstanding
	 * aliases. There's no need to do any more work.
	 */
	if (!extra_ref)
		return UNBLOCK_CONTINUE;

	/*
	 * Delete every local alias.  dentry_attach_lock is held for the
	 * lookup only and dropped around d_delete()/dput().
	 */
	spin_lock(&dentry_attach_lock);
	while (1) {
		dentry = ocfs2_find_local_alias(dl->dl_inode,
						dl->dl_parent_blkno, 1);
		if (!dentry)
			break;
		spin_unlock(&dentry_attach_lock);

		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
		     dentry->d_name.name);

		/*
		 * The following dcache calls may do an
		 * iput(). Normally we don't want that from the
		 * downconverting thread, but in this case it's ok
		 * because the requesting node already has an
		 * exclusive lock on the inode, so it can't be queued
		 * for a downconvert.
		 */
		d_delete(dentry);
		dput(dentry);

		spin_lock(&dentry_attach_lock);
	}
	spin_unlock(&dentry_attach_lock);

	/*
	 * If we are the last holder of this dentry lock, there is no
	 * reason to downconvert so skip straight to the unlock.
	 */
	if (dl->dl_count == 1)
		return UNBLOCK_STOP_POST;

	return UNBLOCK_CONTINUE_POST;
}
3642 
3643 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres)
3644 {
3645 	struct ocfs2_qinfo_lvb *lvb;
3646 	struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres);
3647 	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
3648 					    oinfo->dqi_gi.dqi_type);
3649 
3650 	mlog_entry_void();
3651 
3652 	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
3653 	lvb->lvb_version = OCFS2_QINFO_LVB_VERSION;
3654 	lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace);
3655 	lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace);
3656 	lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms);
3657 	lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks);
3658 	lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk);
3659 	lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry);
3660 
3661 	mlog_exit_void();
3662 }
3663 
3664 void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex)
3665 {
3666 	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3667 	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
3668 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3669 
3670 	mlog_entry_void();
3671 	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
3672 		ocfs2_cluster_unlock(osb, lockres, level);
3673 	mlog_exit_void();
3674 }
3675 
3676 static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo)
3677 {
3678 	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
3679 					    oinfo->dqi_gi.dqi_type);
3680 	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3681 	struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
3682 	struct buffer_head *bh = NULL;
3683 	struct ocfs2_global_disk_dqinfo *gdinfo;
3684 	int status = 0;
3685 
3686 	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
3687 	    lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) {
3688 		info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace);
3689 		info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace);
3690 		oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms);
3691 		oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks);
3692 		oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk);
3693 		oinfo->dqi_gi.dqi_free_entry =
3694 					be32_to_cpu(lvb->lvb_free_entry);
3695 	} else {
3696 		status = ocfs2_read_quota_block(oinfo->dqi_gqinode, 0, &bh);
3697 		if (status) {
3698 			mlog_errno(status);
3699 			goto bail;
3700 		}
3701 		gdinfo = (struct ocfs2_global_disk_dqinfo *)
3702 					(bh->b_data + OCFS2_GLOBAL_INFO_OFF);
3703 		info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace);
3704 		info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace);
3705 		oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms);
3706 		oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks);
3707 		oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk);
3708 		oinfo->dqi_gi.dqi_free_entry =
3709 					le32_to_cpu(gdinfo->dqi_free_entry);
3710 		brelse(bh);
3711 		ocfs2_track_lock_refresh(lockres);
3712 	}
3713 
3714 bail:
3715 	return status;
3716 }
3717 
3718 /* Lock quota info, this function expects at least shared lock on the quota file
3719  * so that we can safely refresh quota info from disk. */
3720 int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex)
3721 {
3722 	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3723 	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
3724 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3725 	int status = 0;
3726 
3727 	mlog_entry_void();
3728 
3729 	/* On RO devices, locking really isn't needed... */
3730 	if (ocfs2_is_hard_readonly(osb)) {
3731 		if (ex)
3732 			status = -EROFS;
3733 		goto bail;
3734 	}
3735 	if (ocfs2_mount_local(osb))
3736 		goto bail;
3737 
3738 	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
3739 	if (status < 0) {
3740 		mlog_errno(status);
3741 		goto bail;
3742 	}
3743 	if (!ocfs2_should_refresh_lock_res(lockres))
3744 		goto bail;
3745 	/* OK, we have the lock but we need to refresh the quota info */
3746 	status = ocfs2_refresh_qinfo(oinfo);
3747 	if (status)
3748 		ocfs2_qinfo_unlock(oinfo, ex);
3749 	ocfs2_complete_lock_res_refresh(lockres, status);
3750 bail:
3751 	mlog_exit(status);
3752 	return status;
3753 }
3754 
3755 /*
3756  * This is the filesystem locking protocol.  It provides the lock handling
3757  * hooks for the underlying DLM.  It has a maximum version number.
3758  * The version number allows interoperability with systems running at
3759  * the same major number and an equal or smaller minor number.
3760  *
3761  * Whenever the filesystem does new things with locks (adds or removes a
3762  * lock, orders them differently, does different things underneath a lock),
3763  * the version must be changed.  The protocol is negotiated when joining
3764  * the dlm domain.  A node may join the domain if its major version is
3765  * identical to all other nodes and its minor version is greater than
3766  * or equal to all other nodes.  When its minor version is greater than
3767  * the other nodes, it will run at the minor version specified by the
3768  * other nodes.
3769  *
3770  * If a locking change is made that will not be compatible with older
3771  * versions, the major number must be increased and the minor version set
3772  * to zero.  If a change merely adds a behavior that can be disabled when
3773  * speaking to older versions, the minor version must be increased.  If a
3774  * change adds a fully backwards compatible change (eg, LVB changes that
3775  * are just ignored by older versions), the version does not need to be
3776  * updated.
3777  */
3778 static struct ocfs2_locking_protocol lproto = {
3779 	.lp_max_version = {
3780 		.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
3781 		.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
3782 	},
3783 	.lp_lock_ast		= ocfs2_locking_ast,
3784 	.lp_blocking_ast	= ocfs2_blocking_ast,
3785 	.lp_unlock_ast		= ocfs2_unlock_ast,
3786 };
3787 
/*
 * Hand this file's locking protocol description (lproto: max version
 * plus the lock, blocking and unlock AST callbacks) to the stack glue.
 */
void ocfs2_set_locking_protocol(void)
{
	ocfs2_stack_glue_set_locking_protocol(&lproto);
}
3792 
3793 
3794 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3795 				       struct ocfs2_lock_res *lockres)
3796 {
3797 	int status;
3798 	struct ocfs2_unblock_ctl ctl = {0, 0,};
3799 	unsigned long flags;
3800 
3801 	/* Our reference to the lockres in this function can be
3802 	 * considered valid until we remove the OCFS2_LOCK_QUEUED
3803 	 * flag. */
3804 
3805 	mlog_entry_void();
3806 
3807 	BUG_ON(!lockres);
3808 	BUG_ON(!lockres->l_ops);
3809 
3810 	mlog(0, "lockres %s blocked.\n", lockres->l_name);
3811 
3812 	/* Detect whether a lock has been marked as going away while
3813 	 * the downconvert thread was processing other things. A lock can
3814 	 * still be marked with OCFS2_LOCK_FREEING after this check,
3815 	 * but short circuiting here will still save us some
3816 	 * performance. */
3817 	spin_lock_irqsave(&lockres->l_lock, flags);
3818 	if (lockres->l_flags & OCFS2_LOCK_FREEING)
3819 		goto unqueue;
3820 	spin_unlock_irqrestore(&lockres->l_lock, flags);
3821 
3822 	status = ocfs2_unblock_lock(osb, lockres, &ctl);
3823 	if (status < 0)
3824 		mlog_errno(status);
3825 
3826 	spin_lock_irqsave(&lockres->l_lock, flags);
3827 unqueue:
3828 	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
3829 		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
3830 	} else
3831 		ocfs2_schedule_blocked_lock(osb, lockres);
3832 
3833 	mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
3834 	     ctl.requeue ? "yes" : "no");
3835 	spin_unlock_irqrestore(&lockres->l_lock, flags);
3836 
3837 	if (ctl.unblock_action != UNBLOCK_CONTINUE
3838 	    && lockres->l_ops->post_unlock)
3839 		lockres->l_ops->post_unlock(osb, lockres);
3840 
3841 	mlog_exit_void();
3842 }
3843 
3844 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
3845 					struct ocfs2_lock_res *lockres)
3846 {
3847 	mlog_entry_void();
3848 
3849 	assert_spin_locked(&lockres->l_lock);
3850 
3851 	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
3852 		/* Do not schedule a lock for downconvert when it's on
3853 		 * the way to destruction - any nodes wanting access
3854 		 * to the resource will get it soon. */
3855 		mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
3856 		     lockres->l_name, lockres->l_flags);
3857 		return;
3858 	}
3859 
3860 	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
3861 
3862 	spin_lock(&osb->dc_task_lock);
3863 	if (list_empty(&lockres->l_blocked_list)) {
3864 		list_add_tail(&lockres->l_blocked_list,
3865 			      &osb->blocked_lock_list);
3866 		osb->blocked_lock_count++;
3867 	}
3868 	spin_unlock(&osb->dc_task_lock);
3869 
3870 	mlog_exit_void();
3871 }
3872 
3873 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
3874 {
3875 	unsigned long processed;
3876 	struct ocfs2_lock_res *lockres;
3877 
3878 	mlog_entry_void();
3879 
3880 	spin_lock(&osb->dc_task_lock);
3881 	/* grab this early so we know to try again if a state change and
3882 	 * wake happens part-way through our work  */
3883 	osb->dc_work_sequence = osb->dc_wake_sequence;
3884 
3885 	processed = osb->blocked_lock_count;
3886 	while (processed) {
3887 		BUG_ON(list_empty(&osb->blocked_lock_list));
3888 
3889 		lockres = list_entry(osb->blocked_lock_list.next,
3890 				     struct ocfs2_lock_res, l_blocked_list);
3891 		list_del_init(&lockres->l_blocked_list);
3892 		osb->blocked_lock_count--;
3893 		spin_unlock(&osb->dc_task_lock);
3894 
3895 		BUG_ON(!processed);
3896 		processed--;
3897 
3898 		ocfs2_process_blocked_lock(osb, lockres);
3899 
3900 		spin_lock(&osb->dc_task_lock);
3901 	}
3902 	spin_unlock(&osb->dc_task_lock);
3903 
3904 	mlog_exit_void();
3905 }
3906 
3907 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
3908 {
3909 	int empty = 0;
3910 
3911 	spin_lock(&osb->dc_task_lock);
3912 	if (list_empty(&osb->blocked_lock_list))
3913 		empty = 1;
3914 
3915 	spin_unlock(&osb->dc_task_lock);
3916 	return empty;
3917 }
3918 
3919 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
3920 {
3921 	int should_wake = 0;
3922 
3923 	spin_lock(&osb->dc_task_lock);
3924 	if (osb->dc_work_sequence != osb->dc_wake_sequence)
3925 		should_wake = 1;
3926 	spin_unlock(&osb->dc_task_lock);
3927 
3928 	return should_wake;
3929 }
3930 
3931 static int ocfs2_downconvert_thread(void *arg)
3932 {
3933 	int status = 0;
3934 	struct ocfs2_super *osb = arg;
3935 
3936 	/* only quit once we've been asked to stop and there is no more
3937 	 * work available */
3938 	while (!(kthread_should_stop() &&
3939 		ocfs2_downconvert_thread_lists_empty(osb))) {
3940 
3941 		wait_event_interruptible(osb->dc_event,
3942 					 ocfs2_downconvert_thread_should_wake(osb) ||
3943 					 kthread_should_stop());
3944 
3945 		mlog(0, "downconvert_thread: awoken\n");
3946 
3947 		ocfs2_downconvert_thread_do_work(osb);
3948 	}
3949 
3950 	osb->dc_task = NULL;
3951 	return status;
3952 }
3953 
3954 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
3955 {
3956 	spin_lock(&osb->dc_task_lock);
3957 	/* make sure the voting thread gets a swipe at whatever changes
3958 	 * the caller may have made to the voting state */
3959 	osb->dc_wake_sequence++;
3960 	spin_unlock(&osb->dc_task_lock);
3961 	wake_up(&osb->dc_event);
3962 }
3963