xref: /openbmc/linux/fs/ocfs2/dlm/dlmunlock.c (revision f21e49be)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3  * dlmunlock.c
4  *
5  * underlying calls for unlocking locks
6  *
7  * Copyright (C) 2004 Oracle.  All rights reserved.
8  */
9 
10 
11 #include <linux/module.h>
12 #include <linux/fs.h>
13 #include <linux/types.h>
14 #include <linux/highmem.h>
15 #include <linux/init.h>
16 #include <linux/sysctl.h>
17 #include <linux/random.h>
18 #include <linux/blkdev.h>
19 #include <linux/socket.h>
20 #include <linux/inet.h>
21 #include <linux/spinlock.h>
22 #include <linux/delay.h>
23 
24 #include "../cluster/heartbeat.h"
25 #include "../cluster/nodemanager.h"
26 #include "../cluster/tcp.h"
27 
28 #include "dlmapi.h"
29 #include "dlmcommon.h"
30 
31 #define MLOG_MASK_PREFIX ML_DLM
32 #include "../cluster/masklog.h"
33 
34 #define DLM_UNLOCK_FREE_LOCK           0x00000001
35 #define DLM_UNLOCK_CALL_AST            0x00000002
36 #define DLM_UNLOCK_REMOVE_LOCK         0x00000004
37 #define DLM_UNLOCK_REGRANT_LOCK        0x00000008
38 #define DLM_UNLOCK_CLEAR_CONVERT_TYPE  0x00000010
39 
40 
41 static enum dlm_status dlm_get_cancel_actions(struct dlm_ctxt *dlm,
42 					      struct dlm_lock_resource *res,
43 					      struct dlm_lock *lock,
44 					      struct dlm_lockstatus *lksb,
45 					      int *actions);
46 static enum dlm_status dlm_get_unlock_actions(struct dlm_ctxt *dlm,
47 					      struct dlm_lock_resource *res,
48 					      struct dlm_lock *lock,
49 					      struct dlm_lockstatus *lksb,
50 					      int *actions);
51 
52 static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm,
53 						 struct dlm_lock_resource *res,
54 						 struct dlm_lock *lock,
55 						 struct dlm_lockstatus *lksb,
56 						 int flags,
57 						 u8 owner);
58 
59 
60 /*
61  * according to the spec:
62  * http://opendlm.sourceforge.net/cvsmirror/opendlm/docs/dlmbook_final.pdf
63  *
64  *  flags & LKM_CANCEL != 0: must be converting or blocked
65  *  flags & LKM_CANCEL == 0: must be granted
66  *
67  * So to unlock a converting lock, you must first cancel the
68  * convert (passing LKM_CANCEL in flags), then call the unlock
69  * again (with no LKM_CANCEL in flags).
70  */
71 
72 
73 /*
74  * locking:
75  *   caller needs:  none
76  *   taken:         res->spinlock and lock->spinlock taken and dropped
77  *   held on exit:  none
78  * returns: DLM_NORMAL, DLM_NOLOCKMGR, status from network
79  * all callers should have taken an extra ref on lock coming in
80  */
static enum dlm_status dlmunlock_common(struct dlm_ctxt *dlm,
					struct dlm_lock_resource *res,
					struct dlm_lock *lock,
					struct dlm_lockstatus *lksb,
					int flags, int *call_ast,
					int master_node)
{
	enum dlm_status status;
	int actions = 0;	/* DLM_UNLOCK_* action bits to apply below */
	int in_use;
	u8 owner;
	int recovery_wait = 0;

	mlog(0, "master_node = %d, valblk = %d\n", master_node,
	     flags & LKM_VALBLK);

	/* master_node says which side of the protocol we are on and
	 * must agree with the current resource ownership. */
	if (master_node)
		BUG_ON(res->owner != dlm->node_num);
	else
		BUG_ON(res->owner == dlm->node_num);

	spin_lock(&dlm->ast_lock);
	/* We want to be sure that we're not freeing a lock
	 * that still has AST's pending... */
	in_use = !list_empty(&lock->ast_list);
	spin_unlock(&dlm->ast_lock);
	/* unlocking with an AST still queued is a caller bug; a cancel
	 * racing an AST is legal and handled further down. */
	if (in_use && !(flags & LKM_CANCEL)) {
	       mlog(ML_ERROR, "lockres %.*s: Someone is calling dlmunlock "
		    "while waiting for an ast!", res->lockname.len,
		    res->lockname.name);
		return DLM_BADPARAM;
	}

	spin_lock(&res->spinlock);
	if (res->state & DLM_LOCK_RES_IN_PROGRESS) {
		/* on the master a plain unlock must be retried by the
		 * caller (DLM_FORWARD); cancels and non-master requests
		 * may block here instead. */
		if (master_node && !(flags & LKM_CANCEL)) {
			mlog(ML_ERROR, "lockres in progress!\n");
			spin_unlock(&res->spinlock);
			return DLM_FORWARD;
		}
		/* ok for this to sleep if not in a network handler */
		__dlm_wait_on_lockres(res);
		res->state |= DLM_LOCK_RES_IN_PROGRESS;
	}
	spin_lock(&lock->spinlock);

	/* recovery/migration owns the lockres; caller retries later */
	if (res->state & DLM_LOCK_RES_RECOVERING) {
		status = DLM_RECOVERING;
		goto leave;
	}

	if (res->state & DLM_LOCK_RES_MIGRATING) {
		status = DLM_MIGRATING;
		goto leave;
	}

	/* see above for what the spec says about
	 * LKM_CANCEL and the lock queue state */
	if (flags & LKM_CANCEL)
		status = dlm_get_cancel_actions(dlm, res, lock, lksb, &actions);
	else
		status = dlm_get_unlock_actions(dlm, res, lock, lksb, &actions);

	/* DLM_CANCELGRANT on the master still needs the AST action
	 * applied below; every other non-NORMAL status bails out. */
	if (status != DLM_NORMAL && (status != DLM_CANCELGRANT || !master_node))
		goto leave;

	/* By now this has been masked out of cancel requests. */
	if (flags & LKM_VALBLK) {
		/* make the final update to the lvb */
		if (master_node)
			memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN);
		else
			flags |= LKM_PUT_LVB; /* let the send function
					       * handle it. */
	}

	if (!master_node) {
		owner = res->owner;
		/* drop locks and send message */
		if (flags & LKM_CANCEL)
			lock->cancel_pending = 1;
		else
			lock->unlock_pending = 1;
		/* both spinlocks must be dropped across the network call;
		 * the *_pending flags keep recovery informed meanwhile. */
		spin_unlock(&lock->spinlock);
		spin_unlock(&res->spinlock);
		status = dlm_send_remote_unlock_request(dlm, res, lock, lksb,
							flags, owner);
		spin_lock(&res->spinlock);
		spin_lock(&lock->spinlock);
		/* if the master told us the lock was already granted,
		 * let the ast handle all of these actions */
		if (status == DLM_CANCELGRANT) {
			actions &= ~(DLM_UNLOCK_REMOVE_LOCK|
				     DLM_UNLOCK_REGRANT_LOCK|
				     DLM_UNLOCK_CLEAR_CONVERT_TYPE);
		} else if (status == DLM_RECOVERING ||
			   status == DLM_MIGRATING ||
			   status == DLM_FORWARD ||
			   status == DLM_NOLOCKMGR
			   ) {
			/* must clear the actions because this unlock
			 * is about to be retried.  cannot free or do
			 * any list manipulation. */
			mlog(0, "%s:%.*s: clearing actions, %s\n",
			     dlm->name, res->lockname.len,
			     res->lockname.name,
			     status==DLM_RECOVERING?"recovering":
			     (status==DLM_MIGRATING?"migrating":
				(status == DLM_FORWARD ? "forward" :
						"nolockmanager")));
			actions = 0;
		}
		if (flags & LKM_CANCEL)
			lock->cancel_pending = 0;
		else {
			/* if recovery already cleared unlock_pending while
			 * we were unlocked, the owner died and recovery
			 * completed the unlock for us; wait for it below. */
			if (!lock->unlock_pending)
				recovery_wait = 1;
			else
				lock->unlock_pending = 0;
		}
	}

	/* get an extra ref on lock.  if we are just switching
	 * lists here, we dont want the lock to go away. */
	dlm_lock_get(lock);

	if (actions & DLM_UNLOCK_REMOVE_LOCK) {
		list_del_init(&lock->list);
		dlm_lock_put(lock);
	}
	if (actions & DLM_UNLOCK_REGRANT_LOCK) {
		dlm_lock_get(lock);
		list_add_tail(&lock->list, &res->granted);
	}
	if (actions & DLM_UNLOCK_CLEAR_CONVERT_TYPE) {
		mlog(0, "clearing convert_type at %smaster node\n",
		     master_node ? "" : "non-");
		lock->ml.convert_type = LKM_IVMODE;
	}

	/* remove the extra ref on lock */
	dlm_lock_put(lock);

leave:
	/* clearing when we never set it is harmless (bit stays clear) */
	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
	/* invariant: convert_type is set iff the lock sits on the
	 * converting queue */
	if (!dlm_lock_on_list(&res->converting, lock))
		BUG_ON(lock->ml.convert_type != LKM_IVMODE);
	else
		BUG_ON(lock->ml.convert_type == LKM_IVMODE);
	spin_unlock(&lock->spinlock);
	spin_unlock(&res->spinlock);
	wake_up(&res->wq);

	if (recovery_wait) {
		spin_lock(&res->spinlock);
		/* Unlock request will directly succeed after owner dies,
		 * and the lock is already removed from grant list. We have to
		 * wait for RECOVERING done or we miss the chance to purge it
		 * since the removement is much faster than RECOVERING proc.
		 */
		__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_RECOVERING);
		spin_unlock(&res->spinlock);
	}

	/* let the caller's final dlm_lock_put handle the actual kfree */
	if (actions & DLM_UNLOCK_FREE_LOCK) {
		/* this should always be coupled with list removal */
		BUG_ON(!(actions & DLM_UNLOCK_REMOVE_LOCK));
		mlog(0, "lock %u:%llu should be gone now! refs=%d\n",
		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
		     kref_read(&lock->lock_refs)-1);
		dlm_lock_put(lock);
	}
	if (actions & DLM_UNLOCK_CALL_AST)
		*call_ast = 1;

	/* if cancel or unlock succeeded, lvb work is done */
	if (status == DLM_NORMAL)
		lksb->flags &= ~(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB);

	return status;
}
264 
/* Complete an unlock that was in flight when the master died:
 * recovery simply drops the lock from its queue. */
void dlm_commit_pending_unlock(struct dlm_lock_resource *res,
			       struct dlm_lock *lock)
{
	/* leave DLM_LKSB_PUT_LVB on the lksb so any final
	 * update of the lvb will be sent to the new master */
	list_del_init(&lock->list);
}
272 
273 void dlm_commit_pending_cancel(struct dlm_lock_resource *res,
274 			       struct dlm_lock *lock)
275 {
276 	list_move_tail(&lock->list, &res->granted);
277 	lock->ml.convert_type = LKM_IVMODE;
278 }
279 
280 
/* Unlock/cancel a lock on a resource mastered by this node: all work
 * happens locally in dlmunlock_common (master_node = 1). */
static inline enum dlm_status dlmunlock_master(struct dlm_ctxt *dlm,
					  struct dlm_lock_resource *res,
					  struct dlm_lock *lock,
					  struct dlm_lockstatus *lksb,
					  int flags,
					  int *call_ast)
{
	return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 1);
}
290 
/* Unlock/cancel a lock mastered elsewhere: dlmunlock_common
 * (master_node = 0) will send the request over the network. */
static inline enum dlm_status dlmunlock_remote(struct dlm_ctxt *dlm,
					  struct dlm_lock_resource *res,
					  struct dlm_lock *lock,
					  struct dlm_lockstatus *lksb,
					  int flags, int *call_ast)
{
	return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 0);
}
299 
300 /*
301  * locking:
302  *   caller needs:  none
303  *   taken:         none
304  *   held on exit:  none
305  * returns: DLM_NORMAL, DLM_NOLOCKMGR, status from network
306  */
static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm,
						 struct dlm_lock_resource *res,
						 struct dlm_lock *lock,
						 struct dlm_lockstatus *lksb,
						 int flags,
						 u8 owner)
{
	struct dlm_unlock_lock unlock;
	int tmpret;		/* o2net transport-level result */
	enum dlm_status ret;
	int status = 0;		/* DLM status returned by the master's handler */
	struct kvec vec[2];	/* [0] = request header, [1] = optional lvb */
	size_t veclen = 1;

	mlog(0, "%.*s\n", res->lockname.len, res->lockname.name);

	if (owner == dlm->node_num) {
		/* ended up trying to contact ourself.  this means
		 * that the lockres had been remote but became local
		 * via a migration.  just retry it, now as local */
		mlog(0, "%s:%.*s: this node became the master due to a "
		     "migration, re-evaluate now\n", dlm->name,
		     res->lockname.len, res->lockname.name);
		return DLM_FORWARD;
	}

	/* build the wire request; flags go out big-endian, the cookie
	 * is already in network byte order */
	memset(&unlock, 0, sizeof(unlock));
	unlock.node_idx = dlm->node_num;
	unlock.flags = cpu_to_be32(flags);
	unlock.cookie = lock->ml.cookie;
	unlock.namelen = res->lockname.len;
	memcpy(unlock.name, res->lockname.name, unlock.namelen);

	vec[0].iov_len = sizeof(struct dlm_unlock_lock);
	vec[0].iov_base = &unlock;

	if (flags & LKM_PUT_LVB) {
		/* extra data to send if we are updating lvb */
		vec[1].iov_len = DLM_LVB_LEN;
		vec[1].iov_base = lock->lksb->lvb;
		veclen++;
	}

	tmpret = o2net_send_message_vec(DLM_UNLOCK_LOCK_MSG, dlm->key,
					vec, veclen, owner, &status);
	if (tmpret >= 0) {
		// successfully sent and received
		if (status == DLM_FORWARD)
			mlog(0, "master was in-progress.  retry\n");
		ret = status;
	} else {
		mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
		     "node %u\n", tmpret, DLM_UNLOCK_LOCK_MSG, dlm->key, owner);
		if (dlm_is_host_down(tmpret)) {
			/* NOTE: this seems strange, but it is what we want.
			 * when the master goes down during a cancel or
			 * unlock, the recovery code completes the operation
			 * as if the master had not died, then passes the
			 * updated state to the recovery master.  this thread
			 * just needs to finish out the operation and call
			 * the unlockast. */
			if (dlm_is_node_dead(dlm, owner))
				ret = DLM_NORMAL;
			else
				ret = DLM_NOLOCKMGR;
		} else {
			/* something bad.  this will BUG in ocfs2 */
			ret = dlm_err_to_dlm_status(tmpret);
		}
	}

	return ret;
}
380 
381 /*
382  * locking:
383  *   caller needs:  none
384  *   taken:         takes and drops res->spinlock
385  *   held on exit:  none
386  * returns: DLM_NORMAL, DLM_BADARGS, DLM_IVLOCKID,
387  *          return value from dlmunlock_master
388  */
int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data,
			    void **ret_data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_unlock_lock *unlock = (struct dlm_unlock_lock *)msg->buf;
	struct dlm_lock_resource *res = NULL;
	struct dlm_lock *lock = NULL;
	enum dlm_status status = DLM_NORMAL;
	int found = 0, i;
	struct dlm_lockstatus *lksb = NULL;
	int ignore;	/* call_ast sink: the unlockast runs only on the
			 * originating node, never here on the master */
	u32 flags;
	struct list_head *queue;

	flags = be32_to_cpu(unlock->flags);

	/* LKM_GET_LVB is meaningless on unlock; only PUT is allowed */
	if (flags & LKM_GET_LVB) {
		mlog(ML_ERROR, "bad args!  GET_LVB specified on unlock!\n");
		return DLM_BADARGS;
	}

	/* a cancel never touches the lvb, so PUT_LVB+CANCEL is invalid */
	if ((flags & (LKM_PUT_LVB|LKM_CANCEL)) == (LKM_PUT_LVB|LKM_CANCEL)) {
		mlog(ML_ERROR, "bad args!  cannot modify lvb on a CANCEL "
		     "request!\n");
		return DLM_BADARGS;
	}

	/* validate untrusted wire length before using it for lookup */
	if (unlock->namelen > DLM_LOCKID_NAME_MAX) {
		mlog(ML_ERROR, "Invalid name length in unlock handler!\n");
		return DLM_IVBUFLEN;
	}

	if (!dlm_grab(dlm))
		return DLM_FORWARD;

	mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
			"Domain %s not fully joined!\n", dlm->name);

	mlog(0, "lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : "none");

	res = dlm_lookup_lockres(dlm, unlock->name, unlock->namelen);
	if (!res) {
		/* We assume here that a no lock resource simply means
		 * it was migrated away and destroyed before the other
		 * node could detect it. */
		mlog(0, "returning DLM_FORWARD -- res no longer exists\n");
		status = DLM_FORWARD;
		goto not_found;
	}

	queue=&res->granted;
	found = 0;
	spin_lock(&res->spinlock);
	if (res->state & DLM_LOCK_RES_RECOVERING) {
		spin_unlock(&res->spinlock);
		mlog(0, "returning DLM_RECOVERING\n");
		status = DLM_RECOVERING;
		goto leave;
	}

	if (res->state & DLM_LOCK_RES_MIGRATING) {
		spin_unlock(&res->spinlock);
		mlog(0, "returning DLM_MIGRATING\n");
		status = DLM_MIGRATING;
		goto leave;
	}

	/* we may have lost mastership since the sender looked it up */
	if (res->owner != dlm->node_num) {
		spin_unlock(&res->spinlock);
		mlog(0, "returning DLM_FORWARD -- not master\n");
		status = DLM_FORWARD;
		goto leave;
	}

	/* locate the lock by (cookie, node); queue++ steps through the
	 * three consecutive list heads in struct dlm_lock_resource */
	for (i=0; i<3; i++) {
		list_for_each_entry(lock, queue, list) {
			if (lock->ml.cookie == unlock->cookie &&
		    	    lock->ml.node == unlock->node_idx) {
				dlm_lock_get(lock);
				found = 1;
				break;
			}
		}
		if (found)
			break;
		/* scan granted -> converting -> blocked queues */
		queue++;
	}
	spin_unlock(&res->spinlock);
	if (!found) {
		status = DLM_IVLOCKID;
		goto not_found;
	}

	/* lock was found on queue */
	lksb = lock->lksb;
	/* only an EX holder may have written the lvb; drop lvb flags
	 * for any lower mode */
	if (flags & (LKM_VALBLK|LKM_PUT_LVB) &&
	    lock->ml.type != LKM_EXMODE)
		flags &= ~(LKM_VALBLK|LKM_PUT_LVB);

	/* unlockast only called on originating node */
	if (flags & LKM_PUT_LVB) {
		lksb->flags |= DLM_LKSB_PUT_LVB;
		memcpy(&lksb->lvb[0], &unlock->lvb[0], DLM_LVB_LEN);
	}

	/* if this is in-progress, propagate the DLM_FORWARD
	 * all the way back out */
	status = dlmunlock_master(dlm, res, lock, lksb, flags, &ignore);
	if (status == DLM_FORWARD)
		mlog(0, "lockres is in progress\n");

	if (flags & LKM_PUT_LVB)
		lksb->flags &= ~DLM_LKSB_PUT_LVB;

	dlm_lockres_calc_usage(dlm, res);
	dlm_kick_thread(dlm, res);

not_found:
	if (!found)
		mlog(ML_ERROR, "failed to find lock to unlock! "
			       "cookie=%u:%llu\n",
		     dlm_get_lock_cookie_node(be64_to_cpu(unlock->cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(unlock->cookie)))	;
	else
		dlm_lock_put(lock);

leave:
	if (res)
		dlm_lockres_put(res);

	dlm_put(dlm);

	return status;
}
524 
525 
526 static enum dlm_status dlm_get_cancel_actions(struct dlm_ctxt *dlm,
527 					      struct dlm_lock_resource *res,
528 					      struct dlm_lock *lock,
529 					      struct dlm_lockstatus *lksb,
530 					      int *actions)
531 {
532 	enum dlm_status status;
533 
534 	if (dlm_lock_on_list(&res->blocked, lock)) {
535 		/* cancel this outright */
536 		status = DLM_NORMAL;
537 		*actions = (DLM_UNLOCK_CALL_AST |
538 			    DLM_UNLOCK_REMOVE_LOCK);
539 	} else if (dlm_lock_on_list(&res->converting, lock)) {
540 		/* cancel the request, put back on granted */
541 		status = DLM_NORMAL;
542 		*actions = (DLM_UNLOCK_CALL_AST |
543 			    DLM_UNLOCK_REMOVE_LOCK |
544 			    DLM_UNLOCK_REGRANT_LOCK |
545 			    DLM_UNLOCK_CLEAR_CONVERT_TYPE);
546 	} else if (dlm_lock_on_list(&res->granted, lock)) {
547 		/* too late, already granted. */
548 		status = DLM_CANCELGRANT;
549 		*actions = DLM_UNLOCK_CALL_AST;
550 	} else {
551 		mlog(ML_ERROR, "lock to cancel is not on any list!\n");
552 		status = DLM_IVLOCKID;
553 		*actions = 0;
554 	}
555 	return status;
556 }
557 
558 static enum dlm_status dlm_get_unlock_actions(struct dlm_ctxt *dlm,
559 					      struct dlm_lock_resource *res,
560 					      struct dlm_lock *lock,
561 					      struct dlm_lockstatus *lksb,
562 					      int *actions)
563 {
564 	enum dlm_status status;
565 
566 	/* unlock request */
567 	if (!dlm_lock_on_list(&res->granted, lock)) {
568 		status = DLM_DENIED;
569 		dlm_error(status);
570 		*actions = 0;
571 	} else {
572 		/* unlock granted lock */
573 		status = DLM_NORMAL;
574 		*actions = (DLM_UNLOCK_FREE_LOCK |
575 			    DLM_UNLOCK_CALL_AST |
576 			    DLM_UNLOCK_REMOVE_LOCK);
577 	}
578 	return status;
579 }
580 
/* there seems to be no point in doing this async
 * since (even for the remote case) there is really
 * no work to queue up... so just do it and fire the
 * unlockast by hand when done... */
/*
 * Public entry point for unlock and cancel-convert requests.
 * Validates arguments, dispatches to the master/remote path, retries
 * transparently while the resource is recovering/migrating, and
 * invokes the caller's unlockast when required.
 */
enum dlm_status dlmunlock(struct dlm_ctxt *dlm, struct dlm_lockstatus *lksb,
			  int flags, dlm_astunlockfunc_t *unlockast, void *data)
{
	enum dlm_status status;
	struct dlm_lock_resource *res;
	struct dlm_lock *lock = NULL;
	int call_ast, is_master;

	if (!lksb) {
		dlm_error(DLM_BADARGS);
		return DLM_BADARGS;
	}

	/* only these three flags are meaningful to a caller here */
	if (flags & ~(LKM_CANCEL | LKM_VALBLK | LKM_INVVALBLK)) {
		dlm_error(DLM_BADPARAM);
		return DLM_BADPARAM;
	}

	/* a cancel never writes the lvb; silently drop VALBLK */
	if ((flags & (LKM_VALBLK | LKM_CANCEL)) == (LKM_VALBLK | LKM_CANCEL)) {
		mlog(0, "VALBLK given with CANCEL: ignoring VALBLK\n");
		flags &= ~LKM_VALBLK;
	}

	if (!lksb->lockid || !lksb->lockid->lockres) {
		dlm_error(DLM_BADPARAM);
		return DLM_BADPARAM;
	}

	/* hold refs on lock and res for the duration of the call */
	lock = lksb->lockid;
	BUG_ON(!lock);
	dlm_lock_get(lock);

	res = lock->lockres;
	BUG_ON(!res);
	dlm_lockres_get(res);
retry:
	call_ast = 0;
	/* need to retry up here because owner may have changed */
	mlog(0, "lock=%p res=%p\n", lock, res);

	spin_lock(&res->spinlock);
	is_master = (res->owner == dlm->node_num);
	/* only an EX holder has a valid lvb to write back */
	if (flags & LKM_VALBLK && lock->ml.type != LKM_EXMODE)
		flags &= ~LKM_VALBLK;
	spin_unlock(&res->spinlock);

	if (is_master) {
		status = dlmunlock_master(dlm, res, lock, lksb, flags,
					  &call_ast);
		mlog(0, "done calling dlmunlock_master: returned %d, "
		     "call_ast is %d\n", status, call_ast);
	} else {
		status = dlmunlock_remote(dlm, res, lock, lksb, flags,
					  &call_ast);
		mlog(0, "done calling dlmunlock_remote: returned %d, "
		     "call_ast is %d\n", status, call_ast);
	}

	if (status == DLM_RECOVERING ||
	    status == DLM_MIGRATING ||
	    status == DLM_FORWARD ||
	    status == DLM_NOLOCKMGR) {

		/* We want to go away for a tiny bit to allow recovery
		 * / migration to complete on this resource. I don't
		 * know of any wait queue we could sleep on as this
		 * may be happening on another node. Perhaps the
		 * proper solution is to queue up requests on the
		 * other end? */

		/* do we want to yield(); ?? */
		msleep(50);

		mlog(0, "retrying unlock due to pending recovery/"
		     "migration/in-progress/reconnect\n");
		goto retry;
	}

	if (call_ast) {
		mlog(0, "calling unlockast(%p, %d)\n", data, status);
		if (is_master) {
			/* it is possible that there is one last bast
			 * pending.  make sure it is flushed, then
			 * call the unlockast.
			 * not an issue if this is a mastered remotely,
			 * since this lock has been removed from the
			 * lockres queues and cannot be found. */
			dlm_kick_thread(dlm, NULL);
			wait_event(dlm->ast_wq,
				   dlm_lock_basts_flushed(dlm, lock));
		}
		(*unlockast)(data, status);
	}

	/* a cancel that lost the race is a success from the caller's
	 * point of view; the ast already reported the grant */
	if (status == DLM_CANCELGRANT)
		status = DLM_NORMAL;

	if (status == DLM_NORMAL) {
		mlog(0, "kicking the thread\n");
		dlm_kick_thread(dlm, res);
	} else
		dlm_error(status);

	dlm_lockres_calc_usage(dlm, res);
	dlm_lockres_put(res);
	dlm_lock_put(lock);

	mlog(0, "returning status=%d!\n", status);
	return status;
}
EXPORT_SYMBOL_GPL(dlmunlock);
696 
697