1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12 
13 /* Central locking logic has four stages:
14 
15    dlm_lock()
16    dlm_unlock()
17 
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22 
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27 
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32 
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35 
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40 
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43 
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46 
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49 
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53 
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
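
/* An illustrative trace of the stages above for a local, uncontended
   request (an editorial sketch of the flow, not code from this file):

   dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "myres", 5, 0, ast, arg, bast)
     dlm_lock()       stage 1: set_lock_args() checks the caller's args
     request_lock()   stage 2: validate_lock_args(), then find_rsb() and
                      lock_rsb() pin the resource
     _request_lock()  stage 3: r->res_nodeid == 0 here, so stay local
     do_request()     stage 4: can_be_granted() returns 1, so grant_lock()
                      grants the lock and queue_cast() queues the
                      completion ast for the caller */
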
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75 
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86 				    struct dlm_message *ms);
87 static int receive_extralen(struct dlm_message *ms);
88 
89 /*
90  * Lock compatibility matrix - thanks Steve
91  * UN = Unlocked state. Not really a state, used as a flag
92  * PD = Padding. Used to make the matrix a nice power of two in size
93  * Other states are the same as the VMS DLM.
94  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
95  */
96 
97 static const int __dlm_compat_matrix[8][8] = {
98       /* UN NL CR CW PR PW EX PD */
99         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
100         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
101         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
102         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
103         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
104         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
105         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
106         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
107 };
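
/* Example read of the matrix above: can PR be granted alongside an
   existing PW?  __dlm_compat_matrix[DLM_LOCK_PW + 1][DLM_LOCK_PR + 1] is
   row PW, column PR, which is 0: incompatible.  The +1 offset exists
   because DLM_LOCK_IV is -1 and maps onto the UN row/column. */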
108 
109 /*
110  * This defines the direction of transfer of LVB data.
111  * Granted mode is the row; requested mode is the column.
112  * Usage: matrix[grmode+1][rqmode+1]
113  * 1 = LVB is returned to the caller
114  * 0 = LVB is written to the resource
115  * -1 = nothing happens to the LVB
116  */
117 
118 const int dlm_lvb_operations[8][8] = {
119         /* UN   NL  CR  CW  PR  PW  EX  PD*/
120         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
121         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
122         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
123         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
124         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
125         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
126         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
127         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
128 };
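
/* Example: converting an EX lock down to NL looks up
   dlm_lvb_operations[DLM_LOCK_EX + 1][DLM_LOCK_NL + 1], row EX column NL,
   which is 0: the caller's LVB is written to the resource, as expected
   when giving up a write-capable mode.  Converting NL up to EX gives 1:
   the resource's LVB is copied back to the caller. */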
129 
130 #define modes_compat(gr, rq) \
131 	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
132 
133 int dlm_modes_compat(int mode1, int mode2)
134 {
135 	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
136 }
137 
138 /*
139  * Compatibility matrix for conversions with QUECVT set.
140  * Granted mode is the row; requested mode is the column.
141  * Usage: matrix[grmode+1][rqmode+1]
142  */
143 
144 static const int __quecvt_compat_matrix[8][8] = {
145       /* UN NL CR CW PR PW EX PD */
146         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
147         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
148         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
149         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
150         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
151         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
152         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
153         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
154 };
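
/* Example: __quecvt_compat_matrix[DLM_LOCK_NL + 1][DLM_LOCK_EX + 1] is 1,
   so an NL->EX conversion may use DLM_LKF_QUECVT (forcing it to queue in
   fifo order).  validate_lock_args() rejects QUECVT for any grmode/rqmode
   pair where this matrix holds 0, e.g. any down-conversion. */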
155 
156 void dlm_print_lkb(struct dlm_lkb *lkb)
157 {
158 	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
159 	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
160 	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
161 	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
162 	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
163 }
164 
165 void dlm_print_rsb(struct dlm_rsb *r)
166 {
167 	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
168 	       r->res_nodeid, r->res_flags, r->res_first_lkid,
169 	       r->res_recover_locks_count, r->res_name);
170 }
171 
172 void dlm_dump_rsb(struct dlm_rsb *r)
173 {
174 	struct dlm_lkb *lkb;
175 
176 	dlm_print_rsb(r);
177 
178 	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
179 	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
180 	printk(KERN_ERR "rsb lookup list\n");
181 	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
182 		dlm_print_lkb(lkb);
183 	printk(KERN_ERR "rsb grant queue:\n");
184 	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
185 		dlm_print_lkb(lkb);
186 	printk(KERN_ERR "rsb convert queue:\n");
187 	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
188 		dlm_print_lkb(lkb);
189 	printk(KERN_ERR "rsb wait queue:\n");
190 	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
191 		dlm_print_lkb(lkb);
192 }
193 
194 /* Threads cannot use the lockspace while it's being recovered */
195 
196 static inline void lock_recovery(struct dlm_ls *ls)
197 {
198 	down_read(&ls->ls_in_recovery);
199 }
200 
201 static inline void unlock_recovery(struct dlm_ls *ls)
202 {
203 	up_read(&ls->ls_in_recovery);
204 }
205 
206 static inline int lock_recovery_try(struct dlm_ls *ls)
207 {
208 	return down_read_trylock(&ls->ls_in_recovery);
209 }
210 
211 static inline int can_be_queued(struct dlm_lkb *lkb)
212 {
213 	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
214 }
215 
216 static inline int force_blocking_asts(struct dlm_lkb *lkb)
217 {
218 	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
219 }
220 
221 static inline int is_demoted(struct dlm_lkb *lkb)
222 {
223 	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
224 }
225 
226 static inline int is_remote(struct dlm_rsb *r)
227 {
228 	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
229 	return !!r->res_nodeid;
230 }
231 
232 static inline int is_process_copy(struct dlm_lkb *lkb)
233 {
234 	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
235 }
236 
237 static inline int is_master_copy(struct dlm_lkb *lkb)
238 {
239 	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
240 		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
241 	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
242 }
243 
244 static inline int middle_conversion(struct dlm_lkb *lkb)
245 {
246 	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
247 	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
248 		return 1;
249 	return 0;
250 }
251 
252 static inline int down_conversion(struct dlm_lkb *lkb)
253 {
254 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
255 }
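
/* Example: PR and CW are incomparable modes (one allows concurrent reads,
   the other concurrent writes), so PR->CW and CW->PR conversions are
   "middle" conversions: neither strictly up nor down, and deliberately
   excluded from down_conversion() even though DLM_LOCK_CW < DLM_LOCK_PR
   numerically.  EX->NL, by contrast, is a plain down-conversion. */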
256 
257 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
258 {
259 	if (is_master_copy(lkb))
260 		return;
261 
262 	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
263 
264 	lkb->lkb_lksb->sb_status = rv;
265 	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
266 
267 	dlm_add_ast(lkb, AST_COMP);
268 }
269 
270 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
271 {
272 	if (is_master_copy(lkb))
273 		send_bast(r, lkb, rqmode);
274 	else {
275 		lkb->lkb_bastmode = rqmode;
276 		dlm_add_ast(lkb, AST_BAST);
277 	}
278 }
279 
280 /*
281  * Basic operations on rsb's and lkb's
282  */
283 
284 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
285 {
286 	struct dlm_rsb *r;
287 
288 	r = allocate_rsb(ls, len);
289 	if (!r)
290 		return NULL;
291 
292 	r->res_ls = ls;
293 	r->res_length = len;
294 	memcpy(r->res_name, name, len);
295 	mutex_init(&r->res_mutex);
296 
297 	INIT_LIST_HEAD(&r->res_lookup);
298 	INIT_LIST_HEAD(&r->res_grantqueue);
299 	INIT_LIST_HEAD(&r->res_convertqueue);
300 	INIT_LIST_HEAD(&r->res_waitqueue);
301 	INIT_LIST_HEAD(&r->res_root_list);
302 	INIT_LIST_HEAD(&r->res_recover_list);
303 
304 	return r;
305 }
306 
307 static int search_rsb_list(struct list_head *head, char *name, int len,
308 			   unsigned int flags, struct dlm_rsb **r_ret)
309 {
310 	struct dlm_rsb *r;
311 	int error = 0;
312 
313 	list_for_each_entry(r, head, res_hashchain) {
314 		if (len == r->res_length && !memcmp(name, r->res_name, len))
315 			goto found;
316 	}
317 	return -EBADR;
318 
319  found:
320 	if (r->res_nodeid && (flags & R_MASTER))
321 		error = -ENOTBLK;
322 	*r_ret = r;
323 	return error;
324 }
325 
326 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
327 		       unsigned int flags, struct dlm_rsb **r_ret)
328 {
329 	struct dlm_rsb *r;
330 	int error;
331 
332 	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
333 	if (!error) {
334 		kref_get(&r->res_ref);
335 		goto out;
336 	}
337 	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
338 	if (error)
339 		goto out;
340 
341 	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
342 
343 	if (dlm_no_directory(ls))
344 		goto out;
345 
346 	if (r->res_nodeid == -1) {
347 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
348 		r->res_first_lkid = 0;
349 	} else if (r->res_nodeid > 0) {
350 		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
351 		r->res_first_lkid = 0;
352 	} else {
353 		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
354 		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
355 	}
356  out:
357 	*r_ret = r;
358 	return error;
359 }
360 
361 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
362 		      unsigned int flags, struct dlm_rsb **r_ret)
363 {
364 	int error;
365 	write_lock(&ls->ls_rsbtbl[b].lock);
366 	error = _search_rsb(ls, name, len, b, flags, r_ret);
367 	write_unlock(&ls->ls_rsbtbl[b].lock);
368 	return error;
369 }
370 
371 /*
372  * Find rsb in rsbtbl and potentially create/add one
373  *
374  * Delaying the release of rsb's has a similar benefit to applications keeping
375  * NL locks on an rsb, but without the guarantee that the cached master value
376  * will still be valid when the rsb is reused.  Apps aren't always smart enough
377  * to keep NL locks on an rsb that they may lock again shortly; this can lead
378  * to excessive master lookups and removals if we don't delay the release.
379  *
380  * Searching for an rsb means looking through both the normal list and toss
381  * list.  When found on the toss list the rsb is moved to the normal list with
382  * ref count of 1; when found on normal list the ref count is incremented.
383  */
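
/* Example of the lifecycle described above: a first find_rsb() for a name
   misses both lists and creates the rsb with one reference.  After the
   final put_rsb() the rsb migrates to the toss list via toss_rsb() rather
   than being freed.  A second lookup within ci_toss_secs finds it there,
   moves it back to the normal list with a ref count of 1, and reuses the
   cached master -- with RSB_MASTER_UNCERTAIN set if the master was remote
   (when a resource directory is in use), since the master may have changed
   while the rsb sat on the toss list. */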
384 
385 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
386 		    unsigned int flags, struct dlm_rsb **r_ret)
387 {
388 	struct dlm_rsb *r, *tmp;
389 	uint32_t hash, bucket;
390 	int error = 0;
391 
392 	if (dlm_no_directory(ls))
393 		flags |= R_CREATE;
394 
395 	hash = jhash(name, namelen, 0);
396 	bucket = hash & (ls->ls_rsbtbl_size - 1);
397 
398 	error = search_rsb(ls, name, namelen, bucket, flags, &r);
399 	if (!error)
400 		goto out;
401 
402 	if (error == -EBADR && !(flags & R_CREATE))
403 		goto out;
404 
405 	/* the rsb was found but wasn't a master copy */
406 	if (error == -ENOTBLK)
407 		goto out;
408 
409 	error = -ENOMEM;
410 	r = create_rsb(ls, name, namelen);
411 	if (!r)
412 		goto out;
413 
414 	r->res_hash = hash;
415 	r->res_bucket = bucket;
416 	r->res_nodeid = -1;
417 	kref_init(&r->res_ref);
418 
419 	/* With no directory, the master can be set immediately */
420 	if (dlm_no_directory(ls)) {
421 		int nodeid = dlm_dir_nodeid(r);
422 		if (nodeid == dlm_our_nodeid())
423 			nodeid = 0;
424 		r->res_nodeid = nodeid;
425 	}
426 
427 	write_lock(&ls->ls_rsbtbl[bucket].lock);
428 	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
429 	if (!error) {
430 		write_unlock(&ls->ls_rsbtbl[bucket].lock);
431 		free_rsb(r);
432 		r = tmp;
433 		goto out;
434 	}
435 	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
436 	write_unlock(&ls->ls_rsbtbl[bucket].lock);
437 	error = 0;
438  out:
439 	*r_ret = r;
440 	return error;
441 }
442 
443 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
444 		 unsigned int flags, struct dlm_rsb **r_ret)
445 {
446 	return find_rsb(ls, name, namelen, flags, r_ret);
447 }
448 
449 /* This is only called to add a reference when the code already holds
450    a valid reference to the rsb, so there's no need for locking. */
451 
452 static inline void hold_rsb(struct dlm_rsb *r)
453 {
454 	kref_get(&r->res_ref);
455 }
456 
457 void dlm_hold_rsb(struct dlm_rsb *r)
458 {
459 	hold_rsb(r);
460 }
461 
462 static void toss_rsb(struct kref *kref)
463 {
464 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
465 	struct dlm_ls *ls = r->res_ls;
466 
467 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
468 	kref_init(&r->res_ref);
469 	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
470 	r->res_toss_time = jiffies;
471 	if (r->res_lvbptr) {
472 		free_lvb(r->res_lvbptr);
473 		r->res_lvbptr = NULL;
474 	}
475 }
476 
477 /* When all references to the rsb are gone it's transferred to
478    the toss list for later disposal. */
479 
480 static void put_rsb(struct dlm_rsb *r)
481 {
482 	struct dlm_ls *ls = r->res_ls;
483 	uint32_t bucket = r->res_bucket;
484 
485 	write_lock(&ls->ls_rsbtbl[bucket].lock);
486 	kref_put(&r->res_ref, toss_rsb);
487 	write_unlock(&ls->ls_rsbtbl[bucket].lock);
488 }
489 
490 void dlm_put_rsb(struct dlm_rsb *r)
491 {
492 	put_rsb(r);
493 }
494 
495 /* See comment for unhold_lkb */
496 
497 static void unhold_rsb(struct dlm_rsb *r)
498 {
499 	int rv;
500 	rv = kref_put(&r->res_ref, toss_rsb);
501 	DLM_ASSERT(!rv, dlm_dump_rsb(r););
502 }
503 
504 static void kill_rsb(struct kref *kref)
505 {
506 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
507 
508 	/* All work is done after the return from kref_put() so we
509 	   can release the write_lock before the remove and free. */
510 
511 	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
512 	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
513 	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
514 	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
515 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
516 	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
517 }
518 
519 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
520    The rsb must exist as long as any lkb's for it do. */
521 
522 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
523 {
524 	hold_rsb(r);
525 	lkb->lkb_resource = r;
526 }
527 
528 static void detach_lkb(struct dlm_lkb *lkb)
529 {
530 	if (lkb->lkb_resource) {
531 		put_rsb(lkb->lkb_resource);
532 		lkb->lkb_resource = NULL;
533 	}
534 }
535 
536 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
537 {
538 	struct dlm_lkb *lkb, *tmp;
539 	uint32_t lkid = 0;
540 	uint16_t bucket;
541 
542 	lkb = allocate_lkb(ls);
543 	if (!lkb)
544 		return -ENOMEM;
545 
546 	lkb->lkb_nodeid = -1;
547 	lkb->lkb_grmode = DLM_LOCK_IV;
548 	kref_init(&lkb->lkb_ref);
549 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
550 
551 	get_random_bytes(&bucket, sizeof(bucket));
552 	bucket &= (ls->ls_lkbtbl_size - 1);
553 
554 	write_lock(&ls->ls_lkbtbl[bucket].lock);
555 
556 	/* counter can roll over so we must verify lkid is not in use */
557 
558 	while (lkid == 0) {
559 		lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
560 
561 		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
562 				    lkb_idtbl_list) {
563 			if (tmp->lkb_id != lkid)
564 				continue;
565 			lkid = 0;
566 			break;
567 		}
568 	}
569 
570 	lkb->lkb_id = lkid;
571 	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
572 	write_unlock(&ls->ls_lkbtbl[bucket].lock);
573 
574 	*lkb_ret = lkb;
575 	return 0;
576 }
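
/* An lkid thus encodes its hash bucket in the low 16 bits and the
   per-bucket counter in the high 16 bits.  With illustrative values,
   lkid 0x00030002 names the lkb created in lkbtbl bucket 2 when that
   bucket's counter was 3, which is why __find_lkb() below recovers the
   bucket with "lkid & 0xFFFF". */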
577 
578 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
579 {
580 	uint16_t bucket = lkid & 0xFFFF;
581 	struct dlm_lkb *lkb;
582 
583 	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
584 		if (lkb->lkb_id == lkid)
585 			return lkb;
586 	}
587 	return NULL;
588 }
589 
590 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
591 {
592 	struct dlm_lkb *lkb;
593 	uint16_t bucket = lkid & 0xFFFF;
594 
595 	if (bucket >= ls->ls_lkbtbl_size)
596 		return -EBADSLT;
597 
598 	read_lock(&ls->ls_lkbtbl[bucket].lock);
599 	lkb = __find_lkb(ls, lkid);
600 	if (lkb)
601 		kref_get(&lkb->lkb_ref);
602 	read_unlock(&ls->ls_lkbtbl[bucket].lock);
603 
604 	*lkb_ret = lkb;
605 	return lkb ? 0 : -ENOENT;
606 }
607 
608 static void kill_lkb(struct kref *kref)
609 {
610 	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
611 
612 	/* All work is done after the return from kref_put() so we
613 	   can release the write_lock before the detach_lkb */
614 
615 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
616 }
617 
618 /* __put_lkb() is used when an lkb may not have an rsb attached to
619    it so we need to provide the lockspace explicitly */
620 
621 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
622 {
623 	uint16_t bucket = lkb->lkb_id & 0xFFFF;
624 
625 	write_lock(&ls->ls_lkbtbl[bucket].lock);
626 	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
627 		list_del(&lkb->lkb_idtbl_list);
628 		write_unlock(&ls->ls_lkbtbl[bucket].lock);
629 
630 		detach_lkb(lkb);
631 
632 		/* for local/process lkbs, lvbptr points to caller's lksb */
633 		if (lkb->lkb_lvbptr && is_master_copy(lkb))
634 			free_lvb(lkb->lkb_lvbptr);
635 		free_lkb(lkb);
636 		return 1;
637 	} else {
638 		write_unlock(&ls->ls_lkbtbl[bucket].lock);
639 		return 0;
640 	}
641 }
642 
643 int dlm_put_lkb(struct dlm_lkb *lkb)
644 {
645 	struct dlm_ls *ls;
646 
647 	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
648 	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
649 
650 	ls = lkb->lkb_resource->res_ls;
651 	return __put_lkb(ls, lkb);
652 }
653 
654 /* This is only called to add a reference when the code already holds
655    a valid reference to the lkb, so there's no need for locking. */
656 
657 static inline void hold_lkb(struct dlm_lkb *lkb)
658 {
659 	kref_get(&lkb->lkb_ref);
660 }
661 
662 /* This is called when we need to remove a reference and are certain
663    it's not the last ref.  e.g. del_lkb is always called between a
664    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
665    put_lkb would work fine, but would involve unnecessary locking */
666 
667 static inline void unhold_lkb(struct dlm_lkb *lkb)
668 {
669 	int rv;
670 	rv = kref_put(&lkb->lkb_ref, kill_lkb);
671 	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
672 }
673 
674 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
675 			    int mode)
676 {
677 	struct dlm_lkb *lkb = NULL;
678 
679 	list_for_each_entry(lkb, head, lkb_statequeue)
680 		if (lkb->lkb_rqmode < mode)
681 			break;
682 
683 	if (!lkb)
684 		list_add_tail(new, head);
685 	else
686 		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
687 }
688 
689 /* add/remove lkb to rsb's grant/convert/wait queue */
690 
691 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
692 {
693 	kref_get(&lkb->lkb_ref);
694 
695 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
696 
697 	lkb->lkb_status = status;
698 
699 	switch (status) {
700 	case DLM_LKSTS_WAITING:
701 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
702 			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
703 		else
704 			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
705 		break;
706 	case DLM_LKSTS_GRANTED:
707 		/* convention says granted locks kept in order of grmode */
708 		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
709 				lkb->lkb_grmode);
710 		break;
711 	case DLM_LKSTS_CONVERT:
712 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
713 			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
714 		else
715 			list_add_tail(&lkb->lkb_statequeue,
716 				      &r->res_convertqueue);
717 		break;
718 	default:
719 		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
720 	}
721 }
722 
723 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
724 {
725 	lkb->lkb_status = 0;
726 	list_del(&lkb->lkb_statequeue);
727 	unhold_lkb(lkb);
728 }
729 
730 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
731 {
732 	hold_lkb(lkb);
733 	del_lkb(r, lkb);
734 	add_lkb(r, lkb, sts);
735 	unhold_lkb(lkb);
736 }
737 
738 /* add/remove lkb from global waiters list of lkb's waiting for
739    a reply from a remote node */
740 
741 static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
742 {
743 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
744 
745 	mutex_lock(&ls->ls_waiters_mutex);
746 	if (lkb->lkb_wait_type) {
747 		log_print("add_to_waiters error %d", lkb->lkb_wait_type);
748 		goto out;
749 	}
750 	lkb->lkb_wait_type = mstype;
751 	kref_get(&lkb->lkb_ref);
752 	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
753  out:
754 	mutex_unlock(&ls->ls_waiters_mutex);
755 }
756 
757 static int _remove_from_waiters(struct dlm_lkb *lkb)
758 {
759 	int error = 0;
760 
761 	if (!lkb->lkb_wait_type) {
762 		log_print("remove_from_waiters error");
763 		error = -EINVAL;
764 		goto out;
765 	}
766 	lkb->lkb_wait_type = 0;
767 	list_del(&lkb->lkb_wait_reply);
768 	unhold_lkb(lkb);
769  out:
770 	return error;
771 }
772 
773 static int remove_from_waiters(struct dlm_lkb *lkb)
774 {
775 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
776 	int error;
777 
778 	mutex_lock(&ls->ls_waiters_mutex);
779 	error = _remove_from_waiters(lkb);
780 	mutex_unlock(&ls->ls_waiters_mutex);
781 	return error;
782 }
783 
784 static void dir_remove(struct dlm_rsb *r)
785 {
786 	int to_nodeid;
787 
788 	if (dlm_no_directory(r->res_ls))
789 		return;
790 
791 	to_nodeid = dlm_dir_nodeid(r);
792 	if (to_nodeid != dlm_our_nodeid())
793 		send_remove(r);
794 	else
795 		dlm_dir_remove_entry(r->res_ls, to_nodeid,
796 				     r->res_name, r->res_length);
797 }
798 
799 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
800    found since they are in order of newest to oldest? */
801 
802 static int shrink_bucket(struct dlm_ls *ls, int b)
803 {
804 	struct dlm_rsb *r;
805 	int count = 0, found;
806 
807 	for (;;) {
808 		found = 0;
809 		write_lock(&ls->ls_rsbtbl[b].lock);
810 		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
811 					    res_hashchain) {
812 			if (!time_after_eq(jiffies, r->res_toss_time +
813 					   dlm_config.ci_toss_secs * HZ))
814 				continue;
815 			found = 1;
816 			break;
817 		}
818 
819 		if (!found) {
820 			write_unlock(&ls->ls_rsbtbl[b].lock);
821 			break;
822 		}
823 
824 		if (kref_put(&r->res_ref, kill_rsb)) {
825 			list_del(&r->res_hashchain);
826 			write_unlock(&ls->ls_rsbtbl[b].lock);
827 
828 			if (is_master(r))
829 				dir_remove(r);
830 			free_rsb(r);
831 			count++;
832 		} else {
833 			write_unlock(&ls->ls_rsbtbl[b].lock);
834 			log_error(ls, "tossed rsb in use %s", r->res_name);
835 		}
836 	}
837 
838 	return count;
839 }
840 
841 void dlm_scan_rsbs(struct dlm_ls *ls)
842 {
843 	int i;
844 
845 	if (dlm_locking_stopped(ls))
846 		return;
847 
848 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
849 		shrink_bucket(ls, i);
850 		cond_resched();
851 	}
852 }
853 
854 /* lkb is master or local copy */
855 
856 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
857 {
858 	int b, len = r->res_ls->ls_lvblen;
859 
860 	/* b=1 lvb returned to caller
861 	   b=0 lvb written to rsb or invalidated
862 	   b=-1 do nothing */
863 
864 	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
865 
866 	if (b == 1) {
867 		if (!lkb->lkb_lvbptr)
868 			return;
869 
870 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
871 			return;
872 
873 		if (!r->res_lvbptr)
874 			return;
875 
876 		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
877 		lkb->lkb_lvbseq = r->res_lvbseq;
878 
879 	} else if (b == 0) {
880 		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
881 			rsb_set_flag(r, RSB_VALNOTVALID);
882 			return;
883 		}
884 
885 		if (!lkb->lkb_lvbptr)
886 			return;
887 
888 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
889 			return;
890 
891 		if (!r->res_lvbptr)
892 			r->res_lvbptr = allocate_lvb(r->res_ls);
893 
894 		if (!r->res_lvbptr)
895 			return;
896 
897 		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
898 		r->res_lvbseq++;
899 		lkb->lkb_lvbseq = r->res_lvbseq;
900 		rsb_clear_flag(r, RSB_VALNOTVALID);
901 	}
902 
903 	if (rsb_flag(r, RSB_VALNOTVALID))
904 		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
905 }
906 
907 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
908 {
909 	if (lkb->lkb_grmode < DLM_LOCK_PW)
910 		return;
911 
912 	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
913 		rsb_set_flag(r, RSB_VALNOTVALID);
914 		return;
915 	}
916 
917 	if (!lkb->lkb_lvbptr)
918 		return;
919 
920 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
921 		return;
922 
923 	if (!r->res_lvbptr)
924 		r->res_lvbptr = allocate_lvb(r->res_ls);
925 
926 	if (!r->res_lvbptr)
927 		return;
928 
929 	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
930 	r->res_lvbseq++;
931 	rsb_clear_flag(r, RSB_VALNOTVALID);
932 }
933 
934 /* lkb is process copy (pc) */
935 
936 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
937 			    struct dlm_message *ms)
938 {
939 	int b;
940 
941 	if (!lkb->lkb_lvbptr)
942 		return;
943 
944 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
945 		return;
946 
947 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
948 	if (b == 1) {
949 		int len = receive_extralen(ms);
950 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
951 		lkb->lkb_lvbseq = ms->m_lvbseq;
952 	}
953 }
954 
955 /* Manipulate lkb's on rsb's convert/granted/waiting queues
956    remove_lock -- used for unlock, removes lkb from granted
957    revert_lock -- used for cancel, moves lkb from convert to granted
958    grant_lock  -- used for request and convert, adds lkb to granted or
959                   moves lkb from convert or waiting to granted
960 
961    Each of these is used for master or local copy lkb's.  There is
962    also a _pc() variation used to make the corresponding change on
963    a process copy (pc) lkb. */
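
/* Example: cancelling an in-progress PR->EX conversion.  revert_lock()
   below resets lkb_rqmode to DLM_LOCK_IV and moves the lkb from the
   convert queue back to the grant queue, so the lock simply remains
   held in PR. */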
964 
965 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
966 {
967 	del_lkb(r, lkb);
968 	lkb->lkb_grmode = DLM_LOCK_IV;
969 	/* this unhold undoes the original ref from create_lkb()
970 	   so this leads to the lkb being freed */
971 	unhold_lkb(lkb);
972 }
973 
974 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
975 {
976 	set_lvb_unlock(r, lkb);
977 	_remove_lock(r, lkb);
978 }
979 
980 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
981 {
982 	_remove_lock(r, lkb);
983 }
984 
985 static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
986 {
987 	lkb->lkb_rqmode = DLM_LOCK_IV;
988 
989 	switch (lkb->lkb_status) {
990 	case DLM_LKSTS_GRANTED:
991 		break;
992 	case DLM_LKSTS_CONVERT:
993 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
994 		break;
995 	case DLM_LKSTS_WAITING:
996 		del_lkb(r, lkb);
997 		lkb->lkb_grmode = DLM_LOCK_IV;
998 		/* this unhold undoes the original ref from create_lkb()
999 		   so this leads to the lkb being freed */
1000 		unhold_lkb(lkb);
1001 		break;
1002 	default:
1003 		log_print("invalid status for revert %d", lkb->lkb_status);
1004 	}
1005 }
1006 
1007 static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1008 {
1009 	revert_lock(r, lkb);
1010 }
1011 
1012 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1013 {
1014 	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1015 		lkb->lkb_grmode = lkb->lkb_rqmode;
1016 		if (lkb->lkb_status)
1017 			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1018 		else
1019 			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1020 	}
1021 
1022 	lkb->lkb_rqmode = DLM_LOCK_IV;
1023 }
1024 
1025 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1026 {
1027 	set_lvb_lock(r, lkb);
1028 	_grant_lock(r, lkb);
1029 	lkb->lkb_highbast = 0;
1030 }
1031 
1032 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1033 			  struct dlm_message *ms)
1034 {
1035 	set_lvb_lock_pc(r, lkb, ms);
1036 	_grant_lock(r, lkb);
1037 }
1038 
1039 /* called by grant_pending_locks() which means an async grant message must
1040    be sent to the requesting node in addition to granting the lock if the
1041    lkb belongs to a remote node. */
1042 
1043 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1044 {
1045 	grant_lock(r, lkb);
1046 	if (is_master_copy(lkb))
1047 		send_grant(r, lkb);
1048 	else
1049 		queue_cast(r, lkb, 0);
1050 }
1051 
1052 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1053 {
1054 	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1055 					   lkb_statequeue);
1056 	if (lkb->lkb_id == first->lkb_id)
1057 		return 1;
1058 
1059 	return 0;
1060 }
1061 
1062 /* Check if the given lkb conflicts with another lkb on the queue. */
1063 
1064 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1065 {
1066 	struct dlm_lkb *this;
1067 
1068 	list_for_each_entry(this, head, lkb_statequeue) {
1069 		if (this == lkb)
1070 			continue;
1071 		if (!modes_compat(this, lkb))
1072 			return 1;
1073 	}
1074 	return 0;
1075 }
1076 
1077 /*
1078  * "A conversion deadlock arises with a pair of lock requests in the converting
1079  * queue for one resource.  The granted mode of each lock blocks the requested
1080  * mode of the other lock."
1081  *
1082  * Part 2: if the granted mode of lkb is preventing the first lkb in the
1083  * convert queue from being granted, then demote lkb (set grmode to NL).
1084  * This second form requires that we check for conv-deadlk even when
1085  * now == 0 in _can_be_granted().
1086  *
1087  * Example:
1088  * Granted Queue: empty
1089  * Convert Queue: NL->EX (first lock)
1090  *                PR->EX (second lock)
1091  *
1092  * The first lock can't be granted because of the granted mode of the second
1093  * lock and the second lock can't be granted because it's not first in the
1094  * list.  We demote the granted mode of the second lock (the lkb passed to this
1095  * function).
1096  *
1097  * After the resolution, the "grant pending" function needs to go back and try
1098  * to grant locks on the convert queue again since the first lock can now be
1099  * granted.
1100  */
1101 
1102 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1103 {
1104 	struct dlm_lkb *this, *first = NULL, *self = NULL;
1105 
1106 	list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1107 		if (!first)
1108 			first = this;
1109 		if (this == lkb) {
1110 			self = lkb;
1111 			continue;
1112 		}
1113 
1114 		if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1115 			return 1;
1116 	}
1117 
1118 	/* if lkb is on the convert queue and is preventing the first
1119 	   from being granted, then there's deadlock and we demote lkb.
1120 	   multiple converting locks may need to do this before the first
1121 	   converting lock can be granted. */
1122 
1123 	if (self && self != first) {
1124 		if (!modes_compat(lkb, first) &&
1125 		    !queue_conflict(&rsb->res_grantqueue, first))
1126 			return 1;
1127 	}
1128 
1129 	return 0;
1130 }
1131 
1132 /*
1133  * Return 1 if the lock can be granted, 0 otherwise.
1134  * Also detect and resolve conversion deadlocks.
1135  *
1136  * lkb is the lock to be granted
1137  *
1138  * now is 1 if the function is being called in the context of the
1139  * immediate request, it is 0 if called later, after the lock has been
1140  * queued.
1141  *
1142  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1143  */
1144 
1145 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1146 {
1147 	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1148 
1149 	/*
1150 	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1151 	 * a new request for a NL mode lock being blocked.
1152 	 *
1153 	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1154 	 * request, then it would be granted.  In essence, the use of this flag
1155 	 * tells the Lock Manager to expedite this request by not considering
1156 	 * what may be in the CONVERTING or WAITING queues...  As of this
1157 	 * writing, the EXPEDITE flag can be used only with new requests for NL
1158 	 * mode locks.  This flag is not valid for conversion requests.
1159 	 *
1160 	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1161 	 * conversion or used with a non-NL requested mode.  We also know an
1162 	 * EXPEDITE request is always granted immediately, so now must always
1163 	 * be 1.  The full condition to grant an expedite request: (now &&
1164 	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1165 	 * therefore be shortened to just checking the flag.
1166 	 */
1167 
1168 	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1169 		return 1;
1170 
1171 	/*
1172 	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1173 	 * added to the remaining conditions.
1174 	 */
1175 
1176 	if (queue_conflict(&r->res_grantqueue, lkb))
1177 		goto out;
1178 
1179 	/*
1180 	 * 6-3: By default, a conversion request is immediately granted if the
1181 	 * requested mode is compatible with the modes of all other granted
1182 	 * locks
1183 	 */
1184 
1185 	if (queue_conflict(&r->res_convertqueue, lkb))
1186 		goto out;
1187 
1188 	/*
1189 	 * 6-5: But the default algorithm for deciding whether to grant or
1190 	 * queue conversion requests does not by itself guarantee that such
1191 	 * requests are serviced on a "first come first serve" basis.  This, in
1192 	 * turn, can lead to a phenomenon known as "indefinite postponement".
1193 	 *
1194 	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1195 	 * the system service employed to request a lock conversion.  This flag
1196 	 * forces certain conversion requests to be queued, even if they are
1197 	 * compatible with the granted modes of other locks on the same
1198 	 * resource.  Thus, the use of this flag results in conversion requests
1199 	 * being ordered on a "first come first serve" basis.
1200 	 *
1201 	 * DCT: This condition is all about new conversions being able to occur
1202 	 * "in place" while the lock remains on the granted queue (assuming
1203 	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1204 	 * doesn't _have_ to go onto the convert queue where it's processed in
1205 	 * order.  The "now" variable is necessary to distinguish converts
1206 	 * being received and processed for the first time now, because once a
1207 	 * convert is moved to the conversion queue the condition below applies
1208 	 * requiring fifo granting.
1209 	 */
1210 
1211 	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1212 		return 1;
1213 
1214 	/*
1215 	 * The NOORDER flag is set to avoid the standard vms rules on grant
1216 	 * order.
1217 	 */
1218 
1219 	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1220 		return 1;
1221 
1222 	/*
1223 	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1224 	 * granted until all other conversion requests ahead of it are granted
1225 	 * and/or canceled.
1226 	 */
1227 
1228 	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1229 		return 1;
1230 
1231 	/*
1232 	 * 6-4: By default, a new request is immediately granted only if all
1233 	 * three of the following conditions are satisfied when the request is
1234 	 * issued:
1235 	 * - The queue of ungranted conversion requests for the resource is
1236 	 *   empty.
1237 	 * - The queue of ungranted new requests for the resource is empty.
1238 	 * - The mode of the new request is compatible with the most
1239 	 *   restrictive mode of all granted locks on the resource.
1240 	 */
1241 
1242 	if (now && !conv && list_empty(&r->res_convertqueue) &&
1243 	    list_empty(&r->res_waitqueue))
1244 		return 1;
1245 
1246 	/*
1247 	 * 6-4: Once a lock request is in the queue of ungranted new requests,
1248 	 * it cannot be granted until the queue of ungranted conversion
1249 	 * requests is empty, all ungranted new requests ahead of it are
1250 	 * granted and/or canceled, and it is compatible with the granted mode
1251 	 * of the most restrictive lock granted on the resource.
1252 	 */
1253 
1254 	if (!now && !conv && list_empty(&r->res_convertqueue) &&
1255 	    first_in_list(lkb, &r->res_waitqueue))
1256 		return 1;
1257 
1258  out:
1259 	/*
1260 	 * The following, enabled by CONVDEADLK, departs from VMS.
1261 	 */
1262 
1263 	if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1264 	    conversion_deadlock_detect(r, lkb)) {
1265 		lkb->lkb_grmode = DLM_LOCK_NL;
1266 		lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1267 	}
1268 
1269 	return 0;
1270 }
1271 
1272 /*
1273  * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1274  * simple way to provide a big optimization to applications that can use them.
1275  */
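
/* Example: a caller requests CW with DLM_LKF_ALTPR set.  If CW cannot be
   granted but PR can, the retry below grants the lock in PR instead and
   sets DLM_SBF_ALTMODE in the lksb flags so the caller can tell which
   mode it actually holds. */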
1276 
1277 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1278 {
1279 	uint32_t flags = lkb->lkb_exflags;
1280 	int rv;
1281 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1282 
1283 	rv = _can_be_granted(r, lkb, now);
1284 	if (rv)
1285 		goto out;
1286 
1287 	if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1288 		goto out;
1289 
1290 	if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1291 		alt = DLM_LOCK_PR;
1292 	else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1293 		alt = DLM_LOCK_CW;
1294 
1295 	if (alt) {
1296 		lkb->lkb_rqmode = alt;
1297 		rv = _can_be_granted(r, lkb, now);
1298 		if (rv)
1299 			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1300 		else
1301 			lkb->lkb_rqmode = rqmode;
1302 	}
1303  out:
1304 	return rv;
1305 }
1306 
1307 static int grant_pending_convert(struct dlm_rsb *r, int high)
1308 {
1309 	struct dlm_lkb *lkb, *s;
1310 	int hi, demoted, quit, grant_restart, demote_restart;
1311 
1312 	quit = 0;
1313  restart:
1314 	grant_restart = 0;
1315 	demote_restart = 0;
1316 	hi = DLM_LOCK_IV;
1317 
1318 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1319 		demoted = is_demoted(lkb);
1320 		if (can_be_granted(r, lkb, 0)) {
1321 			grant_lock_pending(r, lkb);
1322 			grant_restart = 1;
1323 		} else {
1324 			hi = max_t(int, lkb->lkb_rqmode, hi);
1325 			if (!demoted && is_demoted(lkb))
1326 				demote_restart = 1;
1327 		}
1328 	}
1329 
1330 	if (grant_restart)
1331 		goto restart;
1332 	if (demote_restart && !quit) {
1333 		quit = 1;
1334 		goto restart;
1335 	}
1336 
1337 	return max_t(int, high, hi);
1338 }
1339 
1340 static int grant_pending_wait(struct dlm_rsb *r, int high)
1341 {
1342 	struct dlm_lkb *lkb, *s;
1343 
1344 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1345 		if (can_be_granted(r, lkb, 0))
1346 			grant_lock_pending(r, lkb);
1347 		else
1348 			high = max_t(int, lkb->lkb_rqmode, high);
1349 	}
1350 
1351 	return high;
1352 }
1353 
1354 static void grant_pending_locks(struct dlm_rsb *r)
1355 {
1356 	struct dlm_lkb *lkb, *s;
1357 	int high = DLM_LOCK_IV;
1358 
1359 	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1360 
1361 	high = grant_pending_convert(r, high);
1362 	high = grant_pending_wait(r, high);
1363 
1364 	if (high == DLM_LOCK_IV)
1365 		return;
1366 
1367 	/*
1368 	 * If there are locks left on the wait/convert queue then send blocking
1369 	 * ASTs to granted locks based on the largest requested mode (high)
1370 	 * found above. FIXME: highbast < high comparison not valid for PR/CW.
1371 	 */
1372 
1373 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1374 		if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1375 		    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1376 			queue_bast(r, lkb, high);
1377 			lkb->lkb_highbast = high;
1378 		}
1379 	}
1380 }
1381 
1382 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1383 			    struct dlm_lkb *lkb)
1384 {
1385 	struct dlm_lkb *gr;
1386 
1387 	list_for_each_entry(gr, head, lkb_statequeue) {
1388 		if (gr->lkb_bastaddr &&
1389 		    gr->lkb_highbast < lkb->lkb_rqmode &&
1390 		    !modes_compat(gr, lkb)) {
1391 			queue_bast(r, gr, lkb->lkb_rqmode);
1392 			gr->lkb_highbast = lkb->lkb_rqmode;
1393 		}
1394 	}
1395 }
1396 
1397 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1398 {
1399 	send_bast_queue(r, &r->res_grantqueue, lkb);
1400 }
1401 
1402 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1403 {
1404 	send_bast_queue(r, &r->res_grantqueue, lkb);
1405 	send_bast_queue(r, &r->res_convertqueue, lkb);
1406 }
1407 
1408 /* set_master(r, lkb) -- set the master nodeid of a resource
1409 
1410    The purpose of this function is to set the nodeid field in the given
1411    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1412    known, it can just be copied to the lkb and the function will return
1413    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1414    before it can be copied to the lkb.
1415 
1416    When the rsb nodeid is being looked up remotely, the initial lkb
1417    causing the lookup is kept on the ls_waiters list waiting for the
1418    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1419    on the rsb's res_lookup list until the master is verified.
1420 
1421    Return values:
1422    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1423    1: the rsb master is not available and the lkb has been placed on
1424       a wait queue
1425 */
1426 
1427 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1428 {
1429 	struct dlm_ls *ls = r->res_ls;
1430 	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1431 
1432 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1433 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1434 		r->res_first_lkid = lkb->lkb_id;
1435 		lkb->lkb_nodeid = r->res_nodeid;
1436 		return 0;
1437 	}
1438 
1439 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1440 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1441 		return 1;
1442 	}
1443 
1444 	if (r->res_nodeid == 0) {
1445 		lkb->lkb_nodeid = 0;
1446 		return 0;
1447 	}
1448 
1449 	if (r->res_nodeid > 0) {
1450 		lkb->lkb_nodeid = r->res_nodeid;
1451 		return 0;
1452 	}
1453 
1454 	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1455 
1456 	dir_nodeid = dlm_dir_nodeid(r);
1457 
1458 	if (dir_nodeid != our_nodeid) {
1459 		r->res_first_lkid = lkb->lkb_id;
1460 		send_lookup(r, lkb);
1461 		return 1;
1462 	}
1463 
1464 	for (;;) {
1465 		/* It's possible for dlm_scand to remove an old rsb for
1466 		   this same resource from the toss list, us to create
1467 		   a new one, look up the master locally, and find it
1468 		   already exists just before dlm_scand does the
1469 		   dir_remove() on the previous rsb. */
1470 
1471 		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1472 				       r->res_length, &ret_nodeid);
1473 		if (!error)
1474 			break;
1475 		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1476 		schedule();
1477 	}
1478 
1479 	if (ret_nodeid == our_nodeid) {
1480 		r->res_first_lkid = 0;
1481 		r->res_nodeid = 0;
1482 		lkb->lkb_nodeid = 0;
1483 	} else {
1484 		r->res_first_lkid = lkb->lkb_id;
1485 		r->res_nodeid = ret_nodeid;
1486 		lkb->lkb_nodeid = ret_nodeid;
1487 	}
1488 	return 0;
1489 }
1490 
1491 static void process_lookup_list(struct dlm_rsb *r)
1492 {
1493 	struct dlm_lkb *lkb, *safe;
1494 
1495 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1496 		list_del(&lkb->lkb_rsb_lookup);
1497 		_request_lock(r, lkb);
1498 		schedule();
1499 	}
1500 }
1501 
1502 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1503 
1504 static void confirm_master(struct dlm_rsb *r, int error)
1505 {
1506 	struct dlm_lkb *lkb;
1507 
1508 	if (!r->res_first_lkid)
1509 		return;
1510 
1511 	switch (error) {
1512 	case 0:
1513 	case -EINPROGRESS:
1514 		r->res_first_lkid = 0;
1515 		process_lookup_list(r);
1516 		break;
1517 
1518 	case -EAGAIN:
1519 		/* the remote master didn't queue our NOQUEUE request;
1520 		   make a waiting lkb the first_lkid */
1521 
1522 		r->res_first_lkid = 0;
1523 
1524 		if (!list_empty(&r->res_lookup)) {
1525 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1526 					 lkb_rsb_lookup);
1527 			list_del(&lkb->lkb_rsb_lookup);
1528 			r->res_first_lkid = lkb->lkb_id;
1529 			_request_lock(r, lkb);
1530 		} else
1531 			r->res_nodeid = -1;
1532 		break;
1533 
1534 	default:
1535 		log_error(r->res_ls, "confirm_master unknown error %d", error);
1536 	}
1537 }
1538 
1539 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1540 			 int namelen, uint32_t parent_lkid, void *ast,
1541 			 void *astarg, void *bast, struct dlm_args *args)
1542 {
1543 	int rv = -EINVAL;
1544 
1545 	/* check for invalid arg usage */
1546 
1547 	if (mode < 0 || mode > DLM_LOCK_EX)
1548 		goto out;
1549 
1550 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1551 		goto out;
1552 
1553 	if (flags & DLM_LKF_CANCEL)
1554 		goto out;
1555 
1556 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1557 		goto out;
1558 
1559 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1560 		goto out;
1561 
1562 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1563 		goto out;
1564 
1565 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1566 		goto out;
1567 
1568 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1569 		goto out;
1570 
1571 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1572 		goto out;
1573 
1574 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1575 		goto out;
1576 
1577 	if (!ast || !lksb)
1578 		goto out;
1579 
1580 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1581 		goto out;
1582 
1583 	/* parent/child locks not yet supported */
1584 	if (parent_lkid)
1585 		goto out;
1586 
1587 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1588 		goto out;
1589 
1590 	/* these args will be copied to the lkb in validate_lock_args;
1591 	   this cannot be done now because when converting locks, fields in
1592 	   an active lkb cannot be modified before locking the rsb */
1593 
1594 	args->flags = flags;
1595 	args->astaddr = ast;
1596 	args->astparam = (long) astarg;
1597 	args->bastaddr = bast;
1598 	args->mode = mode;
1599 	args->lksb = lksb;
1600 	rv = 0;
1601  out:
1602 	return rv;
1603 }
1604 
1605 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1606 {
1607 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1608  		      DLM_LKF_FORCEUNLOCK))
1609 		return -EINVAL;
1610 
1611 	args->flags = flags;
1612 	args->astparam = (long) astarg;
1613 	return 0;
1614 }
1615 
1616 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1617 			      struct dlm_args *args)
1618 {
1619 	int rv = -EINVAL;
1620 
1621 	if (args->flags & DLM_LKF_CONVERT) {
1622 		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1623 			goto out;
1624 
1625 		if (args->flags & DLM_LKF_QUECVT &&
1626 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1627 			goto out;
1628 
1629 		rv = -EBUSY;
1630 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1631 			goto out;
1632 
1633 		if (lkb->lkb_wait_type)
1634 			goto out;
1635 	}
1636 
1637 	lkb->lkb_exflags = args->flags;
1638 	lkb->lkb_sbflags = 0;
1639 	lkb->lkb_astaddr = args->astaddr;
1640 	lkb->lkb_astparam = args->astparam;
1641 	lkb->lkb_bastaddr = args->bastaddr;
1642 	lkb->lkb_rqmode = args->mode;
1643 	lkb->lkb_lksb = args->lksb;
1644 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1645 	lkb->lkb_ownpid = (int) current->pid;
1646 	rv = 0;
1647  out:
1648 	return rv;
1649 }
1650 
1651 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1652 {
1653 	int rv = -EINVAL;
1654 
1655 	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1656 		goto out;
1657 
1658 	if (args->flags & DLM_LKF_FORCEUNLOCK)
1659 		goto out_ok;
1660 
1661 	if (args->flags & DLM_LKF_CANCEL &&
1662 	    lkb->lkb_status == DLM_LKSTS_GRANTED)
1663 		goto out;
1664 
1665 	if (!(args->flags & DLM_LKF_CANCEL) &&
1666 	    lkb->lkb_status != DLM_LKSTS_GRANTED)
1667 		goto out;
1668 
1669 	rv = -EBUSY;
1670 	if (lkb->lkb_wait_type)
1671 		goto out;
1672 
1673  out_ok:
1674 	lkb->lkb_exflags = args->flags;
1675 	lkb->lkb_sbflags = 0;
1676 	lkb->lkb_astparam = args->astparam;
1677 
1678 	rv = 0;
1679  out:
1680 	return rv;
1681 }
1682 
1683 /*
1684  * Four stage 4 varieties:
1685  * do_request(), do_convert(), do_unlock(), do_cancel()
1686  * These are called on the master node for the given lock and
1687  * from the central locking logic.
1688  */
1689 
1690 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1691 {
1692 	int error = 0;
1693 
1694 	if (can_be_granted(r, lkb, 1)) {
1695 		grant_lock(r, lkb);
1696 		queue_cast(r, lkb, 0);
1697 		goto out;
1698 	}
1699 
1700 	if (can_be_queued(lkb)) {
1701 		error = -EINPROGRESS;
1702 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
1703 		send_blocking_asts(r, lkb);
1704 		goto out;
1705 	}
1706 
1707 	error = -EAGAIN;
1708 	if (force_blocking_asts(lkb))
1709 		send_blocking_asts_all(r, lkb);
1710 	queue_cast(r, lkb, -EAGAIN);
1711 
1712  out:
1713 	return error;
1714 }
1715 
1716 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1717 {
1718 	int error = 0;
1719 
1720 	/* changing an existing lock may allow others to be granted */
1721 
1722 	if (can_be_granted(r, lkb, 1)) {
1723 		grant_lock(r, lkb);
1724 		queue_cast(r, lkb, 0);
1725 		grant_pending_locks(r);
1726 		goto out;
1727 	}
1728 
1729 	if (can_be_queued(lkb)) {
1730 		if (is_demoted(lkb))
1731 			grant_pending_locks(r);
1732 		error = -EINPROGRESS;
1733 		del_lkb(r, lkb);
1734 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1735 		send_blocking_asts(r, lkb);
1736 		goto out;
1737 	}
1738 
1739 	error = -EAGAIN;
1740 	if (force_blocking_asts(lkb))
1741 		send_blocking_asts_all(r, lkb);
1742 	queue_cast(r, lkb, -EAGAIN);
1743 
1744  out:
1745 	return error;
1746 }
1747 
1748 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1749 {
1750 	remove_lock(r, lkb);
1751 	queue_cast(r, lkb, -DLM_EUNLOCK);
1752 	grant_pending_locks(r);
1753 	return -DLM_EUNLOCK;
1754 }
1755 
1756 /* FIXME: if revert_lock() finds that the lkb is granted, we should
1757    skip the queue_cast(ECANCEL).  It indicates that the request/convert
1758    completed (and queued a normal ast) just before the cancel; we don't
1759    want to clobber the sb_result for the normal ast with ECANCEL. */
1760 
1761 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1762 {
1763 	revert_lock(r, lkb);
1764 	queue_cast(r, lkb, -DLM_ECANCEL);
1765 	grant_pending_locks(r);
1766 	return -DLM_ECANCEL;
1767 }
1768 
1769 /*
1770  * Four stage 3 varieties:
1771  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
1772  */
1773 
1774 /* add a new lkb to a possibly new rsb, called by requesting process */
1775 
1776 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1777 {
1778 	int error;
1779 
1780 	/* set_master: sets lkb nodeid from r */
1781 
1782 	error = set_master(r, lkb);
1783 	if (error < 0)
1784 		goto out;
1785 	if (error) {
1786 		error = 0;
1787 		goto out;
1788 	}
1789 
1790 	if (is_remote(r))
1791 		/* receive_request() calls do_request() on remote node */
1792 		error = send_request(r, lkb);
1793 	else
1794 		error = do_request(r, lkb);
1795  out:
1796 	return error;
1797 }
1798 
1799 /* change some property of an existing lkb, e.g. mode */
1800 
1801 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1802 {
1803 	int error;
1804 
1805 	if (is_remote(r))
1806 		/* receive_convert() calls do_convert() on remote node */
1807 		error = send_convert(r, lkb);
1808 	else
1809 		error = do_convert(r, lkb);
1810 
1811 	return error;
1812 }
1813 
1814 /* remove an existing lkb from the granted queue */
1815 
1816 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1817 {
1818 	int error;
1819 
1820 	if (is_remote(r))
1821 		/* receive_unlock() calls do_unlock() on remote node */
1822 		error = send_unlock(r, lkb);
1823 	else
1824 		error = do_unlock(r, lkb);
1825 
1826 	return error;
1827 }
1828 
1829 /* remove an existing lkb from the convert or wait queue */
1830 
1831 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1832 {
1833 	int error;
1834 
1835 	if (is_remote(r))
1836 		/* receive_cancel() calls do_cancel() on remote node */
1837 		error = send_cancel(r, lkb);
1838 	else
1839 		error = do_cancel(r, lkb);
1840 
1841 	return error;
1842 }
1843 
1844 /*
1845  * Four stage 2 varieties:
1846  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
1847  */
1848 
1849 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
1850 			int len, struct dlm_args *args)
1851 {
1852 	struct dlm_rsb *r;
1853 	int error;
1854 
1855 	error = validate_lock_args(ls, lkb, args);
1856 	if (error)
1857 		goto out;
1858 
1859 	error = find_rsb(ls, name, len, R_CREATE, &r);
1860 	if (error)
1861 		goto out;
1862 
1863 	lock_rsb(r);
1864 
1865 	attach_lkb(r, lkb);
1866 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
1867 
1868 	error = _request_lock(r, lkb);
1869 
1870 	unlock_rsb(r);
1871 	put_rsb(r);
1872 
1873  out:
1874 	return error;
1875 }
1876 
1877 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1878 			struct dlm_args *args)
1879 {
1880 	struct dlm_rsb *r;
1881 	int error;
1882 
1883 	r = lkb->lkb_resource;
1884 
1885 	hold_rsb(r);
1886 	lock_rsb(r);
1887 
1888 	error = validate_lock_args(ls, lkb, args);
1889 	if (error)
1890 		goto out;
1891 
1892 	error = _convert_lock(r, lkb);
1893  out:
1894 	unlock_rsb(r);
1895 	put_rsb(r);
1896 	return error;
1897 }
1898 
1899 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1900 		       struct dlm_args *args)
1901 {
1902 	struct dlm_rsb *r;
1903 	int error;
1904 
1905 	r = lkb->lkb_resource;
1906 
1907 	hold_rsb(r);
1908 	lock_rsb(r);
1909 
1910 	error = validate_unlock_args(lkb, args);
1911 	if (error)
1912 		goto out;
1913 
1914 	error = _unlock_lock(r, lkb);
1915  out:
1916 	unlock_rsb(r);
1917 	put_rsb(r);
1918 	return error;
1919 }
1920 
1921 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1922 		       struct dlm_args *args)
1923 {
1924 	struct dlm_rsb *r;
1925 	int error;
1926 
1927 	r = lkb->lkb_resource;
1928 
1929 	hold_rsb(r);
1930 	lock_rsb(r);
1931 
1932 	error = validate_unlock_args(lkb, args);
1933 	if (error)
1934 		goto out;
1935 
1936 	error = _cancel_lock(r, lkb);
1937  out:
1938 	unlock_rsb(r);
1939 	put_rsb(r);
1940 	return error;
1941 }
1942 
1943 /*
1944  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
1945  */
1946 
1947 int dlm_lock(dlm_lockspace_t *lockspace,
1948 	     int mode,
1949 	     struct dlm_lksb *lksb,
1950 	     uint32_t flags,
1951 	     void *name,
1952 	     unsigned int namelen,
1953 	     uint32_t parent_lkid,
1954 	     void (*ast) (void *astarg),
1955 	     void *astarg,
1956 	     void (*bast) (void *astarg, int mode))
1957 {
1958 	struct dlm_ls *ls;
1959 	struct dlm_lkb *lkb;
1960 	struct dlm_args args;
1961 	int error, convert = flags & DLM_LKF_CONVERT;
1962 
1963 	ls = dlm_find_lockspace_local(lockspace);
1964 	if (!ls)
1965 		return -EINVAL;
1966 
1967 	lock_recovery(ls);
1968 
1969 	if (convert)
1970 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
1971 	else
1972 		error = create_lkb(ls, &lkb);
1973 
1974 	if (error)
1975 		goto out;
1976 
1977 	error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
1978 			      astarg, bast, &args);
1979 	if (error)
1980 		goto out_put;
1981 
1982 	if (convert)
1983 		error = convert_lock(ls, lkb, &args);
1984 	else
1985 		error = request_lock(ls, lkb, name, namelen, &args);
1986 
1987 	if (error == -EINPROGRESS)
1988 		error = 0;
1989  out_put:
1990 	if (convert || error)
1991 		__put_lkb(ls, lkb);
1992 	if (error == -EAGAIN)
1993 		error = 0;
1994  out:
1995 	unlock_recovery(ls);
1996 	dlm_put_lockspace(ls);
1997 	return error;
1998 }
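
/* Example (hypothetical caller, not part of this file): with a
   lockspace handle ls from dlm_new_lockspace() and caller-supplied
   my_ast()/my_bast() callbacks, an exclusive request on a six-byte
   resource name looks like:

	struct dlm_lksb lksb;
	int rv = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "mydata", 6, 0,
			  my_ast, &lksb, my_bast);

   rv of 0 only means the request was accepted; the final status
   arrives in my_ast() via lksb.sb_status. */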
1999 
2000 int dlm_unlock(dlm_lockspace_t *lockspace,
2001 	       uint32_t lkid,
2002 	       uint32_t flags,
2003 	       struct dlm_lksb *lksb,
2004 	       void *astarg)
2005 {
2006 	struct dlm_ls *ls;
2007 	struct dlm_lkb *lkb;
2008 	struct dlm_args args;
2009 	int error;
2010 
2011 	ls = dlm_find_lockspace_local(lockspace);
2012 	if (!ls)
2013 		return -EINVAL;
2014 
2015 	lock_recovery(ls);
2016 
2017 	error = find_lkb(ls, lkid, &lkb);
2018 	if (error)
2019 		goto out;
2020 
2021 	error = set_unlock_args(flags, astarg, &args);
2022 	if (error)
2023 		goto out_put;
2024 
2025 	if (flags & DLM_LKF_CANCEL)
2026 		error = cancel_lock(ls, lkb, &args);
2027 	else
2028 		error = unlock_lock(ls, lkb, &args);
2029 
2030 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2031 		error = 0;
2032  out_put:
2033 	dlm_put_lkb(lkb);
2034  out:
2035 	unlock_recovery(ls);
2036 	dlm_put_lockspace(ls);
2037 	return error;
2038 }
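
/* Example (hypothetical caller, continuing the one above): release
   the lock, or cancel a request that is still in progress:

	dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, &lksb);
	dlm_unlock(ls, lksb.sb_lkid, DLM_LKF_CANCEL, &lksb, &lksb);

   As with dlm_lock(), 0 means the operation was accepted; completion
   is reported through the lock's ast with sb_status of -DLM_EUNLOCK
   or -DLM_ECANCEL. */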
2039 
2040 /*
2041  * send/receive routines for remote operations and replies
2042  *
2043  * send_args
2044  * send_common
2045  * send_request			receive_request
2046  * send_convert			receive_convert
2047  * send_unlock			receive_unlock
2048  * send_cancel			receive_cancel
2049  * send_grant			receive_grant
2050  * send_bast			receive_bast
2051  * send_lookup			receive_lookup
2052  * send_remove			receive_remove
2053  *
2054  * 				send_common_reply
2055  * receive_request_reply	send_request_reply
2056  * receive_convert_reply	send_convert_reply
2057  * receive_unlock_reply		send_unlock_reply
2058  * receive_cancel_reply		send_cancel_reply
2059  * receive_lookup_reply		send_lookup_reply
2060  */
2061 
2062 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2063 			  int to_nodeid, int mstype,
2064 			  struct dlm_message **ms_ret,
2065 			  struct dlm_mhandle **mh_ret)
2066 {
2067 	struct dlm_message *ms;
2068 	struct dlm_mhandle *mh;
2069 	char *mb;
2070 	int mb_len = sizeof(struct dlm_message);
2071 
2072 	switch (mstype) {
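	/* variable-length messages: requests, lookups and removes carry
	   the resource name in m_extra; lvb-bearing message types carry
	   the lvb there instead (copied in later by send_args() or
	   send_remove()) */
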
2073 	case DLM_MSG_REQUEST:
2074 	case DLM_MSG_LOOKUP:
2075 	case DLM_MSG_REMOVE:
2076 		mb_len += r->res_length;
2077 		break;
2078 	case DLM_MSG_CONVERT:
2079 	case DLM_MSG_UNLOCK:
2080 	case DLM_MSG_REQUEST_REPLY:
2081 	case DLM_MSG_CONVERT_REPLY:
2082 	case DLM_MSG_GRANT:
2083 		if (lkb && lkb->lkb_lvbptr)
2084 			mb_len += r->res_ls->ls_lvblen;
2085 		break;
2086 	}
2087 
2088 	/* get_buffer gives us a message handle (mh) that we need to
2089 	   pass into lowcomms_commit and a message buffer (mb) that we
2090 	   write our data into */
2091 
2092 	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2093 	if (!mh)
2094 		return -ENOBUFS;
2095 
2096 	memset(mb, 0, mb_len);
2097 
2098 	ms = (struct dlm_message *) mb;
2099 
2100 	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2101 	ms->m_header.h_lockspace = r->res_ls->ls_global_id;
2102 	ms->m_header.h_nodeid = dlm_our_nodeid();
2103 	ms->m_header.h_length = mb_len;
2104 	ms->m_header.h_cmd = DLM_MSG;
2105 
2106 	ms->m_type = mstype;
2107 
2108 	*mh_ret = mh;
2109 	*ms_ret = ms;
2110 	return 0;
2111 }
2112 
2113 /* further lowcomms enhancements or alternate implementations may make
2114    the return value from this function useful at some point */
2115 
2116 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2117 {
2118 	dlm_message_out(ms);
2119 	dlm_lowcomms_commit_buffer(mh);
2120 	return 0;
2121 }
2122 
2123 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2124 		      struct dlm_message *ms)
2125 {
2126 	ms->m_nodeid   = lkb->lkb_nodeid;
2127 	ms->m_pid      = lkb->lkb_ownpid;
2128 	ms->m_lkid     = lkb->lkb_id;
2129 	ms->m_remid    = lkb->lkb_remid;
2130 	ms->m_exflags  = lkb->lkb_exflags;
2131 	ms->m_sbflags  = lkb->lkb_sbflags;
2132 	ms->m_flags    = lkb->lkb_flags;
2133 	ms->m_lvbseq   = lkb->lkb_lvbseq;
2134 	ms->m_status   = lkb->lkb_status;
2135 	ms->m_grmode   = lkb->lkb_grmode;
2136 	ms->m_rqmode   = lkb->lkb_rqmode;
2137 	ms->m_hash     = r->res_hash;
2138 
2139 	/* m_result and m_bastmode are set from function args,
2140 	   not from lkb fields */
2141 
2142 	if (lkb->lkb_bastaddr)
2143 		ms->m_asts |= AST_BAST;
2144 	if (lkb->lkb_astaddr)
2145 		ms->m_asts |= AST_COMP;
2146 
2147 	/* compare with switch in create_message; send_remove() doesn't
2148 	   use send_args() */
2149 
2150 	switch (ms->m_type) {
2151 	case DLM_MSG_REQUEST:
2152 	case DLM_MSG_LOOKUP:
2153 		memcpy(ms->m_extra, r->res_name, r->res_length);
2154 		break;
2155 	case DLM_MSG_CONVERT:
2156 	case DLM_MSG_UNLOCK:
2157 	case DLM_MSG_REQUEST_REPLY:
2158 	case DLM_MSG_CONVERT_REPLY:
2159 	case DLM_MSG_GRANT:
2160 		if (!lkb->lkb_lvbptr)
2161 			break;
2162 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2163 		break;
2164 	}
2165 }
2166 
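/* common send path for request/convert/unlock/cancel: the lkb is put
   on the lockspace waiters list before the message goes out so the
   eventual xxxx_reply can be matched to it; on a send failure it is
   taken off again */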
2167 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2168 {
2169 	struct dlm_message *ms;
2170 	struct dlm_mhandle *mh;
2171 	int to_nodeid, error;
2172 
2173 	add_to_waiters(lkb, mstype);
2174 
2175 	to_nodeid = r->res_nodeid;
2176 
2177 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2178 	if (error)
2179 		goto fail;
2180 
2181 	send_args(r, lkb, ms);
2182 
2183 	error = send_message(mh, ms);
2184 	if (error)
2185 		goto fail;
2186 	return 0;
2187 
2188  fail:
2189 	remove_from_waiters(lkb);
2190 	return error;
2191 }
2192 
2193 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2194 {
2195 	return send_common(r, lkb, DLM_MSG_REQUEST);
2196 }
2197 
2198 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2199 {
2200 	int error;
2201 
2202 	error = send_common(r, lkb, DLM_MSG_CONVERT);
2203 
2204 	/* a down conversion completes without a reply from the master,
	   so fake a successful reply from the lockspace's stub message */
2205 	if (!error && down_conversion(lkb)) {
2206 		remove_from_waiters(lkb);
2207 		r->res_ls->ls_stub_ms.m_result = 0;
2208 		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2209 		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2210 	}
2211 
2212 	return error;
2213 }
2214 
2215 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2216    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2217    that the master is still correct. */
2218 
2219 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2220 {
2221 	return send_common(r, lkb, DLM_MSG_UNLOCK);
2222 }
2223 
2224 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2225 {
2226 	return send_common(r, lkb, DLM_MSG_CANCEL);
2227 }
2228 
2229 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2230 {
2231 	struct dlm_message *ms;
2232 	struct dlm_mhandle *mh;
2233 	int to_nodeid, error;
2234 
2235 	to_nodeid = lkb->lkb_nodeid;
2236 
2237 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2238 	if (error)
2239 		goto out;
2240 
2241 	send_args(r, lkb, ms);
2242 
2243 	ms->m_result = 0;
2244 
2245 	error = send_message(mh, ms);
2246  out:
2247 	return error;
2248 }
2249 
2250 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2251 {
2252 	struct dlm_message *ms;
2253 	struct dlm_mhandle *mh;
2254 	int to_nodeid, error;
2255 
2256 	to_nodeid = lkb->lkb_nodeid;
2257 
2258 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2259 	if (error)
2260 		goto out;
2261 
2262 	send_args(r, lkb, ms);
2263 
2264 	ms->m_bastmode = mode;
2265 
2266 	error = send_message(mh, ms);
2267  out:
2268 	return error;
2269 }
2270 
2271 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2272 {
2273 	struct dlm_message *ms;
2274 	struct dlm_mhandle *mh;
2275 	int to_nodeid, error;
2276 
2277 	add_to_waiters(lkb, DLM_MSG_LOOKUP);
2278 
2279 	to_nodeid = dlm_dir_nodeid(r);
2280 
2281 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2282 	if (error)
2283 		goto fail;
2284 
2285 	send_args(r, lkb, ms);
2286 
2287 	error = send_message(mh, ms);
2288 	if (error)
2289 		goto fail;
2290 	return 0;
2291 
2292  fail:
2293 	remove_from_waiters(lkb);
2294 	return error;
2295 }
2296 
2297 static int send_remove(struct dlm_rsb *r)
2298 {
2299 	struct dlm_message *ms;
2300 	struct dlm_mhandle *mh;
2301 	int to_nodeid, error;
2302 
2303 	to_nodeid = dlm_dir_nodeid(r);
2304 
2305 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2306 	if (error)
2307 		goto out;
2308 
2309 	memcpy(ms->m_extra, r->res_name, r->res_length);
2310 	ms->m_hash = r->res_hash;
2311 
2312 	error = send_message(mh, ms);
2313  out:
2314 	return error;
2315 }
2316 
2317 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2318 			     int mstype, int rv)
2319 {
2320 	struct dlm_message *ms;
2321 	struct dlm_mhandle *mh;
2322 	int to_nodeid, error;
2323 
2324 	to_nodeid = lkb->lkb_nodeid;
2325 
2326 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2327 	if (error)
2328 		goto out;
2329 
2330 	send_args(r, lkb, ms);
2331 
2332 	ms->m_result = rv;
2333 
2334 	error = send_message(mh, ms);
2335  out:
2336 	return error;
2337 }
2338 
2339 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2340 {
2341 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2342 }
2343 
2344 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2345 {
2346 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2347 }
2348 
2349 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2350 {
2351 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2352 }
2353 
2354 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2355 {
2356 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2357 }
2358 
2359 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2360 			     int ret_nodeid, int rv)
2361 {
2362 	struct dlm_rsb *r = &ls->ls_stub_rsb;
2363 	struct dlm_message *ms;
2364 	struct dlm_mhandle *mh;
2365 	int error, nodeid = ms_in->m_header.h_nodeid;
2366 
2367 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2368 	if (error)
2369 		goto out;
2370 
2371 	ms->m_lkid = ms_in->m_lkid;
2372 	ms->m_result = rv;
2373 	ms->m_nodeid = ret_nodeid;
2374 
2375 	error = send_message(mh, ms);
2376  out:
2377 	return error;
2378 }
2379 
2380 /* which args we save from a received message depends heavily on the type
2381    of message, unlike the send side where we can safely send everything about
2382    the lkb for any type of message */
2383 
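/* only the low 16 bits of lkb_flags travel between nodes; the high
   bits (e.g. DLM_IFL_MSTCPY) are node-local state and are preserved
   when flags arrive in a message */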
2384 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2385 {
2386 	lkb->lkb_exflags = ms->m_exflags;
2387 	lkb->lkb_sbflags = ms->m_sbflags;
2388 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2389 		         (ms->m_flags & 0x0000FFFF);
2390 }
2391 
2392 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2393 {
2394 	lkb->lkb_sbflags = ms->m_sbflags;
2395 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2396 		         (ms->m_flags & 0x0000FFFF);
2397 }
2398 
2399 static int receive_extralen(struct dlm_message *ms)
2400 {
2401 	return (ms->m_header.h_length - sizeof(struct dlm_message));
2402 }
2403 
2404 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2405 		       struct dlm_message *ms)
2406 {
2407 	int len;
2408 
2409 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2410 		if (!lkb->lkb_lvbptr)
2411 			lkb->lkb_lvbptr = allocate_lvb(ls);
2412 		if (!lkb->lkb_lvbptr)
2413 			return -ENOMEM;
2414 		len = receive_extralen(ms);
2415 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2416 	}
2417 	return 0;
2418 }
2419 
2420 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2421 				struct dlm_message *ms)
2422 {
2423 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
2424 	lkb->lkb_ownpid = ms->m_pid;
2425 	lkb->lkb_remid = ms->m_lkid;
2426 	lkb->lkb_grmode = DLM_LOCK_IV;
2427 	lkb->lkb_rqmode = ms->m_rqmode;
2428 	lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2429 	lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2430 
2431 	DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2432 
2433 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2434 		/* lkb was just created so there won't be an lvb yet */
2435 		lkb->lkb_lvbptr = allocate_lvb(ls);
2436 		if (!lkb->lkb_lvbptr)
2437 			return -ENOMEM;
2438 	}
2439 
2440 	return 0;
2441 }
2442 
2443 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2444 				struct dlm_message *ms)
2445 {
2446 	if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2447 		log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2448 			  lkb->lkb_nodeid, ms->m_header.h_nodeid,
2449 			  lkb->lkb_id, lkb->lkb_remid);
2450 		return -EINVAL;
2451 	}
2452 
2453 	if (!is_master_copy(lkb))
2454 		return -EINVAL;
2455 
2456 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2457 		return -EBUSY;
2458 
2459 	if (receive_lvb(ls, lkb, ms))
2460 		return -ENOMEM;
2461 
2462 	lkb->lkb_rqmode = ms->m_rqmode;
2463 	lkb->lkb_lvbseq = ms->m_lvbseq;
2464 
2465 	return 0;
2466 }
2467 
2468 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2469 			       struct dlm_message *ms)
2470 {
2471 	if (!is_master_copy(lkb))
2472 		return -EINVAL;
2473 	if (receive_lvb(ls, lkb, ms))
2474 		return -ENOMEM;
2475 	return 0;
2476 }
2477 
2478 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2479    uses to send a reply and that the remote end uses to process the reply. */
2480 
2481 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2482 {
2483 	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2484 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
2485 	lkb->lkb_remid = ms->m_lkid;
2486 }
2487 
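/* master side of a remote request: build a master-copy (MSTCPY) lkb
   mirroring the sender's, attach it to the rsb and run do_request(),
   replying with the result; if setup fails, reply through the stub
   lkb/rsb so the requester isn't left waiting */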
2488 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2489 {
2490 	struct dlm_lkb *lkb;
2491 	struct dlm_rsb *r;
2492 	int error, namelen;
2493 
2494 	error = create_lkb(ls, &lkb);
2495 	if (error)
2496 		goto fail;
2497 
2498 	receive_flags(lkb, ms);
2499 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
2500 	error = receive_request_args(ls, lkb, ms);
2501 	if (error) {
2502 		__put_lkb(ls, lkb);
2503 		goto fail;
2504 	}
2505 
2506 	namelen = receive_extralen(ms);
2507 
2508 	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2509 	if (error) {
2510 		__put_lkb(ls, lkb);
2511 		goto fail;
2512 	}
2513 
2514 	lock_rsb(r);
2515 
2516 	attach_lkb(r, lkb);
2517 	error = do_request(r, lkb);
2518 	send_request_reply(r, lkb, error);
2519 
2520 	unlock_rsb(r);
2521 	put_rsb(r);
2522 
2523 	if (error == -EINPROGRESS)
2524 		error = 0;
2525 	if (error)
2526 		dlm_put_lkb(lkb);
2527 	return;
2528 
2529  fail:
2530 	setup_stub_lkb(ls, ms);
2531 	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2532 }
2533 
2534 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2535 {
2536 	struct dlm_lkb *lkb;
2537 	struct dlm_rsb *r;
2538 	int error, reply = 1;
2539 
2540 	error = find_lkb(ls, ms->m_remid, &lkb);
2541 	if (error)
2542 		goto fail;
2543 
2544 	r = lkb->lkb_resource;
2545 
2546 	hold_rsb(r);
2547 	lock_rsb(r);
2548 
2549 	receive_flags(lkb, ms);
2550 	error = receive_convert_args(ls, lkb, ms);
2551 	if (error)
2552 		goto out;
2553 	reply = !down_conversion(lkb);
2554 
2555 	error = do_convert(r, lkb);
2556  out:
2557 	if (reply)
2558 		send_convert_reply(r, lkb, error);
2559 
2560 	unlock_rsb(r);
2561 	put_rsb(r);
2562 	dlm_put_lkb(lkb);
2563 	return;
2564 
2565  fail:
2566 	setup_stub_lkb(ls, ms);
2567 	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2568 }
2569 
2570 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2571 {
2572 	struct dlm_lkb *lkb;
2573 	struct dlm_rsb *r;
2574 	int error;
2575 
2576 	error = find_lkb(ls, ms->m_remid, &lkb);
2577 	if (error)
2578 		goto fail;
2579 
2580 	r = lkb->lkb_resource;
2581 
2582 	hold_rsb(r);
2583 	lock_rsb(r);
2584 
2585 	receive_flags(lkb, ms);
2586 	error = receive_unlock_args(ls, lkb, ms);
2587 	if (error)
2588 		goto out;
2589 
2590 	error = do_unlock(r, lkb);
2591  out:
2592 	send_unlock_reply(r, lkb, error);
2593 
2594 	unlock_rsb(r);
2595 	put_rsb(r);
2596 	dlm_put_lkb(lkb);
2597 	return;
2598 
2599  fail:
2600 	setup_stub_lkb(ls, ms);
2601 	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2602 }
2603 
2604 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2605 {
2606 	struct dlm_lkb *lkb;
2607 	struct dlm_rsb *r;
2608 	int error;
2609 
2610 	error = find_lkb(ls, ms->m_remid, &lkb);
2611 	if (error)
2612 		goto fail;
2613 
2614 	receive_flags(lkb, ms);
2615 
2616 	r = lkb->lkb_resource;
2617 
2618 	hold_rsb(r);
2619 	lock_rsb(r);
2620 
2621 	error = do_cancel(r, lkb);
2622 	send_cancel_reply(r, lkb, error);
2623 
2624 	unlock_rsb(r);
2625 	put_rsb(r);
2626 	dlm_put_lkb(lkb);
2627 	return;
2628 
2629  fail:
2630 	setup_stub_lkb(ls, ms);
2631 	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2632 }
2633 
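/* the master sends a grant when a queued request or conversion
   becomes grantable; it always targets a process copy, which is
   updated here and given a completion ast */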
2634 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2635 {
2636 	struct dlm_lkb *lkb;
2637 	struct dlm_rsb *r;
2638 	int error;
2639 
2640 	error = find_lkb(ls, ms->m_remid, &lkb);
2641 	if (error) {
2642 		log_error(ls, "receive_grant no lkb");
2643 		return;
2644 	}
2645 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2646 
2647 	r = lkb->lkb_resource;
2648 
2649 	hold_rsb(r);
2650 	lock_rsb(r);
2651 
2652 	receive_flags_reply(lkb, ms);
2653 	grant_lock_pc(r, lkb, ms);
2654 	queue_cast(r, lkb, 0);
2655 
2656 	unlock_rsb(r);
2657 	put_rsb(r);
2658 	dlm_put_lkb(lkb);
2659 }
2660 
2661 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2662 {
2663 	struct dlm_lkb *lkb;
2664 	struct dlm_rsb *r;
2665 	int error;
2666 
2667 	error = find_lkb(ls, ms->m_remid, &lkb);
2668 	if (error) {
2669 		log_error(ls, "receive_bast no lkb");
2670 		return;
2671 	}
2672 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2673 
2674 	r = lkb->lkb_resource;
2675 
2676 	hold_rsb(r);
2677 	lock_rsb(r);
2678 
2679 	queue_bast(r, lkb, ms->m_bastmode);
2680 
2681 	unlock_rsb(r);
2682 	put_rsb(r);
2683 	dlm_put_lkb(lkb);
2684 }
2685 
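/* directory node side of a lookup: check that we really are the dir
   node for this hash, then map the name to a master nodeid; if that
   master is us, the lookup is handled directly as a request instead
   of being answered with a lookup reply */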
2686 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2687 {
2688 	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2689 
2690 	from_nodeid = ms->m_header.h_nodeid;
2691 	our_nodeid = dlm_our_nodeid();
2692 
2693 	len = receive_extralen(ms);
2694 
2695 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2696 	if (dir_nodeid != our_nodeid) {
2697 		log_error(ls, "lookup dir_nodeid %d from %d",
2698 			  dir_nodeid, from_nodeid);
2699 		error = -EINVAL;
2700 		ret_nodeid = -1;
2701 		goto out;
2702 	}
2703 
2704 	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2705 
2706 	/* Optimization: we're master so treat lookup as a request */
2707 	if (!error && ret_nodeid == our_nodeid) {
2708 		receive_request(ls, ms);
2709 		return;
2710 	}
2711  out:
2712 	send_lookup_reply(ls, ms, ret_nodeid, error);
2713 }
2714 
2715 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2716 {
2717 	int len, dir_nodeid, from_nodeid;
2718 
2719 	from_nodeid = ms->m_header.h_nodeid;
2720 
2721 	len = receive_extralen(ms);
2722 
2723 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2724 	if (dir_nodeid != dlm_our_nodeid()) {
2725 		log_error(ls, "remove dir entry dir_nodeid %d from %d",
2726 			  dir_nodeid, from_nodeid);
2727 		return;
2728 	}
2729 
2730 	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2731 }
2732 
2733 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2734 {
2735 	struct dlm_lkb *lkb;
2736 	struct dlm_rsb *r;
2737 	int error, mstype;
2738 
2739 	error = find_lkb(ls, ms->m_remid, &lkb);
2740 	if (error) {
2741 		log_error(ls, "receive_request_reply no lkb");
2742 		return;
2743 	}
2744 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2745 
2746 	mstype = lkb->lkb_wait_type;
2747 	error = remove_from_waiters(lkb);
2748 	if (error) {
2749 		log_error(ls, "receive_request_reply not on waiters");
2750 		goto out;
2751 	}
2752 
2753 	/* this is the value returned from do_request() on the master */
2754 	error = ms->m_result;
2755 
2756 	r = lkb->lkb_resource;
2757 	hold_rsb(r);
2758 	lock_rsb(r);
2759 
2760 	/* Optimization: the dir node was also the master, so it took our
2761 	   lookup as a request and sent request reply instead of lookup reply */
2762 	if (mstype == DLM_MSG_LOOKUP) {
2763 		r->res_nodeid = ms->m_header.h_nodeid;
2764 		lkb->lkb_nodeid = r->res_nodeid;
2765 	}
2766 
2767 	switch (error) {
2768 	case -EAGAIN:
2769 		/* request would block (be queued) on remote master;
2770 		   the unhold undoes the original ref from create_lkb()
2771 		   so it leads to the lkb being freed */
2772 		queue_cast(r, lkb, -EAGAIN);
2773 		confirm_master(r, -EAGAIN);
2774 		unhold_lkb(lkb);
2775 		break;
2776 
2777 	case -EINPROGRESS:
2778 	case 0:
2779 		/* request was queued or granted on remote master */
2780 		receive_flags_reply(lkb, ms);
2781 		lkb->lkb_remid = ms->m_lkid;
2782 		if (error)
2783 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
2784 		else {
2785 			grant_lock_pc(r, lkb, ms);
2786 			queue_cast(r, lkb, 0);
2787 		}
2788 		confirm_master(r, error);
2789 		break;
2790 
2791 	case -EBADR:
2792 	case -ENOTBLK:
2793 		/* find_rsb failed to find rsb or rsb wasn't master;
		   clear the nodeids and retry, redoing the master lookup */
2794 		r->res_nodeid = -1;
2795 		lkb->lkb_nodeid = -1;
2796 		_request_lock(r, lkb);
2797 		break;
2798 
2799 	default:
2800 		log_error(ls, "receive_request_reply error %d", error);
2801 	}
2802 
2803 	unlock_rsb(r);
2804 	put_rsb(r);
2805  out:
2806 	dlm_put_lkb(lkb);
2807 }
2808 
2809 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2810 				    struct dlm_message *ms)
2811 {
2812 	int error = ms->m_result;
2813 
2814 	/* this is the value returned from do_convert() on the master */
2815 
2816 	switch (error) {
2817 	case -EAGAIN:
2818 		/* convert would block (be queued) on remote master */
2819 		queue_cast(r, lkb, -EAGAIN);
2820 		break;
2821 
2822 	case -EINPROGRESS:
2823 		/* convert was queued on remote master */
2824 		del_lkb(r, lkb);
2825 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2826 		break;
2827 
2828 	case 0:
2829 		/* convert was granted on remote master */
2830 		receive_flags_reply(lkb, ms);
2831 		grant_lock_pc(r, lkb, ms);
2832 		queue_cast(r, lkb, 0);
2833 		break;
2834 
2835 	default:
2836 		log_error(r->res_ls, "receive_convert_reply error %d", error);
2837 	}
2838 }
2839 
2840 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2841 {
2842 	struct dlm_rsb *r = lkb->lkb_resource;
2843 
2844 	hold_rsb(r);
2845 	lock_rsb(r);
2846 
2847 	__receive_convert_reply(r, lkb, ms);
2848 
2849 	unlock_rsb(r);
2850 	put_rsb(r);
2851 }
2852 
2853 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2854 {
2855 	struct dlm_lkb *lkb;
2856 	int error;
2857 
2858 	error = find_lkb(ls, ms->m_remid, &lkb);
2859 	if (error) {
2860 		log_error(ls, "receive_convert_reply no lkb");
2861 		return;
2862 	}
2863 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2864 
2865 	error = remove_from_waiters(lkb);
2866 	if (error) {
2867 		log_error(ls, "receive_convert_reply not on waiters");
2868 		goto out;
2869 	}
2870 
2871 	_receive_convert_reply(lkb, ms);
2872  out:
2873 	dlm_put_lkb(lkb);
2874 }
2875 
2876 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2877 {
2878 	struct dlm_rsb *r = lkb->lkb_resource;
2879 	int error = ms->m_result;
2880 
2881 	hold_rsb(r);
2882 	lock_rsb(r);
2883 
2884 	/* this is the value returned from do_unlock() on the master */
2885 
2886 	switch (error) {
2887 	case -DLM_EUNLOCK:
2888 		receive_flags_reply(lkb, ms);
2889 		remove_lock_pc(r, lkb);
2890 		queue_cast(r, lkb, -DLM_EUNLOCK);
2891 		break;
2892 	default:
2893 		log_error(r->res_ls, "receive_unlock_reply error %d", error);
2894 	}
2895 
2896 	unlock_rsb(r);
2897 	put_rsb(r);
2898 }
2899 
2900 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2901 {
2902 	struct dlm_lkb *lkb;
2903 	int error;
2904 
2905 	error = find_lkb(ls, ms->m_remid, &lkb);
2906 	if (error) {
2907 		log_error(ls, "receive_unlock_reply no lkb");
2908 		return;
2909 	}
2910 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2911 
2912 	error = remove_from_waiters(lkb);
2913 	if (error) {
2914 		log_error(ls, "receive_unlock_reply not on waiters");
2915 		goto out;
2916 	}
2917 
2918 	_receive_unlock_reply(lkb, ms);
2919  out:
2920 	dlm_put_lkb(lkb);
2921 }
2922 
2923 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2924 {
2925 	struct dlm_rsb *r = lkb->lkb_resource;
2926 	int error = ms->m_result;
2927 
2928 	hold_rsb(r);
2929 	lock_rsb(r);
2930 
2931 	/* this is the value returned from do_cancel() on the master */
2932 
2933 	switch (error) {
2934 	case -DLM_ECANCEL:
2935 		receive_flags_reply(lkb, ms);
2936 		revert_lock_pc(r, lkb);
2937 		queue_cast(r, lkb, -DLM_ECANCEL);
2938 		break;
2939 	default:
2940 		log_error(r->res_ls, "receive_cancel_reply error %d", error);
2941 	}
2942 
2943 	unlock_rsb(r);
2944 	put_rsb(r);
2945 }
2946 
2947 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2948 {
2949 	struct dlm_lkb *lkb;
2950 	int error;
2951 
2952 	error = find_lkb(ls, ms->m_remid, &lkb);
2953 	if (error) {
2954 		log_error(ls, "receive_cancel_reply no lkb");
2955 		return;
2956 	}
2957 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2958 
2959 	error = remove_from_waiters(lkb);
2960 	if (error) {
2961 		log_error(ls, "receive_cancel_reply not on waiters");
2962 		goto out;
2963 	}
2964 
2965 	_receive_cancel_reply(lkb, ms);
2966  out:
2967 	dlm_put_lkb(lkb);
2968 }
2969 
2970 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2971 {
2972 	struct dlm_lkb *lkb;
2973 	struct dlm_rsb *r;
2974 	int error, ret_nodeid;
2975 
2976 	error = find_lkb(ls, ms->m_lkid, &lkb);
2977 	if (error) {
2978 		log_error(ls, "receive_lookup_reply no lkb");
2979 		return;
2980 	}
2981 
2982 	error = remove_from_waiters(lkb);
2983 	if (error) {
2984 		log_error(ls, "receive_lookup_reply not on waiters");
2985 		goto out;
2986 	}
2987 
2988 	/* this is the value returned by dlm_dir_lookup on dir node
2989 	   FIXME: will a non-zero error ever be returned? */
2990 	error = ms->m_result;
2991 
2992 	r = lkb->lkb_resource;
2993 	hold_rsb(r);
2994 	lock_rsb(r);
2995 
2996 	ret_nodeid = ms->m_nodeid;
2997 	if (ret_nodeid == dlm_our_nodeid()) {
2998 		r->res_nodeid = 0;
2999 		ret_nodeid = 0;
3000 		r->res_first_lkid = 0;
3001 	} else {
3002 		/* set_master() will copy res_nodeid to lkb_nodeid */
3003 		r->res_nodeid = ret_nodeid;
3004 	}
3005 
3006 	_request_lock(r, lkb);
3007 
3008 	if (!ret_nodeid)
3009 		process_lookup_list(r);
3010 
3011 	unlock_rsb(r);
3012 	put_rsb(r);
3013  out:
3014 	dlm_put_lkb(lkb);
3015 }
3016 
3017 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3018 {
3019 	struct dlm_message *ms = (struct dlm_message *) hd;
3020 	struct dlm_ls *ls;
3021 	int error;
3022 
3023 	if (!recovery)
3024 		dlm_message_in(ms);
3025 
3026 	ls = dlm_find_lockspace_global(hd->h_lockspace);
3027 	if (!ls) {
3028 		log_print("drop message %d from %d for unknown lockspace %d",
3029 			  ms->m_type, nodeid, hd->h_lockspace);
3030 		return -EINVAL;
3031 	}
3032 
3033 	/* recovery may have just ended leaving a bunch of backed-up requests
3034 	   in the requestqueue; wait while dlm_recoverd clears them */
3035 
3036 	if (!recovery)
3037 		dlm_wait_requestqueue(ls);
3038 
3039 	/* recovery may have just started while there were a bunch of
3040 	   in-flight requests -- save them in requestqueue to be processed
3041 	   after recovery.  we can't let dlm_recvd block on the recovery
3042 	   lock.  if dlm_recoverd is calling this function to clear the
3043 	   requestqueue, it needs to be interrupted (-EINTR) if another
3044 	   recovery operation is starting. */
3045 
3046 	while (1) {
3047 		if (dlm_locking_stopped(ls)) {
3048 			if (recovery) {
3049 				error = -EINTR;
3050 				goto out;
3051 			}
3052 			error = dlm_add_requestqueue(ls, nodeid, hd);
3053 			if (error == -EAGAIN)
3054 				continue;
3055 			else {
3056 				error = -EINTR;
3057 				goto out;
3058 			}
3059 		}
3060 
3061 		if (lock_recovery_try(ls))
3062 			break;
3063 		schedule();
3064 	}
3065 
3066 	switch (ms->m_type) {
3067 
3068 	/* messages sent to a master node */
3069 
3070 	case DLM_MSG_REQUEST:
3071 		receive_request(ls, ms);
3072 		break;
3073 
3074 	case DLM_MSG_CONVERT:
3075 		receive_convert(ls, ms);
3076 		break;
3077 
3078 	case DLM_MSG_UNLOCK:
3079 		receive_unlock(ls, ms);
3080 		break;
3081 
3082 	case DLM_MSG_CANCEL:
3083 		receive_cancel(ls, ms);
3084 		break;
3085 
3086 	/* messages sent from a master node (replies to above) */
3087 
3088 	case DLM_MSG_REQUEST_REPLY:
3089 		receive_request_reply(ls, ms);
3090 		break;
3091 
3092 	case DLM_MSG_CONVERT_REPLY:
3093 		receive_convert_reply(ls, ms);
3094 		break;
3095 
3096 	case DLM_MSG_UNLOCK_REPLY:
3097 		receive_unlock_reply(ls, ms);
3098 		break;
3099 
3100 	case DLM_MSG_CANCEL_REPLY:
3101 		receive_cancel_reply(ls, ms);
3102 		break;
3103 
3104 	/* messages sent from a master node (only two types of async msg) */
3105 
3106 	case DLM_MSG_GRANT:
3107 		receive_grant(ls, ms);
3108 		break;
3109 
3110 	case DLM_MSG_BAST:
3111 		receive_bast(ls, ms);
3112 		break;
3113 
3114 	/* messages sent to a dir node */
3115 
3116 	case DLM_MSG_LOOKUP:
3117 		receive_lookup(ls, ms);
3118 		break;
3119 
3120 	case DLM_MSG_REMOVE:
3121 		receive_remove(ls, ms);
3122 		break;
3123 
3124 	/* messages sent from a dir node (remove has no reply) */
3125 
3126 	case DLM_MSG_LOOKUP_REPLY:
3127 		receive_lookup_reply(ls, ms);
3128 		break;
3129 
3130 	default:
3131 		log_error(ls, "unknown message type %d", ms->m_type);
3132 	}
3133 
3134 	unlock_recovery(ls);
3135  out:
3136 	dlm_put_lockspace(ls);
3137 	dlm_astd_wake();
3138 	return 0;
3139 }
3140 
3141 
3142 /*
3143  * Recovery related
3144  */
3145 
3146 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3147 {
3148 	if (middle_conversion(lkb)) {
3149 		hold_lkb(lkb);
3150 		ls->ls_stub_ms.m_result = -EINPROGRESS;
3151 		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3152 		_remove_from_waiters(lkb);
3153 		_receive_convert_reply(lkb, &ls->ls_stub_ms);
3154 
3155 		/* Same special case as in receive_rcom_lock_args() */
3156 		lkb->lkb_grmode = DLM_LOCK_IV;
3157 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3158 		unhold_lkb(lkb);
3159 
3160 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3161 		lkb->lkb_flags |= DLM_IFL_RESEND;
3162 	}
3163 
3164 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3165 	   conversions are async; there's no reply from the remote master */
3166 }
3167 
3168 /* A waiting lkb needs recovery if the master node has failed, or
3169    the master node is changing (only when no directory is used) */
3170 
3171 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3172 {
3173 	if (dlm_is_removed(ls, lkb->lkb_nodeid))
3174 		return 1;
3175 
3176 	if (!dlm_no_directory(ls))
3177 		return 0;
3178 
3179 	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3180 		return 1;
3181 
3182 	return 0;
3183 }
3184 
3185 /* Recovery for locks that are waiting for replies from nodes that are now
3186    gone.  We can just complete unlocks and cancels by faking a reply from the
3187    dead node.  Requests and up-conversions we flag to be resent after
3188    recovery.  Down-conversions can just be completed with a fake reply like
3189    unlocks.  Conversions between PR and CW need special attention. */
3190 
3191 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3192 {
3193 	struct dlm_lkb *lkb, *safe;
3194 
3195 	mutex_lock(&ls->ls_waiters_mutex);
3196 
3197 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3198 		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3199 			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3200 
3201 		/* all outstanding lookups, regardless of destination, will be
3202 		   resent after recovery is done */
3203 
3204 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3205 			lkb->lkb_flags |= DLM_IFL_RESEND;
3206 			continue;
3207 		}
3208 
3209 		if (!waiter_needs_recovery(ls, lkb))
3210 			continue;
3211 
3212 		switch (lkb->lkb_wait_type) {
3213 
3214 		case DLM_MSG_REQUEST:
3215 			lkb->lkb_flags |= DLM_IFL_RESEND;
3216 			break;
3217 
3218 		case DLM_MSG_CONVERT:
3219 			recover_convert_waiter(ls, lkb);
3220 			break;
3221 
3222 		case DLM_MSG_UNLOCK:
3223 			hold_lkb(lkb);
3224 			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3225 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3226 			_remove_from_waiters(lkb);
3227 			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
3228 			dlm_put_lkb(lkb);
3229 			break;
3230 
3231 		case DLM_MSG_CANCEL:
3232 			hold_lkb(lkb);
3233 			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3234 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3235 			_remove_from_waiters(lkb);
3236 			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
3237 			dlm_put_lkb(lkb);
3238 			break;
3239 
3240 		default:
3241 			log_error(ls, "invalid lkb wait_type %d",
3242 				  lkb->lkb_wait_type);
3243 		}
3244 		schedule();
3245 	}
3246 	mutex_unlock(&ls->ls_waiters_mutex);
3247 }
3248 
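/* pull the next lkb marked RESEND off the waiters list; returns its
   wait_type, or 0 when none are left */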
3249 static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
3250 {
3251 	struct dlm_lkb *lkb;
3252 	int rv = 0;
3253 
3254 	mutex_lock(&ls->ls_waiters_mutex);
3255 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3256 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
3257 			rv = lkb->lkb_wait_type;
3258 			_remove_from_waiters(lkb);
3259 			lkb->lkb_flags &= ~DLM_IFL_RESEND;
3260 			break;
3261 		}
3262 	}
3263 	mutex_unlock(&ls->ls_waiters_mutex);
3264 
3265 	if (!rv)
3266 		lkb = NULL;
3267 	*lkb_ret = lkb;
3268 	return rv;
3269 }
3270 
3271 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
3272    master or dir-node for r.  Processing the lkb may result in it being placed
3273    back on waiters. */
3274 
3275 int dlm_recover_waiters_post(struct dlm_ls *ls)
3276 {
3277 	struct dlm_lkb *lkb;
3278 	struct dlm_rsb *r;
3279 	int error = 0, mstype;
3280 
3281 	while (1) {
3282 		if (dlm_locking_stopped(ls)) {
3283 			log_debug(ls, "recover_waiters_post aborted");
3284 			error = -EINTR;
3285 			break;
3286 		}
3287 
3288 		mstype = remove_resend_waiter(ls, &lkb);
3289 		if (!mstype)
3290 			break;
3291 
3292 		r = lkb->lkb_resource;
3293 
3294 		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3295 			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3296 
3297 		switch (mstype) {
3298 
3299 		case DLM_MSG_LOOKUP:
3300 			hold_rsb(r);
3301 			lock_rsb(r);
3302 			_request_lock(r, lkb);
3303 			if (is_master(r))
3304 				confirm_master(r, 0);
3305 			unlock_rsb(r);
3306 			put_rsb(r);
3307 			break;
3308 
3309 		case DLM_MSG_REQUEST:
3310 			hold_rsb(r);
3311 			lock_rsb(r);
3312 			_request_lock(r, lkb);
3313 			if (is_master(r))
3314 				confirm_master(r, 0);
3315 			unlock_rsb(r);
3316 			put_rsb(r);
3317 			break;
3318 
3319 		case DLM_MSG_CONVERT:
3320 			hold_rsb(r);
3321 			lock_rsb(r);
3322 			_convert_lock(r, lkb);
3323 			unlock_rsb(r);
3324 			put_rsb(r);
3325 			break;
3326 
3327 		default:
3328 			log_error(ls, "recover_waiters_post type %d", mstype);
3329 		}
3330 	}
3331 
3332 	return error;
3333 }
3334 
3335 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3336 			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3337 {
3338 	struct dlm_ls *ls = r->res_ls;
3339 	struct dlm_lkb *lkb, *safe;
3340 
3341 	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3342 		if (test(ls, lkb)) {
3343 			rsb_set_flag(r, RSB_LOCKS_PURGED);
3344 			del_lkb(r, lkb);
3345 			/* this put should free the lkb */
3346 			if (!dlm_put_lkb(lkb))
3347 				log_error(ls, "purged lkb not released");
3348 		}
3349 	}
3350 }
3351 
3352 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3353 {
3354 	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3355 }
3356 
3357 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3358 {
3359 	return is_master_copy(lkb);
3360 }
3361 
3362 static void purge_dead_locks(struct dlm_rsb *r)
3363 {
3364 	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3365 	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3366 	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3367 }
3368 
3369 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3370 {
3371 	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3372 	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3373 	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3374 }
3375 
3376 /* Get rid of locks held by nodes that are gone. */
3377 
3378 int dlm_purge_locks(struct dlm_ls *ls)
3379 {
3380 	struct dlm_rsb *r;
3381 
3382 	log_debug(ls, "dlm_purge_locks");
3383 
3384 	down_write(&ls->ls_root_sem);
3385 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3386 		hold_rsb(r);
3387 		lock_rsb(r);
3388 		if (is_master(r))
3389 			purge_dead_locks(r);
3390 		unlock_rsb(r);
3391 		unhold_rsb(r);
3392 
3393 		schedule();
3394 	}
3395 	up_write(&ls->ls_root_sem);
3396 
3397 	return 0;
3398 }
3399 
3400 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3401 {
3402 	struct dlm_rsb *r, *r_ret = NULL;
3403 
3404 	read_lock(&ls->ls_rsbtbl[bucket].lock);
3405 	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3406 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
3407 			continue;
3408 		hold_rsb(r);
3409 		rsb_clear_flag(r, RSB_LOCKS_PURGED);
3410 		r_ret = r;
3411 		break;
3412 	}
3413 	read_unlock(&ls->ls_rsbtbl[bucket].lock);
3414 	return r_ret;
3415 }
3416 
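/* rescan the hash table for rsb's flagged RSB_LOCKS_PURGED when locks
   were purged from their queues, and grant whatever those removals
   made grantable */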
3417 void dlm_grant_after_purge(struct dlm_ls *ls)
3418 {
3419 	struct dlm_rsb *r;
3420 	int bucket = 0;
3421 
3422 	while (1) {
3423 		r = find_purged_rsb(ls, bucket);
3424 		if (!r) {
3425 			if (bucket == ls->ls_rsbtbl_size - 1)
3426 				break;
3427 			bucket++;
3428 			continue;
3429 		}
3430 		lock_rsb(r);
3431 		if (is_master(r)) {
3432 			grant_pending_locks(r);
3433 			confirm_master(r, 0);
3434 		}
3435 		unlock_rsb(r);
3436 		put_rsb(r);
3437 		schedule();
3438 	}
3439 }
3440 
3441 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3442 					 uint32_t remid)
3443 {
3444 	struct dlm_lkb *lkb;
3445 
3446 	list_for_each_entry(lkb, head, lkb_statequeue) {
3447 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3448 			return lkb;
3449 	}
3450 	return NULL;
3451 }
3452 
3453 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3454 				    uint32_t remid)
3455 {
3456 	struct dlm_lkb *lkb;
3457 
3458 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3459 	if (lkb)
3460 		return lkb;
3461 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3462 	if (lkb)
3463 		return lkb;
3464 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3465 	if (lkb)
3466 		return lkb;
3467 	return NULL;
3468 }
3469 
3470 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3471 				  struct dlm_rsb *r, struct dlm_rcom *rc)
3472 {
3473 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3474 	int lvblen;
3475 
3476 	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3477 	lkb->lkb_ownpid = rl->rl_ownpid;
3478 	lkb->lkb_remid = rl->rl_lkid;
3479 	lkb->lkb_exflags = rl->rl_exflags;
3480 	lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3481 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
3482 	lkb->lkb_lvbseq = rl->rl_lvbseq;
3483 	lkb->lkb_rqmode = rl->rl_rqmode;
3484 	lkb->lkb_grmode = rl->rl_grmode;
3485 	/* don't set lkb_status because add_lkb wants to set it itself */
3486 
3487 	lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3488 	lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3489 
3490 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3491 		lkb->lkb_lvbptr = allocate_lvb(ls);
3492 		if (!lkb->lkb_lvbptr)
3493 			return -ENOMEM;
3494 		lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3495 			 sizeof(struct rcom_lock);
3496 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3497 	}
3498 
3499 	/* Conversions between PR and CW (middle modes) need special handling.
3500 	   The real granted mode of these converting locks cannot be determined
3501 	   until all locks have been rebuilt on the rsb (recover_conversion) */
3502 
3503 	if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3504 		rl->rl_status = DLM_LKSTS_CONVERT;
3505 		lkb->lkb_grmode = DLM_LOCK_IV;
3506 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
3507 	}
3508 
3509 	return 0;
3510 }
3511 
3512 /* This lkb may have been recovered in a previous aborted recovery so we need
3513    to check if the rsb already has an lkb with the given remote nodeid/lkid.
3514    If so we just send back a standard reply.  If not, we create a new lkb with
3515    the given values and send back our lkid.  We send back our lkid by sending
3516    back the rcom_lock struct we got but with the remid field filled in. */
3517 
3518 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3519 {
3520 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3521 	struct dlm_rsb *r;
3522 	struct dlm_lkb *lkb;
3523 	int error;
3524 
3525 	if (rl->rl_parent_lkid) {
3526 		error = -EOPNOTSUPP;
3527 		goto out;
3528 	}
3529 
3530 	error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3531 	if (error)
3532 		goto out;
3533 
3534 	lock_rsb(r);
3535 
3536 	lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3537 	if (lkb) {
3538 		error = -EEXIST;
3539 		goto out_remid;
3540 	}
3541 
3542 	error = create_lkb(ls, &lkb);
3543 	if (error)
3544 		goto out_unlock;
3545 
3546 	error = receive_rcom_lock_args(ls, lkb, r, rc);
3547 	if (error) {
3548 		__put_lkb(ls, lkb);
3549 		goto out_unlock;
3550 	}
3551 
3552 	attach_lkb(r, lkb);
3553 	add_lkb(r, lkb, rl->rl_status);
3554 	error = 0;
3555 
3556  out_remid:
3557 	/* this is the new value returned to the lock holder for
3558 	   saving in its process-copy lkb */
3559 	rl->rl_remid = lkb->lkb_id;
3560 
3561  out_unlock:
3562 	unlock_rsb(r);
3563 	put_rsb(r);
3564  out:
3565 	if (error)
3566 		log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3567 	rl->rl_result = error;
3568 	return error;
3569 }
3570 
3571 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3572 {
3573 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3574 	struct dlm_rsb *r;
3575 	struct dlm_lkb *lkb;
3576 	int error;
3577 
3578 	error = find_lkb(ls, rl->rl_lkid, &lkb);
3579 	if (error) {
3580 		log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3581 		return error;
3582 	}
3583 
3584 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3585 
3586 	error = rl->rl_result;
3587 
3588 	r = lkb->lkb_resource;
3589 	hold_rsb(r);
3590 	lock_rsb(r);
3591 
3592 	switch (error) {
3593 	case -EBADR:
3594 		/* There's a chance the new master received our lock before
3595 		   dlm_recover_master_reply(); this wouldn't happen if we did
3596 		   a barrier between recover_masters and recover_locks. */
3597 		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
3598 			  (unsigned long)r, r->res_name);
3599 		dlm_send_rcom_lock(r, lkb);
3600 		goto out;
3601 	case -EEXIST:
3602 		log_debug(ls, "master copy exists %x", lkb->lkb_id);
3603 		/* fall through */
3604 	case 0:
3605 		lkb->lkb_remid = rl->rl_remid;
3606 		break;
3607 	default:
3608 		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3609 			  error, lkb->lkb_id);
3610 	}
3611 
3612 	/* an ack for dlm_recover_locks() which waits for replies from
3613 	   all the locks it sends to new masters */
3614 	dlm_recovered_lock(r);
3615  out:
3616 	unlock_rsb(r);
3617 	put_rsb(r);
3618 	dlm_put_lkb(lkb);
3619 
3620 	return 0;
3621 }
3622 
3623 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
3624 		     int mode, uint32_t flags, void *name, unsigned int namelen,
3625 		     uint32_t parent_lkid)
3626 {
3627 	struct dlm_lkb *lkb;
3628 	struct dlm_args args;
3629 	int error;
3630 
3631 	lock_recovery(ls);
3632 
3633 	error = create_lkb(ls, &lkb);
3634 	if (error) {
3635 		kfree(ua);
3636 		goto out;
3637 	}
3638 
3639 	if (flags & DLM_LKF_VALBLK) {
3640 		ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3641 		if (!ua->lksb.sb_lvbptr) {
3642 			kfree(ua);
3643 			__put_lkb(ls, lkb);
3644 			error = -ENOMEM;
3645 			goto out;
3646 		}
3647 	}
3648 
3649 	/* After ua is attached to lkb it will be freed by free_lkb().
3650 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
3651 	   lock and that lkb_astparam is the dlm_user_args structure. */
3652 
3653 	error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
3654 			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
3655 	lkb->lkb_flags |= DLM_IFL_USER;
3656 	ua->old_mode = DLM_LOCK_IV;
3657 
3658 	if (error) {
3659 		__put_lkb(ls, lkb);
3660 		goto out;
3661 	}
3662 
3663 	error = request_lock(ls, lkb, name, namelen, &args);
3664 
3665 	switch (error) {
3666 	case 0:
3667 		break;
3668 	case -EINPROGRESS:
3669 		error = 0;
3670 		break;
3671 	case -EAGAIN:
3672 		error = 0;
3673 		/* fall through */
3674 	default:
3675 		__put_lkb(ls, lkb);
3676 		goto out;
3677 	}
3678 
3679 	/* add this new lkb to the per-process list of locks */
3680 	spin_lock(&ua->proc->locks_spin);
3681 	kref_get(&lkb->lkb_ref);
3682 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
3683 	spin_unlock(&ua->proc->locks_spin);
3684  out:
3685 	unlock_recovery(ls);
3686 	return error;
3687 }
3688 
3689 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3690 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
3691 {
3692 	struct dlm_lkb *lkb;
3693 	struct dlm_args args;
3694 	struct dlm_user_args *ua;
3695 	int error;
3696 
3697 	lock_recovery(ls);
3698 
3699 	error = find_lkb(ls, lkid, &lkb);
3700 	if (error)
3701 		goto out;
3702 
3703 	/* user can change the params on its lock when it converts it, or
3704 	   add an lvb that didn't exist before */
3705 
3706 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
3707 
3708 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
3709 		ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3710 		if (!ua->lksb.sb_lvbptr) {
3711 			error = -ENOMEM;
3712 			goto out_put;
3713 		}
3714 	}
3715 	if (lvb_in && ua->lksb.sb_lvbptr)
3716 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3717 
3718 	ua->castparam = ua_tmp->castparam;
3719 	ua->castaddr = ua_tmp->castaddr;
3720 	ua->bastparam = ua_tmp->bastparam;
3721 	ua->bastaddr = ua_tmp->bastaddr;
3722 	ua->user_lksb = ua_tmp->user_lksb;
3723 	ua->old_mode = lkb->lkb_grmode;
3724 
3725 	error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
3726 			      ua, DLM_FAKE_USER_AST, &args);
3727 	if (error)
3728 		goto out_put;
3729 
3730 	error = convert_lock(ls, lkb, &args);
3731 
3732 	if (error == -EINPROGRESS || error == -EAGAIN)
3733 		error = 0;
3734  out_put:
3735 	dlm_put_lkb(lkb);
3736  out:
3737 	unlock_recovery(ls);
3738 	kfree(ua_tmp);
3739 	return error;
3740 }
3741 
3742 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3743 		    uint32_t flags, uint32_t lkid, char *lvb_in)
3744 {
3745 	struct dlm_lkb *lkb;
3746 	struct dlm_args args;
3747 	struct dlm_user_args *ua;
3748 	int error;
3749 
3750 	lock_recovery(ls);
3751 
3752 	error = find_lkb(ls, lkid, &lkb);
3753 	if (error)
3754 		goto out;
3755 
3756 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
3757 
3758 	if (lvb_in && ua->lksb.sb_lvbptr)
3759 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3760 	ua->castparam = ua_tmp->castparam;
3761 	ua->user_lksb = ua_tmp->user_lksb;
3762 
3763 	error = set_unlock_args(flags, ua, &args);
3764 	if (error)
3765 		goto out_put;
3766 
3767 	error = unlock_lock(ls, lkb, &args);
3768 
3769 	if (error == -DLM_EUNLOCK)
3770 		error = 0;
3771 	if (error)
3772 		goto out_put;
3773 
3774 	spin_lock(&ua->proc->locks_spin);
3775 	/* dlm_user_add_ast() may have already taken lkb off the proc list */
3776 	if (!list_empty(&lkb->lkb_ownqueue))
3777 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
3778 	spin_unlock(&ua->proc->locks_spin);
3779  out_put:
3780 	dlm_put_lkb(lkb);
3781  out:
3782 	unlock_recovery(ls);
3783 	return error;
3784 }
3785 
3786 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3787 		    uint32_t flags, uint32_t lkid)
3788 {
3789 	struct dlm_lkb *lkb;
3790 	struct dlm_args args;
3791 	struct dlm_user_args *ua;
3792 	int error;
3793 
3794 	lock_recovery(ls);
3795 
3796 	error = find_lkb(ls, lkid, &lkb);
3797 	if (error)
3798 		goto out;
3799 
3800 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
3801 	ua->castparam = ua_tmp->castparam;
3802 	ua->user_lksb = ua_tmp->user_lksb;
3803 
3804 	error = set_unlock_args(flags, ua, &args);
3805 	if (error)
3806 		goto out_put;
3807 
3808 	error = cancel_lock(ls, lkb, &args);
3809 
3810 	if (error == -DLM_ECANCEL)
3811 		error = 0;
3812 	if (error)
3813 		goto out_put;
3814 
3815 	/* this lkb was removed from the WAITING queue */
3816 	if (lkb->lkb_grmode == DLM_LOCK_IV) {
3817 		spin_lock(&ua->proc->locks_spin);
3818 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
3819 		spin_unlock(&ua->proc->locks_spin);
3820 	}
3821  out_put:
3822 	dlm_put_lkb(lkb);
3823  out:
3824 	unlock_recovery(ls);
3825 	return error;
3826 }
3827 
3828 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3829 {
3830 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3831 
3832 	kfree(ua->lksb.sb_lvbptr);	/* kfree(NULL) is a no-op */
3834 	kfree(ua);
3835 	lkb->lkb_astparam = (long)NULL;
3836 
3837 	/* TODO: propagate to master if needed */
3838 	return 0;
3839 }
3840 
3841 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
3842    Regardless of what rsb queue the lock is on, it's removed and freed. */
3843 
3844 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3845 {
3846 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3847 	struct dlm_args args;
3848 	int error;
3849 
3850 	/* FIXME: we need to handle the case where the lkb is in limbo
3851 	   while the rsb is being looked up, currently we assert in
3852 	   _unlock_lock/is_remote because rsb nodeid is -1. */
3853 
3854 	set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
3855 
3856 	error = unlock_lock(ls, lkb, &args);
3857 	if (error == -DLM_EUNLOCK)
3858 		error = 0;
3859 	return error;
3860 }
3861 
3862 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
3863    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
3864    which we clear here. */
3865 
3866 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
3867    list, and no more device_writes should add lkb's to proc->locks list; so we
3868    shouldn't need to take asts_spin or locks_spin here.  This assumes that
3869    device reads/writes/closes are serialized -- FIXME: we may need to serialize
3870    them ourselves. */
3871 
3872 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3873 {
3874 	struct dlm_lkb *lkb, *safe;
3875 
3876 	lock_recovery(ls);
3877 	mutex_lock(&ls->ls_clear_proc_locks);
3878 
3879 	list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
3880 		list_del_init(&lkb->lkb_ownqueue);
3881 
3882 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
3883 			lkb->lkb_flags |= DLM_IFL_ORPHAN;
3884 			orphan_proc_lock(ls, lkb);
3885 		} else {
3886 			lkb->lkb_flags |= DLM_IFL_DEAD;
3887 			unlock_proc_lock(ls, lkb);
3888 		}
3889 
3890 		/* this removes the reference for the proc->locks list
3891 		   added by dlm_user_request, it may result in the lkb
3892 		   being freed */
3893 
3894 		dlm_put_lkb(lkb);
3895 	}
3896 
3897 	/* in-progress unlocks */
3898 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
3899 		list_del_init(&lkb->lkb_ownqueue);
3900 		lkb->lkb_flags |= DLM_IFL_DEAD;
3901 		dlm_put_lkb(lkb);
3902 	}
3903 
3904 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
3905 		list_del(&lkb->lkb_astqueue);
3906 		dlm_put_lkb(lkb);
3907 	}
3908 
3909 	mutex_unlock(&ls->ls_clear_proc_locks);
3910 	unlock_recovery(ls);
3911 }
3912 
3913