xref: /openbmc/linux/fs/ocfs2/dlm/dlmunlock.c (revision 87c2ce3b)
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmunlock.c
5  *
6  * underlying calls for unlocking locks
7  *
8  * Copyright (C) 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 021110-1307, USA.
24  *
25  */
26 
27 
28 #include <linux/module.h>
29 #include <linux/fs.h>
30 #include <linux/types.h>
31 #include <linux/slab.h>
32 #include <linux/highmem.h>
33 #include <linux/utsname.h>
34 #include <linux/init.h>
35 #include <linux/sysctl.h>
36 #include <linux/random.h>
37 #include <linux/blkdev.h>
38 #include <linux/socket.h>
39 #include <linux/inet.h>
40 #include <linux/spinlock.h>
41 #include <linux/delay.h>
42 
43 #include "cluster/heartbeat.h"
44 #include "cluster/nodemanager.h"
45 #include "cluster/tcp.h"
46 
47 #include "dlmapi.h"
48 #include "dlmcommon.h"
49 
50 #define MLOG_MASK_PREFIX ML_DLM
51 #include "cluster/masklog.h"
52 
53 #define DLM_UNLOCK_FREE_LOCK           0x00000001
54 #define DLM_UNLOCK_CALL_AST            0x00000002
55 #define DLM_UNLOCK_REMOVE_LOCK         0x00000004
56 #define DLM_UNLOCK_REGRANT_LOCK        0x00000008
57 #define DLM_UNLOCK_CLEAR_CONVERT_TYPE  0x00000010
58 
59 
60 static enum dlm_status dlm_get_cancel_actions(struct dlm_ctxt *dlm,
61 					      struct dlm_lock_resource *res,
62 					      struct dlm_lock *lock,
63 					      struct dlm_lockstatus *lksb,
64 					      int *actions);
65 static enum dlm_status dlm_get_unlock_actions(struct dlm_ctxt *dlm,
66 					      struct dlm_lock_resource *res,
67 					      struct dlm_lock *lock,
68 					      struct dlm_lockstatus *lksb,
69 					      int *actions);
70 
71 static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm,
72 						 struct dlm_lock_resource *res,
73 						 struct dlm_lock *lock,
74 						 struct dlm_lockstatus *lksb,
75 						 int flags,
76 						 u8 owner);
77 
78 
79 /*
80  * according to the spec:
81  * http://opendlm.sourceforge.net/cvsmirror/opendlm/docs/dlmbook_final.pdf
82  *
83  *  flags & LKM_CANCEL != 0: must be converting or blocked
84  *  flags & LKM_CANCEL == 0: must be granted
85  *
86  * So to unlock a converting lock, you must first cancel the
87  * convert (passing LKM_CANCEL in flags), then call the unlock
88  * again (with no LKM_CANCEL in flags).
89  */
90 
91 
92 /*
93  * locking:
94  *   caller needs:  none
95  *   taken:         res->spinlock and lock->spinlock taken and dropped
96  *   held on exit:  none
97  * returns: DLM_NORMAL, DLM_NOLOCKMGR, status from network
98  * all callers should have taken an extra ref on lock coming in
99  */
100 static enum dlm_status dlmunlock_common(struct dlm_ctxt *dlm,
101 					struct dlm_lock_resource *res,
102 					struct dlm_lock *lock,
103 					struct dlm_lockstatus *lksb,
104 					int flags, int *call_ast,
105 					int master_node)
106 {
107 	enum dlm_status status;
108 	int actions = 0;
109 	int in_use;
110         u8 owner;
111 
112 	mlog(0, "master_node = %d, valblk = %d\n", master_node,
113 	     flags & LKM_VALBLK);
114 
115 	if (master_node)
116 		BUG_ON(res->owner != dlm->node_num);
117 	else
118 		BUG_ON(res->owner == dlm->node_num);
119 
120 	spin_lock(&dlm->spinlock);
121 	/* We want to be sure that we're not freeing a lock
122 	 * that still has AST's pending... */
123 	in_use = !list_empty(&lock->ast_list);
124 	spin_unlock(&dlm->spinlock);
125 	if (in_use) {
126 	       mlog(ML_ERROR, "lockres %.*s: Someone is calling dlmunlock "
127 		    "while waiting for an ast!", res->lockname.len,
128 		    res->lockname.name);
129 		return DLM_BADPARAM;
130 	}
131 
132 	spin_lock(&res->spinlock);
133 	if (res->state & DLM_LOCK_RES_IN_PROGRESS) {
134 		if (master_node) {
135 			mlog(ML_ERROR, "lockres in progress!\n");
136 			spin_unlock(&res->spinlock);
137 			return DLM_FORWARD;
138 		}
139 		/* ok for this to sleep if not in a network handler */
140 		__dlm_wait_on_lockres(res);
141 		res->state |= DLM_LOCK_RES_IN_PROGRESS;
142 	}
143 	spin_lock(&lock->spinlock);
144 
145 	if (res->state & DLM_LOCK_RES_RECOVERING) {
146 		status = DLM_RECOVERING;
147 		goto leave;
148 	}
149 
150 
151 	/* see above for what the spec says about
152 	 * LKM_CANCEL and the lock queue state */
153 	if (flags & LKM_CANCEL)
154 		status = dlm_get_cancel_actions(dlm, res, lock, lksb, &actions);
155 	else
156 		status = dlm_get_unlock_actions(dlm, res, lock, lksb, &actions);
157 
158 	if (status != DLM_NORMAL)
159 		goto leave;
160 
161 	/* By now this has been masked out of cancel requests. */
162 	if (flags & LKM_VALBLK) {
163 		/* make the final update to the lvb */
164 		if (master_node)
165 			memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN);
166 		else
167 			flags |= LKM_PUT_LVB; /* let the send function
168 					       * handle it. */
169 	}
170 
171 	if (!master_node) {
172 		owner = res->owner;
173 		/* drop locks and send message */
174 		if (flags & LKM_CANCEL)
175 			lock->cancel_pending = 1;
176 		else
177 			lock->unlock_pending = 1;
178 		spin_unlock(&lock->spinlock);
179 		spin_unlock(&res->spinlock);
180 		status = dlm_send_remote_unlock_request(dlm, res, lock, lksb,
181 							flags, owner);
182 		spin_lock(&res->spinlock);
183 		spin_lock(&lock->spinlock);
184 		/* if the master told us the lock was already granted,
185 		 * let the ast handle all of these actions */
186 		if (status == DLM_NORMAL &&
187 		    lksb->status == DLM_CANCELGRANT) {
188 			actions &= ~(DLM_UNLOCK_REMOVE_LOCK|
189 				     DLM_UNLOCK_REGRANT_LOCK|
190 				     DLM_UNLOCK_CLEAR_CONVERT_TYPE);
191 		}
192 		if (flags & LKM_CANCEL)
193 			lock->cancel_pending = 0;
194 		else
195 			lock->unlock_pending = 0;
196 
197 	}
198 
199 	/* get an extra ref on lock.  if we are just switching
200 	 * lists here, we dont want the lock to go away. */
201 	dlm_lock_get(lock);
202 
203 	if (actions & DLM_UNLOCK_REMOVE_LOCK) {
204 		list_del_init(&lock->list);
205 		dlm_lock_put(lock);
206 	}
207 	if (actions & DLM_UNLOCK_REGRANT_LOCK) {
208 		dlm_lock_get(lock);
209 		list_add_tail(&lock->list, &res->granted);
210 	}
211 	if (actions & DLM_UNLOCK_CLEAR_CONVERT_TYPE) {
212 		mlog(0, "clearing convert_type at %smaster node\n",
213 		     master_node ? "" : "non-");
214 		lock->ml.convert_type = LKM_IVMODE;
215 	}
216 
217 	/* remove the extra ref on lock */
218 	dlm_lock_put(lock);
219 
220 leave:
221 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
222 	if (!dlm_lock_on_list(&res->converting, lock))
223 		BUG_ON(lock->ml.convert_type != LKM_IVMODE);
224 	else
225 		BUG_ON(lock->ml.convert_type == LKM_IVMODE);
226 	spin_unlock(&lock->spinlock);
227 	spin_unlock(&res->spinlock);
228 	wake_up(&res->wq);
229 
230 	/* let the caller's final dlm_lock_put handle the actual kfree */
231 	if (actions & DLM_UNLOCK_FREE_LOCK) {
232 		/* this should always be coupled with list removal */
233 		BUG_ON(!(actions & DLM_UNLOCK_REMOVE_LOCK));
234 		mlog(0, "lock %"MLFu64" should be gone now! refs=%d\n",
235 		     lock->ml.cookie, atomic_read(&lock->lock_refs.refcount)-1);
236 		dlm_lock_put(lock);
237 	}
238 	if (actions & DLM_UNLOCK_CALL_AST)
239 		*call_ast = 1;
240 
241 	/* if cancel or unlock succeeded, lvb work is done */
242 	if (status == DLM_NORMAL)
243 		lksb->flags &= ~(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB);
244 
245 	return status;
246 }
247 
248 void dlm_commit_pending_unlock(struct dlm_lock_resource *res,
249 			       struct dlm_lock *lock)
250 {
251 	/* leave DLM_LKSB_PUT_LVB on the lksb so any final
252 	 * update of the lvb will be sent to the new master */
253 	list_del_init(&lock->list);
254 }
255 
256 void dlm_commit_pending_cancel(struct dlm_lock_resource *res,
257 			       struct dlm_lock *lock)
258 {
259 	list_del_init(&lock->list);
260 	list_add_tail(&lock->list, &res->granted);
261 	lock->ml.convert_type = LKM_IVMODE;
262 }
263 
264 
265 static inline enum dlm_status dlmunlock_master(struct dlm_ctxt *dlm,
266 					  struct dlm_lock_resource *res,
267 					  struct dlm_lock *lock,
268 					  struct dlm_lockstatus *lksb,
269 					  int flags,
270 					  int *call_ast)
271 {
272 	return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 1);
273 }
274 
275 static inline enum dlm_status dlmunlock_remote(struct dlm_ctxt *dlm,
276 					  struct dlm_lock_resource *res,
277 					  struct dlm_lock *lock,
278 					  struct dlm_lockstatus *lksb,
279 					  int flags, int *call_ast)
280 {
281 	return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 0);
282 }
283 
284 /*
285  * locking:
286  *   caller needs:  none
287  *   taken:         none
288  *   held on exit:  none
289  * returns: DLM_NORMAL, DLM_NOLOCKMGR, status from network
290  */
291 static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm,
292 						 struct dlm_lock_resource *res,
293 						 struct dlm_lock *lock,
294 						 struct dlm_lockstatus *lksb,
295 						 int flags,
296 						 u8 owner)
297 {
298 	struct dlm_unlock_lock unlock;
299 	int tmpret;
300 	enum dlm_status ret;
301 	int status = 0;
302 	struct kvec vec[2];
303 	size_t veclen = 1;
304 
305 	mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
306 
307 	memset(&unlock, 0, sizeof(unlock));
308 	unlock.node_idx = dlm->node_num;
309 	unlock.flags = cpu_to_be32(flags);
310 	unlock.cookie = lock->ml.cookie;
311 	unlock.namelen = res->lockname.len;
312 	memcpy(unlock.name, res->lockname.name, unlock.namelen);
313 
314 	vec[0].iov_len = sizeof(struct dlm_unlock_lock);
315 	vec[0].iov_base = &unlock;
316 
317 	if (flags & LKM_PUT_LVB) {
318 		/* extra data to send if we are updating lvb */
319 		vec[1].iov_len = DLM_LVB_LEN;
320 		vec[1].iov_base = lock->lksb->lvb;
321 		veclen++;
322 	}
323 
324 	tmpret = o2net_send_message_vec(DLM_UNLOCK_LOCK_MSG, dlm->key,
325 					vec, veclen, owner, &status);
326 	if (tmpret >= 0) {
327 		// successfully sent and received
328 		if (status == DLM_CANCELGRANT)
329 			ret = DLM_NORMAL;
330 		else if (status == DLM_FORWARD) {
331 			mlog(0, "master was in-progress.  retry\n");
332 			ret = DLM_FORWARD;
333 		} else
334 			ret = status;
335 		lksb->status = status;
336 	} else {
337 		mlog_errno(tmpret);
338 		if (dlm_is_host_down(tmpret)) {
339 			/* NOTE: this seems strange, but it is what we want.
340 			 * when the master goes down during a cancel or
341 			 * unlock, the recovery code completes the operation
342 			 * as if the master had not died, then passes the
343 			 * updated state to the recovery master.  this thread
344 			 * just needs to finish out the operation and call
345 			 * the unlockast. */
346 			ret = DLM_NORMAL;
347 		} else {
348 			/* something bad.  this will BUG in ocfs2 */
349 			ret = dlm_err_to_dlm_status(tmpret);
350 		}
351 		lksb->status = ret;
352 	}
353 
354 	return ret;
355 }
356 
357 /*
358  * locking:
359  *   caller needs:  none
360  *   taken:         takes and drops res->spinlock
361  *   held on exit:  none
362  * returns: DLM_NORMAL, DLM_BADARGS, DLM_IVLOCKID,
363  *          return value from dlmunlock_master
364  */
365 int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data)
366 {
367 	struct dlm_ctxt *dlm = data;
368 	struct dlm_unlock_lock *unlock = (struct dlm_unlock_lock *)msg->buf;
369 	struct dlm_lock_resource *res = NULL;
370 	struct list_head *iter;
371 	struct dlm_lock *lock = NULL;
372 	enum dlm_status status = DLM_NORMAL;
373 	int found = 0, i;
374 	struct dlm_lockstatus *lksb = NULL;
375 	int ignore;
376 	u32 flags;
377 	struct list_head *queue;
378 
379 	flags = be32_to_cpu(unlock->flags);
380 
381 	if (flags & LKM_GET_LVB) {
382 		mlog(ML_ERROR, "bad args!  GET_LVB specified on unlock!\n");
383 		return DLM_BADARGS;
384 	}
385 
386 	if ((flags & (LKM_PUT_LVB|LKM_CANCEL)) == (LKM_PUT_LVB|LKM_CANCEL)) {
387 		mlog(ML_ERROR, "bad args!  cannot modify lvb on a CANCEL "
388 		     "request!\n");
389 		return DLM_BADARGS;
390 	}
391 
392 	if (unlock->namelen > DLM_LOCKID_NAME_MAX) {
393 		mlog(ML_ERROR, "Invalid name length in unlock handler!\n");
394 		return DLM_IVBUFLEN;
395 	}
396 
397 	if (!dlm_grab(dlm))
398 		return DLM_REJECTED;
399 
400 	mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
401 			"Domain %s not fully joined!\n", dlm->name);
402 
403 	mlog(0, "lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : "none");
404 
405 	res = dlm_lookup_lockres(dlm, unlock->name, unlock->namelen);
406 	if (!res) {
407 		/* We assume here that a no lock resource simply means
408 		 * it was migrated away and destroyed before the other
409 		 * node could detect it. */
410 		mlog(0, "returning DLM_FORWARD -- res no longer exists\n");
411 		status = DLM_FORWARD;
412 		goto not_found;
413 	}
414 
415 	queue=&res->granted;
416 	found = 0;
417 	spin_lock(&res->spinlock);
418 	if (res->state & DLM_LOCK_RES_RECOVERING) {
419 		spin_unlock(&res->spinlock);
420 		mlog(0, "returning DLM_RECOVERING\n");
421 		status = DLM_RECOVERING;
422 		goto leave;
423 	}
424 
425 	if (res->state & DLM_LOCK_RES_MIGRATING) {
426 		spin_unlock(&res->spinlock);
427 		mlog(0, "returning DLM_MIGRATING\n");
428 		status = DLM_MIGRATING;
429 		goto leave;
430 	}
431 
432 	if (res->owner != dlm->node_num) {
433 		spin_unlock(&res->spinlock);
434 		mlog(0, "returning DLM_FORWARD -- not master\n");
435 		status = DLM_FORWARD;
436 		goto leave;
437 	}
438 
439 	for (i=0; i<3; i++) {
440 		list_for_each(iter, queue) {
441 			lock = list_entry(iter, struct dlm_lock, list);
442 			if (lock->ml.cookie == unlock->cookie &&
443 		    	    lock->ml.node == unlock->node_idx) {
444 				dlm_lock_get(lock);
445 				found = 1;
446 				break;
447 			}
448 		}
449 		if (found)
450 			break;
451 		/* scan granted -> converting -> blocked queues */
452 		queue++;
453 	}
454 	spin_unlock(&res->spinlock);
455 	if (!found) {
456 		status = DLM_IVLOCKID;
457 		goto not_found;
458 	}
459 
460 	/* lock was found on queue */
461 	lksb = lock->lksb;
462 	/* unlockast only called on originating node */
463 	if (flags & LKM_PUT_LVB) {
464 		lksb->flags |= DLM_LKSB_PUT_LVB;
465 		memcpy(&lksb->lvb[0], &unlock->lvb[0], DLM_LVB_LEN);
466 	}
467 
468 	/* if this is in-progress, propagate the DLM_FORWARD
469 	 * all the way back out */
470 	status = dlmunlock_master(dlm, res, lock, lksb, flags, &ignore);
471 	if (status == DLM_FORWARD)
472 		mlog(0, "lockres is in progress\n");
473 
474 	if (flags & LKM_PUT_LVB)
475 		lksb->flags &= ~DLM_LKSB_PUT_LVB;
476 
477 	dlm_lockres_calc_usage(dlm, res);
478 	dlm_kick_thread(dlm, res);
479 
480 not_found:
481 	if (!found)
482 		mlog(ML_ERROR, "failed to find lock to unlock! "
483 			       "cookie=%"MLFu64"\n",
484 		     unlock->cookie);
485 	else {
486 		/* send the lksb->status back to the other node */
487 		status = lksb->status;
488 		dlm_lock_put(lock);
489 	}
490 
491 leave:
492 	if (res)
493 		dlm_lockres_put(res);
494 
495 	dlm_put(dlm);
496 
497 	return status;
498 }
499 
500 
501 static enum dlm_status dlm_get_cancel_actions(struct dlm_ctxt *dlm,
502 					      struct dlm_lock_resource *res,
503 					      struct dlm_lock *lock,
504 					      struct dlm_lockstatus *lksb,
505 					      int *actions)
506 {
507 	enum dlm_status status;
508 
509 	if (dlm_lock_on_list(&res->blocked, lock)) {
510 		/* cancel this outright */
511 		lksb->status = DLM_NORMAL;
512 		status = DLM_NORMAL;
513 		*actions = (DLM_UNLOCK_CALL_AST |
514 			    DLM_UNLOCK_REMOVE_LOCK);
515 	} else if (dlm_lock_on_list(&res->converting, lock)) {
516 		/* cancel the request, put back on granted */
517 		lksb->status = DLM_NORMAL;
518 		status = DLM_NORMAL;
519 		*actions = (DLM_UNLOCK_CALL_AST |
520 			    DLM_UNLOCK_REMOVE_LOCK |
521 			    DLM_UNLOCK_REGRANT_LOCK |
522 			    DLM_UNLOCK_CLEAR_CONVERT_TYPE);
523 	} else if (dlm_lock_on_list(&res->granted, lock)) {
524 		/* too late, already granted.  DLM_CANCELGRANT */
525 		lksb->status = DLM_CANCELGRANT;
526 		status = DLM_NORMAL;
527 		*actions = DLM_UNLOCK_CALL_AST;
528 	} else {
529 		mlog(ML_ERROR, "lock to cancel is not on any list!\n");
530 		lksb->status = DLM_IVLOCKID;
531 		status = DLM_IVLOCKID;
532 		*actions = 0;
533 	}
534 	return status;
535 }
536 
537 static enum dlm_status dlm_get_unlock_actions(struct dlm_ctxt *dlm,
538 					      struct dlm_lock_resource *res,
539 					      struct dlm_lock *lock,
540 					      struct dlm_lockstatus *lksb,
541 					      int *actions)
542 {
543 	enum dlm_status status;
544 
545 	/* unlock request */
546 	if (!dlm_lock_on_list(&res->granted, lock)) {
547 		lksb->status = DLM_DENIED;
548 		status = DLM_DENIED;
549 		dlm_error(status);
550 		*actions = 0;
551 	} else {
552 		/* unlock granted lock */
553 		lksb->status = DLM_NORMAL;
554 		status = DLM_NORMAL;
555 		*actions = (DLM_UNLOCK_FREE_LOCK |
556 			    DLM_UNLOCK_CALL_AST |
557 			    DLM_UNLOCK_REMOVE_LOCK);
558 	}
559 	return status;
560 }
561 
562 /* there seems to be no point in doing this async
563  * since (even for the remote case) there is really
564  * no work to queue up... so just do it and fire the
565  * unlockast by hand when done... */
566 enum dlm_status dlmunlock(struct dlm_ctxt *dlm, struct dlm_lockstatus *lksb,
567 			  int flags, dlm_astunlockfunc_t *unlockast, void *data)
568 {
569 	enum dlm_status status;
570 	struct dlm_lock_resource *res;
571 	struct dlm_lock *lock = NULL;
572 	int call_ast, is_master;
573 
574 	mlog_entry_void();
575 
576 	if (!lksb) {
577 		dlm_error(DLM_BADARGS);
578 		return DLM_BADARGS;
579 	}
580 
581 	if (flags & ~(LKM_CANCEL | LKM_VALBLK | LKM_INVVALBLK)) {
582 		dlm_error(DLM_BADPARAM);
583 		return DLM_BADPARAM;
584 	}
585 
586 	if ((flags & (LKM_VALBLK | LKM_CANCEL)) == (LKM_VALBLK | LKM_CANCEL)) {
587 		mlog(0, "VALBLK given with CANCEL: ignoring VALBLK\n");
588 		flags &= ~LKM_VALBLK;
589 	}
590 
591 	if (!lksb->lockid || !lksb->lockid->lockres) {
592 		dlm_error(DLM_BADPARAM);
593 		return DLM_BADPARAM;
594 	}
595 
596 	lock = lksb->lockid;
597 	BUG_ON(!lock);
598 	dlm_lock_get(lock);
599 
600 	res = lock->lockres;
601 	BUG_ON(!res);
602 	dlm_lockres_get(res);
603 retry:
604 	call_ast = 0;
605 	/* need to retry up here because owner may have changed */
606 	mlog(0, "lock=%p res=%p\n", lock, res);
607 
608 	spin_lock(&res->spinlock);
609 	is_master = (res->owner == dlm->node_num);
610 	spin_unlock(&res->spinlock);
611 
612 	if (is_master) {
613 		status = dlmunlock_master(dlm, res, lock, lksb, flags,
614 					  &call_ast);
615 		mlog(0, "done calling dlmunlock_master: returned %d, "
616 		     "call_ast is %d\n", status, call_ast);
617 	} else {
618 		status = dlmunlock_remote(dlm, res, lock, lksb, flags,
619 					  &call_ast);
620 		mlog(0, "done calling dlmunlock_remote: returned %d, "
621 		     "call_ast is %d\n", status, call_ast);
622 	}
623 
624 	if (status == DLM_RECOVERING ||
625 	    status == DLM_MIGRATING ||
626 	    status == DLM_FORWARD) {
627 		/* We want to go away for a tiny bit to allow recovery
628 		 * / migration to complete on this resource. I don't
629 		 * know of any wait queue we could sleep on as this
630 		 * may be happening on another node. Perhaps the
631 		 * proper solution is to queue up requests on the
632 		 * other end? */
633 
634 		/* do we want to yield(); ?? */
635 		msleep(50);
636 
637 		mlog(0, "retrying unlock due to pending recovery/"
638 		     "migration/in-progress\n");
639 		goto retry;
640 	}
641 
642 	if (call_ast) {
643 		mlog(0, "calling unlockast(%p, %d)\n", data, lksb->status);
644 		if (is_master) {
645 			/* it is possible that there is one last bast
646 			 * pending.  make sure it is flushed, then
647 			 * call the unlockast.
648 			 * not an issue if this is a mastered remotely,
649 			 * since this lock has been removed from the
650 			 * lockres queues and cannot be found. */
651 			dlm_kick_thread(dlm, NULL);
652 			wait_event(dlm->ast_wq,
653 				   dlm_lock_basts_flushed(dlm, lock));
654 		}
655 		(*unlockast)(data, lksb->status);
656 	}
657 
658 	if (status == DLM_NORMAL) {
659 		mlog(0, "kicking the thread\n");
660 		dlm_kick_thread(dlm, res);
661 	} else
662 		dlm_error(status);
663 
664 	dlm_lockres_calc_usage(dlm, res);
665 	dlm_lockres_put(res);
666 	dlm_lock_put(lock);
667 
668 	mlog(0, "returning status=%d!\n", status);
669 	return status;
670 }
671 EXPORT_SYMBOL_GPL(dlmunlock);
672 
673