/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

       dlm_lock          = request_lock
       dlm_lock+CONVERT  = convert_lock
       dlm_unlock        = unlock_lock
       dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb, which is
   then passed to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(); when local, it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
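
/*
 * Example (editor's illustrative sketch, not part of the DLM code): a
 * new request on a locally mastered resource moves through the four
 * stages roughly like this; error handling omitted:
 *
 *	dlm_lock()			stage 1: validate the caller's args
 *	    request_lock(ls, lkb)	stage 2: find and lock the rsb
 *	    _request_lock(r, lkb)	stage 3: master is local here...
 *	    do_request(r, lkb)		stage 4: grant or queue, cast/bast
 *
 * Had the master been remote, stage 3 would instead call send_request()
 * and the matching do_request() would run on the master node, with the
 * result coming back through receive_request_reply().
 */
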
#include <linux/types.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
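
/*
 * Worked example of reading dlm_lvb_operations (illustrative only): a
 * conversion from granted PR to requested EX uses row PR, column EX,
 * which holds 1, so the resource's LVB is copied back to the caller:
 *
 *	b = dlm_lvb_operations[DLM_LOCK_PR + 1][DLM_LOCK_EX + 1];   b == 1
 *
 * A down-conversion from EX to NL lands on 0 instead, so the caller's
 * LVB is written to the resource (see set_lvb_lock() below).
 */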

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
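
/*
 * Example uses of the compatibility check (illustrative only):
 *
 *	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR);   returns 1
 *	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX);   returns 0
 *	dlm_modes_compat(DLM_LOCK_CW, DLM_LOCK_CR);   returns 1
 *
 * The +1 offsets exist because DLM_LOCK_IV is -1 and DLM_LOCK_NL is 0,
 * which maps "unlocked" (IV) onto row/column 0 (UN) of the matrix.
 */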

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}

void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

static inline void unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

static inline int lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}
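
/*
 * Example (illustrative): PR and CW are mutually incompatible peers in
 * the mode hierarchy, so converting between them is a "middle"
 * conversion, while EX to NL is a plain down-conversion:
 *
 *	lkb->lkb_grmode = DLM_LOCK_PR;
 *	lkb->lkb_rqmode = DLM_LOCK_CW;
 *	middle_conversion(lkb);		returns 1
 *
 *	lkb->lkb_grmode = DLM_LOCK_EX;
 *	lkb->lkb_rqmode = DLM_LOCK_NL;
 *	down_conversion(lkb);		returns 1
 */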

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP);
}

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb))
		send_bast(r, lkb, rqmode);
	else {
		lkb->lkb_bastmode = rqmode;
		dlm_add_ast(lkb, AST_BAST);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}

static int search_rsb_list(struct list_head *head, char *name, int len,
			   unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error = 0;

	list_for_each_entry(r, head, res_hashchain) {
		if (len == r->res_length && !memcmp(name, r->res_name, len))
			goto found;
	}
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}

static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	write_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	write_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r, *tmp;
	uint32_t hash, bucket;
	int error = 0;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		write_unlock(&ls->ls_rsbtbl[bucket].lock);
		free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}

int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	return find_rsb(ls, name, namelen, flags, r_ret);
}
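
/*
 * Typical reference lifetime around a lookup (illustrative sketch; the
 * R_CREATE flag here is just an example choice and error handling is
 * omitted):
 *
 *	struct dlm_rsb *r;
 *	error = find_rsb(ls, name, len, R_CREATE, &r);
 *	if (!error) {
 *		use r under the reference that find_rsb() took
 *		put_rsb(r);	may move r to the bucket's toss list
 *	}
 *
 * A later find_rsb() for the same name can then revive r from the toss
 * list instead of re-creating it and repeating the master lookup.
 */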

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);

	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}
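
/*
 * The lkid packs the lkbtbl bucket into its top 16 bits and the
 * per-bucket counter into its low 16 bits, so a lookup can go straight
 * to the right bucket.  Illustrative decode, mirroring find_lkb():
 *
 *	uint16_t bucket = lkid >> 16;		which ls_lkbtbl[] bucket
 *	uint16_t seq = lkid & 0xffff;		counter value at creation
 */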

static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = (lkb->lkb_id >> 16);

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			free_lvb(lkb->lkb_lvbptr);
		free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	if (!lkb)
		list_add_tail(new, head);
	else
		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}

static int msg_reply_type(int mstype)
{
	switch (mstype) {
	case DLM_MSG_REQUEST:
		return DLM_MSG_REQUEST_REPLY;
	case DLM_MSG_CONVERT:
		return DLM_MSG_CONVERT_REPLY;
	case DLM_MSG_UNLOCK:
		return DLM_MSG_UNLOCK_REPLY;
	case DLM_MSG_CANCEL:
		return DLM_MSG_CANCEL_REPLY;
	case DLM_MSG_LOOKUP:
		return DLM_MSG_LOOKUP_REPLY;
	}
	return -1;
}

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
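
/*
 * Example of the overlap case (illustrative): an unlock is issued while
 * the original request is still waiting for its remote reply.
 *
 *	add_to_waiters(lkb, DLM_MSG_REQUEST);	wait_type = DLM_MSG_REQUEST
 *	...
 *	add_to_waiters(lkb, DLM_MSG_UNLOCK);	wait_type already set, so
 *						OVERLAP_UNLOCK is flagged and
 *						wait_count becomes 2
 *
 * Each reply then drops wait_count by one in _remove_from_waiters().
 */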

/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
		  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't received a
	   reply to the op that was in progress prior to the unlock/cancel;
	   we give up on any reply to the earlier op.  FIXME: not sure
	   when/how this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remove_from_waiters %x reply %d give up on %d",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}

static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms != &ls->ls_stub_ms)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type);
	if (ms != &ls->ls_stub_ms)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}

/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		write_lock(&ls->ls_rsbtbl[b].lock);
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			write_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			free_rsb(r);
			count++;
		} else {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	if (dlm_locking_stopped(ls))
		return;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		cond_resched();
	}
}

/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}
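
/*
 * Worked example (illustrative): granting a VALBLK convert from NL up
 * to EX hits the b == 1 case above, so the rsb's LVB and sequence
 * number are copied out to the caller's buffer:
 *
 *	b = dlm_lvb_operations[DLM_LOCK_NL + 1][DLM_LOCK_EX + 1];   b == 1
 *
 * Writes from a PW/EX holder take the b == 0 path and bump res_lvbseq,
 * which is how stale LVB copies can later be detected.
 */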

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}

/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			    struct dlm_message *ms)
{
	int b;

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
	if (b == 1) {
		int len = receive_extralen(ms);
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
		lkb->lkb_lvbseq = ms->m_lvbseq;
	}
}

/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}

/* returns: 0 did nothing
	    1 moved lock to granted
	   -1 removed lock */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = 0;

	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		rv = 1;
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		rv = -1;
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
	return rv;
}

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}

/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}

/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
   change the granted/requested modes.  We're munging things accordingly in
   the process copy.
   CONVDEADLK: our grmode may have been forced down to NL to resolve a
   conversion deadlock
   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
   compatible with other granted locks */

static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
		log_print("munge_demoted %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
		log_print("munge_demoted %x invalid modes gr %d rq %d",
			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
		return;
	}

	lkb->lkb_grmode = DLM_LOCK_NL;
}

static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
	    ms->m_type != DLM_MSG_GRANT) {
		log_print("munge_altmode %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
		lkb->lkb_rqmode = DLM_LOCK_PR;
	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
		lkb->lkb_rqmode = DLM_LOCK_CW;
	else {
		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
		dlm_print_lkb(lkb);
	}
}

static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
					   lkb_statequeue);
	if (lkb->lkb_id == first->lkb_id)
		return 1;

	return 0;
}

/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this;

	list_for_each_entry(this, head, lkb_statequeue) {
		if (this == lkb)
			continue;
		if (!modes_compat(this, lkb))
			return 1;
	}
	return 0;
}

/*
 * "A conversion deadlock arises with a pair of lock requests in the converting
 * queue for one resource.  The granted mode of each lock blocks the requested
 * mode of the other lock."
 *
 * Part 2: if the granted mode of lkb is preventing the first lkb in the
 * convert queue from being granted, then demote lkb (set grmode to NL).
 * This second form requires that we check for conv-deadlk even when
 * now == 0 in _can_be_granted().
 *
 * Example:
 * Granted Queue: empty
 * Convert Queue: NL->EX (first lock)
 *                PR->EX (second lock)
 *
 * The first lock can't be granted because of the granted mode of the second
 * lock and the second lock can't be granted because it's not first in the
 * list.  We demote the granted mode of the second lock (the lkb passed to this
 * function).
 *
 * After the resolution, the "grant pending" function needs to go back and try
 * to grant locks on the convert queue again since the first lock can now be
 * granted.
 */

static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this, *first = NULL, *self = NULL;

	list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
		if (!first)
			first = this;
		if (this == lkb) {
			self = lkb;
			continue;
		}

		if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
			return 1;
	}

	/* if lkb is on the convert queue and is preventing the first
	   from being granted, then there's deadlock and we demote lkb.
	   multiple converting locks may need to do this before the first
	   converting lock can be granted. */

	if (self && self != first) {
		if (!modes_compat(lkb, first) &&
		    !queue_conflict(&rsb->res_grantqueue, first))
			return 1;
	}

	return 0;
}

/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first served" basis.  This,
	 * in turn, can lead to a phenomenon known as "indefinite
	 * postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first served" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;

 out:
	/*
	 * The following, enabled by CONVDEADLK, departs from VMS.
	 */

	if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
	    conversion_deadlock_detect(r, lkb)) {
		lkb->lkb_grmode = DLM_LOCK_NL;
		lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
	}

	return 0;
}

/*
 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
 * simple way to provide a big optimization to applications that can use them.
 */

static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	uint32_t flags = lkb->lkb_exflags;
	int rv;
	int8_t alt = 0, rqmode = lkb->lkb_rqmode;

	rv = _can_be_granted(r, lkb, now);
	if (rv)
		goto out;

	if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
		goto out;

	if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
		alt = DLM_LOCK_PR;
	else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
		alt = DLM_LOCK_CW;

	if (alt) {
		lkb->lkb_rqmode = alt;
		rv = _can_be_granted(r, lkb, now);
		if (rv)
			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
		else
			lkb->lkb_rqmode = rqmode;
	}
 out:
	return rv;
}
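
/*
 * Example (illustrative): a CW request made with DLM_LKF_ALTPR that
 * conflicts as CW but is compatible as PR gets granted in the
 * alternate mode:
 *
 *	lkb->lkb_rqmode = DLM_LOCK_CW;
 *	lkb->lkb_exflags |= DLM_LKF_ALTPR;
 *	can_be_granted(r, lkb, 1);	returns 1 with lkb_rqmode rewritten
 *					to PR and DLM_SBF_ALTMODE set
 *
 * The caller learns about the substitution from the ALTMODE status bit.
 */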

static int grant_pending_convert(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		if (can_be_granted(r, lkb, 0)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
		} else {
			hi = max_t(int, lkb->lkb_rqmode, hi);
			if (!demoted && is_demoted(lkb))
				demote_restart = 1;
		}
	}

	if (grant_restart)
		goto restart;
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}

static int grant_pending_wait(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;

	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
		if (can_be_granted(r, lkb, 0))
			grant_lock_pending(r, lkb);
		else
			high = max_t(int, lkb->lkb_rqmode, high);
	}

	return high;
}

static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;

	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

	high = grant_pending_convert(r, high);
	high = grant_pending_wait(r, high);

	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above. FIXME: highbast < high comparison not valid for PR/CW.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
		    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
			queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}

static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
			    struct dlm_lkb *lkb)
{
	struct dlm_lkb *gr;

	list_for_each_entry(gr, head, lkb_statequeue) {
		if (gr->lkb_bastaddr &&
		    gr->lkb_highbast < lkb->lkb_rqmode &&
		    !modes_compat(gr, lkb)) {
			queue_bast(r, gr, lkb->lkb_rqmode);
			gr->lkb_highbast = lkb->lkb_rqmode;
		}
	}
}

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}

/* set_master(r, lkb) -- set the master nodeid of a resource

   The purpose of this function is to set the nodeid field in the given
   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
   known, it can just be copied to the lkb and the function will return
   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
   before it can be copied to the lkb.

   When the rsb nodeid is being looked up remotely, the initial lkb
   causing the lookup is kept on the ls_waiters list waiting for the
   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
   on the rsb's res_lookup list until the master is verified.

   Return values:
   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue
*/

static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (;;) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}

static void process_lookup_list(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
		list_del_init(&lkb->lkb_rsb_lookup);
		_request_lock(r, lkb);
		schedule();
	}
}

/* confirm_master -- confirm (or deny) an rsb's master nodeid */

static void confirm_master(struct dlm_rsb *r, int error)
{
	struct dlm_lkb *lkb;

	if (!r->res_first_lkid)
		return;

	switch (error) {
	case 0:
	case -EINPROGRESS:
		r->res_first_lkid = 0;
		process_lookup_list(r);
		break;

	case -EAGAIN:
		/* the remote master didn't queue our NOQUEUE request;
		   make a waiting lkb the first_lkid */

		r->res_first_lkid = 0;

		if (!list_empty(&r->res_lookup)) {
			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
					 lkb_rsb_lookup);
			list_del_init(&lkb->lkb_rsb_lookup);
			r->res_first_lkid = lkb->lkb_id;
			_request_lock(r, lkb);
		} else
			r->res_nodeid = -1;
		break;

	default:
		log_error(r->res_ls, "confirm_master unknown error %d", error);
	}
}

static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
			 int namelen, uint32_t parent_lkid, void *ast,
			 void *astarg, void *bast, struct dlm_args *args)
{
	int rv = -EINVAL;

	/* check for invalid arg usage */

	if (mode < 0 || mode > DLM_LOCK_EX)
		goto out;

	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
		goto out;

	if (flags & DLM_LKF_CANCEL)
		goto out;

	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
		goto out;

	if (!ast || !lksb)
		goto out;

	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
		goto out;

	/* parent/child locks not yet supported */
	if (parent_lkid)
		goto out;

	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
		goto out;

	/* these args will be copied to the lkb in validate_lock_args,
	   it cannot be done now because when converting locks, fields in
	   an active lkb cannot be modified before locking the rsb */

	args->flags = flags;
	args->astaddr = ast;
	args->astparam = (long) astarg;
	args->bastaddr = bast;
	args->mode = mode;
	args->lksb = lksb;
	rv = 0;
 out:
	return rv;
}
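
/*
 * Example flag combinations (illustrative): EXPEDITE is accepted only
 * for a brand-new NL request, so of these two calls only the first can
 * pass validation:
 *
 *	set_lock_args(DLM_LOCK_NL, &lksb, DLM_LKF_EXPEDITE, namelen, 0,
 *		      ast, astarg, bast, &args);	returns 0
 *	set_lock_args(DLM_LOCK_EX, &lksb, DLM_LKF_EXPEDITE, namelen, 0,
 *		      ast, astarg, bast, &args);	returns -EINVAL
 */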

static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
{
	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
		      DLM_LKF_FORCEUNLOCK))
		return -EINVAL;

	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
		return -EINVAL;

	args->flags = flags;
	args->astparam = (long) astarg;
	return 0;
}

static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		rv = -EBUSY;
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		if (lkb->lkb_wait_type)
			goto out;

		if (is_overlap(lkb))
			goto out;
	}

	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astaddr = args->astaddr;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastaddr = args->bastaddr;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	rv = 0;
 out:
	return rv;
}
1852 
1853 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
1854    for success */
1855 
1856 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
1857    because there may be a lookup in progress and it's valid to do
1858    a cancel or force-unlock on it */
1859 
1860 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1861 {
1862 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1863 	int rv = -EINVAL;
1864 
1865 	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
1866 		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
1867 		dlm_print_lkb(lkb);
1868 		goto out;
1869 	}
1870 
1871 	/* an lkb may still exist even though the lock is EOL'ed due to a
1872 	   cancel, unlock or failed noqueue request; an app can't use these
1873    locks; return the same error as if the lkid had not been found at all */
1874 
1875 	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
1876 		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
1877 		rv = -ENOENT;
1878 		goto out;
1879 	}
1880 
1881 	/* an lkb may be waiting for an rsb lookup to complete where the
1882 	   lookup was initiated by another lock */
1883 
1884 	if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
1885 		if (!list_empty(&lkb->lkb_rsb_lookup)) {
1886 			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
1887 			list_del_init(&lkb->lkb_rsb_lookup);
1888 			queue_cast(lkb->lkb_resource, lkb,
1889 				   args->flags & DLM_LKF_CANCEL ?
1890 				   -DLM_ECANCEL : -DLM_EUNLOCK);
1891 			unhold_lkb(lkb); /* undoes create_lkb() */
1892 			rv = -EBUSY;
1893 			goto out;
1894 		}
1895 	}
1896 
1897 	/* cancel not allowed with another cancel/unlock in progress */
1898 
1899 	if (args->flags & DLM_LKF_CANCEL) {
1900 		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
1901 			goto out;
1902 
1903 		if (is_overlap(lkb))
1904 			goto out;
1905 
1906 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
1907 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1908 			rv = -EBUSY;
1909 			goto out;
1910 		}
1911 
1912 		switch (lkb->lkb_wait_type) {
1913 		case DLM_MSG_LOOKUP:
1914 		case DLM_MSG_REQUEST:
1915 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1916 			rv = -EBUSY;
1917 			goto out;
1918 		case DLM_MSG_UNLOCK:
1919 		case DLM_MSG_CANCEL:
1920 			goto out;
1921 		}
1922 		/* add_to_waiters() will set OVERLAP_CANCEL */
1923 		goto out_ok;
1924 	}
1925 
1926 	/* do we need to allow a force-unlock if there's a normal unlock
1927 	   already in progress?  in what conditions could the normal unlock
1928 	   fail such that we'd want to send a force-unlock to be sure? */
1929 
1930 	if (args->flags & DLM_LKF_FORCEUNLOCK) {
1931 		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
1932 			goto out;
1933 
1934 		if (is_overlap_unlock(lkb))
1935 			goto out;
1936 
1937 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
1938 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1939 			rv = -EBUSY;
1940 			goto out;
1941 		}
1942 
1943 		switch (lkb->lkb_wait_type) {
1944 		case DLM_MSG_LOOKUP:
1945 		case DLM_MSG_REQUEST:
1946 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1947 			rv = -EBUSY;
1948 			goto out;
1949 		case DLM_MSG_UNLOCK:
1950 			goto out;
1951 		}
1952 		/* add_to_waiters() will set OVERLAP_UNLOCK */
1953 		goto out_ok;
1954 	}
1955 
1956 	/* normal unlock not allowed if there's any op in progress */
1957 	rv = -EBUSY;
1958 	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
1959 		goto out;
1960 
1961  out_ok:
1962 	/* an overlapping op shouldn't blow away exflags from other op */
1963 	lkb->lkb_exflags |= args->flags;
1964 	lkb->lkb_sbflags = 0;
1965 	lkb->lkb_astparam = args->astparam;
1966 	rv = 0;
1967  out:
1968 	if (rv)
1969 		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
1970 			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
1971 			  args->flags, lkb->lkb_wait_type,
1972 			  lkb->lkb_resource->res_name);
1973 	return rv;
1974 }
1975 
1976 /*
1977  * Four stage 4 varieties:
1978  * do_request(), do_convert(), do_unlock(), do_cancel()
1979  * These are called on the master node for the given lock and
1980  * from the central locking logic.
1981  */
1982 
1983 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1984 {
1985 	int error = 0;
1986 
1987 	if (can_be_granted(r, lkb, 1)) {
1988 		grant_lock(r, lkb);
1989 		queue_cast(r, lkb, 0);
1990 		goto out;
1991 	}
1992 
1993 	if (can_be_queued(lkb)) {
1994 		error = -EINPROGRESS;
1995 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
1996 		send_blocking_asts(r, lkb);
1997 		goto out;
1998 	}
1999 
2000 	error = -EAGAIN;
2001 	if (force_blocking_asts(lkb))
2002 		send_blocking_asts_all(r, lkb);
2003 	queue_cast(r, lkb, -EAGAIN);
2004 
2005  out:
2006 	return error;
2007 }
2008 
2009 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2010 {
2011 	int error = 0;
2012 
2013 	/* changing an existing lock may allow others to be granted */
2014 
2015 	if (can_be_granted(r, lkb, 1)) {
2016 		grant_lock(r, lkb);
2017 		queue_cast(r, lkb, 0);
2018 		grant_pending_locks(r);
2019 		goto out;
2020 	}
2021 
2022 	/* is_demoted() means the can_be_granted() above set the grmode
2023 	   to NL, and left us on the granted queue.  This auto-demotion
2024 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
2025 	   now grantable.  We have to try to grant other converting locks
2026 	   before we try again to grant this one. */
2027 
2028 	if (is_demoted(lkb)) {
2029 		grant_pending_convert(r, DLM_LOCK_IV);
2030 		if (_can_be_granted(r, lkb, 1)) {
2031 			grant_lock(r, lkb);
2032 			queue_cast(r, lkb, 0);
2033 			grant_pending_locks(r);
2034 			goto out;
2035 		}
2036 		/* else fall through and move to convert queue */
2037 	}
2038 
2039 	if (can_be_queued(lkb)) {
2040 		error = -EINPROGRESS;
2041 		del_lkb(r, lkb);
2042 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2043 		send_blocking_asts(r, lkb);
2044 		goto out;
2045 	}
2046 
2047 	error = -EAGAIN;
2048 	if (force_blocking_asts(lkb))
2049 		send_blocking_asts_all(r, lkb);
2050 	queue_cast(r, lkb, -EAGAIN);
2051 
2052  out:
2053 	return error;
2054 }
2055 
2056 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2057 {
2058 	remove_lock(r, lkb);
2059 	queue_cast(r, lkb, -DLM_EUNLOCK);
2060 	grant_pending_locks(r);
2061 	return -DLM_EUNLOCK;
2062 }
2063 
2064 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2065 
2066 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2067 {
2068 	int error;
2069 
2070 	error = revert_lock(r, lkb);
2071 	if (error) {
2072 		queue_cast(r, lkb, -DLM_ECANCEL);
2073 		grant_pending_locks(r);
2074 		return -DLM_ECANCEL;
2075 	}
2076 	return 0;
2077 }
2078 
2079 /*
2080  * Four stage 3 varieties:
2081  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2082  */
2083 
2084 /* add a new lkb to a possibly new rsb, called by requesting process */
2085 
2086 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2087 {
2088 	int error;
2089 
2090 	/* set_master: sets lkb nodeid from r */
2091 
2092 	error = set_master(r, lkb);
2093 	if (error < 0)
2094 		goto out;
2095 	if (error) {
2096 		error = 0;
2097 		goto out;
2098 	}
2099 
2100 	if (is_remote(r))
2101 		/* receive_request() calls do_request() on remote node */
2102 		error = send_request(r, lkb);
2103 	else
2104 		error = do_request(r, lkb);
2105  out:
2106 	return error;
2107 }
2108 
2109 /* change some property of an existing lkb, e.g. mode */
2110 
2111 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2112 {
2113 	int error;
2114 
2115 	if (is_remote(r))
2116 		/* receive_convert() calls do_convert() on remote node */
2117 		error = send_convert(r, lkb);
2118 	else
2119 		error = do_convert(r, lkb);
2120 
2121 	return error;
2122 }
2123 
2124 /* remove an existing lkb from the granted queue */
2125 
2126 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2127 {
2128 	int error;
2129 
2130 	if (is_remote(r))
2131 		/* receive_unlock() calls do_unlock() on remote node */
2132 		error = send_unlock(r, lkb);
2133 	else
2134 		error = do_unlock(r, lkb);
2135 
2136 	return error;
2137 }
2138 
2139 /* remove an existing lkb from the convert or wait queue */
2140 
2141 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2142 {
2143 	int error;
2144 
2145 	if (is_remote(r))
2146 		/* receive_cancel() calls do_cancel() on remote node */
2147 		error = send_cancel(r, lkb);
2148 	else
2149 		error = do_cancel(r, lkb);
2150 
2151 	return error;
2152 }
2153 
2154 /*
2155  * Four stage 2 varieties:
2156  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2157  */
2158 
2159 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2160 			int len, struct dlm_args *args)
2161 {
2162 	struct dlm_rsb *r;
2163 	int error;
2164 
2165 	error = validate_lock_args(ls, lkb, args);
2166 	if (error)
2167 		goto out;
2168 
2169 	error = find_rsb(ls, name, len, R_CREATE, &r);
2170 	if (error)
2171 		goto out;
2172 
2173 	lock_rsb(r);
2174 
2175 	attach_lkb(r, lkb);
2176 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2177 
2178 	error = _request_lock(r, lkb);
2179 
2180 	unlock_rsb(r);
2181 	put_rsb(r);
2182 
2183  out:
2184 	return error;
2185 }
2186 
2187 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2188 			struct dlm_args *args)
2189 {
2190 	struct dlm_rsb *r;
2191 	int error;
2192 
2193 	r = lkb->lkb_resource;
2194 
2195 	hold_rsb(r);
2196 	lock_rsb(r);
2197 
2198 	error = validate_lock_args(ls, lkb, args);
2199 	if (error)
2200 		goto out;
2201 
2202 	error = _convert_lock(r, lkb);
2203  out:
2204 	unlock_rsb(r);
2205 	put_rsb(r);
2206 	return error;
2207 }
2208 
2209 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2210 		       struct dlm_args *args)
2211 {
2212 	struct dlm_rsb *r;
2213 	int error;
2214 
2215 	r = lkb->lkb_resource;
2216 
2217 	hold_rsb(r);
2218 	lock_rsb(r);
2219 
2220 	error = validate_unlock_args(lkb, args);
2221 	if (error)
2222 		goto out;
2223 
2224 	error = _unlock_lock(r, lkb);
2225  out:
2226 	unlock_rsb(r);
2227 	put_rsb(r);
2228 	return error;
2229 }
2230 
2231 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2232 		       struct dlm_args *args)
2233 {
2234 	struct dlm_rsb *r;
2235 	int error;
2236 
2237 	r = lkb->lkb_resource;
2238 
2239 	hold_rsb(r);
2240 	lock_rsb(r);
2241 
2242 	error = validate_unlock_args(lkb, args);
2243 	if (error)
2244 		goto out;
2245 
2246 	error = _cancel_lock(r, lkb);
2247  out:
2248 	unlock_rsb(r);
2249 	put_rsb(r);
2250 	return error;
2251 }
2252 
2253 /*
2254  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2255  */
2256 
2257 int dlm_lock(dlm_lockspace_t *lockspace,
2258 	     int mode,
2259 	     struct dlm_lksb *lksb,
2260 	     uint32_t flags,
2261 	     void *name,
2262 	     unsigned int namelen,
2263 	     uint32_t parent_lkid,
2264 	     void (*ast) (void *astarg),
2265 	     void *astarg,
2266 	     void (*bast) (void *astarg, int mode))
2267 {
2268 	struct dlm_ls *ls;
2269 	struct dlm_lkb *lkb;
2270 	struct dlm_args args;
2271 	int error, convert = flags & DLM_LKF_CONVERT;
2272 
2273 	ls = dlm_find_lockspace_local(lockspace);
2274 	if (!ls)
2275 		return -EINVAL;
2276 
2277 	lock_recovery(ls);
2278 
2279 	if (convert)
2280 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
2281 	else
2282 		error = create_lkb(ls, &lkb);
2283 
2284 	if (error)
2285 		goto out;
2286 
2287 	error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
2288 			      astarg, bast, &args);
2289 	if (error)
2290 		goto out_put;
2291 
2292 	if (convert)
2293 		error = convert_lock(ls, lkb, &args);
2294 	else
2295 		error = request_lock(ls, lkb, name, namelen, &args);
2296 
2297 	if (error == -EINPROGRESS)
2298 		error = 0;
2299  out_put:
2300 	if (convert || error)
2301 		__put_lkb(ls, lkb);
2302 	if (error == -EAGAIN)
2303 		error = 0;
2304  out:
2305 	unlock_recovery(ls);
2306 	dlm_put_lockspace(ls);
2307 	return error;
2308 }
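
/* A minimal usage sketch (hypothetical caller; ls, my_ast and my_bast
   are assumed to exist and are not defined in this file).  A kernel
   user acquires a PR lock, then converts it to EX by passing back the
   lkid that request_lock() stored in lksb.sb_lkid:

	struct dlm_lksb lksb;
	int error;

	error = dlm_lock(ls, DLM_LOCK_PR, &lksb, 0, "my_res", 6,
			 0, my_ast, NULL, my_bast);
	completion is delivered asynchronously through my_ast

	error = dlm_lock(ls, DLM_LOCK_EX, &lksb, DLM_LKF_CONVERT,
			 NULL, 0, 0, my_ast, NULL, my_bast); */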
2309 
2310 int dlm_unlock(dlm_lockspace_t *lockspace,
2311 	       uint32_t lkid,
2312 	       uint32_t flags,
2313 	       struct dlm_lksb *lksb,
2314 	       void *astarg)
2315 {
2316 	struct dlm_ls *ls;
2317 	struct dlm_lkb *lkb;
2318 	struct dlm_args args;
2319 	int error;
2320 
2321 	ls = dlm_find_lockspace_local(lockspace);
2322 	if (!ls)
2323 		return -EINVAL;
2324 
2325 	lock_recovery(ls);
2326 
2327 	error = find_lkb(ls, lkid, &lkb);
2328 	if (error)
2329 		goto out;
2330 
2331 	error = set_unlock_args(flags, astarg, &args);
2332 	if (error)
2333 		goto out_put;
2334 
2335 	if (flags & DLM_LKF_CANCEL)
2336 		error = cancel_lock(ls, lkb, &args);
2337 	else
2338 		error = unlock_lock(ls, lkb, &args);
2339 
2340 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2341 		error = 0;
2342 	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2343 		error = 0;
2344  out_put:
2345 	dlm_put_lkb(lkb);
2346  out:
2347 	unlock_recovery(ls);
2348 	dlm_put_lockspace(ls);
2349 	return error;
2350 }
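
/* Continuing the sketch above: the lock is dropped with

	error = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, NULL);

   and completion is delivered through the lock's completion ast.  The
   internal -DLM_EUNLOCK/-DLM_ECANCEL results are mapped to 0 here, as
   is -EBUSY for an overlapping cancel/force-unlock. */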
2351 
2352 /*
2353  * send/receive routines for remote operations and replies
2354  *
2355  * send_args
2356  * send_common
2357  * send_request			receive_request
2358  * send_convert			receive_convert
2359  * send_unlock			receive_unlock
2360  * send_cancel			receive_cancel
2361  * send_grant			receive_grant
2362  * send_bast			receive_bast
2363  * send_lookup			receive_lookup
2364  * send_remove			receive_remove
2365  *
2366  * 				send_common_reply
2367  * receive_request_reply	send_request_reply
2368  * receive_convert_reply	send_convert_reply
2369  * receive_unlock_reply		send_unlock_reply
2370  * receive_cancel_reply		send_cancel_reply
2371  * receive_lookup_reply		send_lookup_reply
2372  */
2373 
2374 static int _create_message(struct dlm_ls *ls, int mb_len,
2375 			   int to_nodeid, int mstype,
2376 			   struct dlm_message **ms_ret,
2377 			   struct dlm_mhandle **mh_ret)
2378 {
2379 	struct dlm_message *ms;
2380 	struct dlm_mhandle *mh;
2381 	char *mb;
2382 
2383 	/* get_buffer gives us a message handle (mh) that we need to
2384 	   pass into lowcomms_commit and a message buffer (mb) that we
2385 	   write our data into */
2386 
2387 	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2388 	if (!mh)
2389 		return -ENOBUFS;
2390 
2391 	memset(mb, 0, mb_len);
2392 
2393 	ms = (struct dlm_message *) mb;
2394 
2395 	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2396 	ms->m_header.h_lockspace = ls->ls_global_id;
2397 	ms->m_header.h_nodeid = dlm_our_nodeid();
2398 	ms->m_header.h_length = mb_len;
2399 	ms->m_header.h_cmd = DLM_MSG;
2400 
2401 	ms->m_type = mstype;
2402 
2403 	*mh_ret = mh;
2404 	*ms_ret = ms;
2405 	return 0;
2406 }
2407 
2408 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2409 			  int to_nodeid, int mstype,
2410 			  struct dlm_message **ms_ret,
2411 			  struct dlm_mhandle **mh_ret)
2412 {
2413 	int mb_len = sizeof(struct dlm_message);
2414 
2415 	switch (mstype) {
2416 	case DLM_MSG_REQUEST:
2417 	case DLM_MSG_LOOKUP:
2418 	case DLM_MSG_REMOVE:
2419 		mb_len += r->res_length;
2420 		break;
2421 	case DLM_MSG_CONVERT:
2422 	case DLM_MSG_UNLOCK:
2423 	case DLM_MSG_REQUEST_REPLY:
2424 	case DLM_MSG_CONVERT_REPLY:
2425 	case DLM_MSG_GRANT:
2426 		if (lkb && lkb->lkb_lvbptr)
2427 			mb_len += r->res_ls->ls_lvblen;
2428 		break;
2429 	}
2430 
2431 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2432 			       ms_ret, mh_ret);
2433 }
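
/* For example: a DLM_MSG_REQUEST for a 10-byte resource name is sized
   sizeof(struct dlm_message) + 10, while a DLM_MSG_CONVERT carrying an
   lvb on a lockspace with 32-byte lvbs (numbers illustrative) is sized
   sizeof(struct dlm_message) + 32. */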
2434 
2435 /* further lowcomms enhancements or alternate implementations may make
2436    the return value from this function useful at some point */
2437 
2438 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2439 {
2440 	dlm_message_out(ms);
2441 	dlm_lowcomms_commit_buffer(mh);
2442 	return 0;
2443 }
2444 
2445 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2446 		      struct dlm_message *ms)
2447 {
2448 	ms->m_nodeid   = lkb->lkb_nodeid;
2449 	ms->m_pid      = lkb->lkb_ownpid;
2450 	ms->m_lkid     = lkb->lkb_id;
2451 	ms->m_remid    = lkb->lkb_remid;
2452 	ms->m_exflags  = lkb->lkb_exflags;
2453 	ms->m_sbflags  = lkb->lkb_sbflags;
2454 	ms->m_flags    = lkb->lkb_flags;
2455 	ms->m_lvbseq   = lkb->lkb_lvbseq;
2456 	ms->m_status   = lkb->lkb_status;
2457 	ms->m_grmode   = lkb->lkb_grmode;
2458 	ms->m_rqmode   = lkb->lkb_rqmode;
2459 	ms->m_hash     = r->res_hash;
2460 
2461 	/* m_result and m_bastmode are set from function args,
2462 	   not from lkb fields */
2463 
2464 	if (lkb->lkb_bastaddr)
2465 		ms->m_asts |= AST_BAST;
2466 	if (lkb->lkb_astaddr)
2467 		ms->m_asts |= AST_COMP;
2468 
2469 	/* compare with switch in create_message; send_remove() doesn't
2470 	   use send_args() */
2471 
2472 	switch (ms->m_type) {
2473 	case DLM_MSG_REQUEST:
2474 	case DLM_MSG_LOOKUP:
2475 		memcpy(ms->m_extra, r->res_name, r->res_length);
2476 		break;
2477 	case DLM_MSG_CONVERT:
2478 	case DLM_MSG_UNLOCK:
2479 	case DLM_MSG_REQUEST_REPLY:
2480 	case DLM_MSG_CONVERT_REPLY:
2481 	case DLM_MSG_GRANT:
2482 		if (!lkb->lkb_lvbptr)
2483 			break;
2484 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2485 		break;
2486 	}
2487 }
2488 
2489 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2490 {
2491 	struct dlm_message *ms;
2492 	struct dlm_mhandle *mh;
2493 	int to_nodeid, error;
2494 
2495 	error = add_to_waiters(lkb, mstype);
2496 	if (error)
2497 		return error;
2498 
2499 	to_nodeid = r->res_nodeid;
2500 
2501 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2502 	if (error)
2503 		goto fail;
2504 
2505 	send_args(r, lkb, ms);
2506 
2507 	error = send_message(mh, ms);
2508 	if (error)
2509 		goto fail;
2510 	return 0;
2511 
2512  fail:
2513 	remove_from_waiters(lkb, msg_reply_type(mstype));
2514 	return error;
2515 }
2516 
2517 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2518 {
2519 	return send_common(r, lkb, DLM_MSG_REQUEST);
2520 }
2521 
2522 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2523 {
2524 	int error;
2525 
2526 	error = send_common(r, lkb, DLM_MSG_CONVERT);
2527 
2528 	/* down conversions go without a reply from the master */
2529 	if (!error && down_conversion(lkb)) {
2530 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2531 		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2532 		r->res_ls->ls_stub_ms.m_result = 0;
2533 		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2534 		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2535 	}
2536 
2537 	return error;
2538 }
2539 
2540 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2541    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2542    that the master is still correct. */
2543 
2544 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2545 {
2546 	return send_common(r, lkb, DLM_MSG_UNLOCK);
2547 }
2548 
2549 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2550 {
2551 	return send_common(r, lkb, DLM_MSG_CANCEL);
2552 }
2553 
2554 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2555 {
2556 	struct dlm_message *ms;
2557 	struct dlm_mhandle *mh;
2558 	int to_nodeid, error;
2559 
2560 	to_nodeid = lkb->lkb_nodeid;
2561 
2562 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2563 	if (error)
2564 		goto out;
2565 
2566 	send_args(r, lkb, ms);
2567 
2568 	ms->m_result = 0;
2569 
2570 	error = send_message(mh, ms);
2571  out:
2572 	return error;
2573 }
2574 
2575 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2576 {
2577 	struct dlm_message *ms;
2578 	struct dlm_mhandle *mh;
2579 	int to_nodeid, error;
2580 
2581 	to_nodeid = lkb->lkb_nodeid;
2582 
2583 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2584 	if (error)
2585 		goto out;
2586 
2587 	send_args(r, lkb, ms);
2588 
2589 	ms->m_bastmode = mode;
2590 
2591 	error = send_message(mh, ms);
2592  out:
2593 	return error;
2594 }
2595 
2596 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2597 {
2598 	struct dlm_message *ms;
2599 	struct dlm_mhandle *mh;
2600 	int to_nodeid, error;
2601 
2602 	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2603 	if (error)
2604 		return error;
2605 
2606 	to_nodeid = dlm_dir_nodeid(r);
2607 
2608 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2609 	if (error)
2610 		goto fail;
2611 
2612 	send_args(r, lkb, ms);
2613 
2614 	error = send_message(mh, ms);
2615 	if (error)
2616 		goto fail;
2617 	return 0;
2618 
2619  fail:
2620 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2621 	return error;
2622 }
2623 
2624 static int send_remove(struct dlm_rsb *r)
2625 {
2626 	struct dlm_message *ms;
2627 	struct dlm_mhandle *mh;
2628 	int to_nodeid, error;
2629 
2630 	to_nodeid = dlm_dir_nodeid(r);
2631 
2632 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2633 	if (error)
2634 		goto out;
2635 
2636 	memcpy(ms->m_extra, r->res_name, r->res_length);
2637 	ms->m_hash = r->res_hash;
2638 
2639 	error = send_message(mh, ms);
2640  out:
2641 	return error;
2642 }
2643 
2644 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2645 			     int mstype, int rv)
2646 {
2647 	struct dlm_message *ms;
2648 	struct dlm_mhandle *mh;
2649 	int to_nodeid, error;
2650 
2651 	to_nodeid = lkb->lkb_nodeid;
2652 
2653 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2654 	if (error)
2655 		goto out;
2656 
2657 	send_args(r, lkb, ms);
2658 
2659 	ms->m_result = rv;
2660 
2661 	error = send_message(mh, ms);
2662  out:
2663 	return error;
2664 }
2665 
2666 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2667 {
2668 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2669 }
2670 
2671 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2672 {
2673 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2674 }
2675 
2676 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2677 {
2678 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2679 }
2680 
2681 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2682 {
2683 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2684 }
2685 
2686 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2687 			     int ret_nodeid, int rv)
2688 {
2689 	struct dlm_rsb *r = &ls->ls_stub_rsb;
2690 	struct dlm_message *ms;
2691 	struct dlm_mhandle *mh;
2692 	int error, nodeid = ms_in->m_header.h_nodeid;
2693 
2694 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2695 	if (error)
2696 		goto out;
2697 
2698 	ms->m_lkid = ms_in->m_lkid;
2699 	ms->m_result = rv;
2700 	ms->m_nodeid = ret_nodeid;
2701 
2702 	error = send_message(mh, ms);
2703  out:
2704 	return error;
2705 }
2706 
2707 /* which args we save from a received message depends heavily on the type
2708    of message, unlike the send side where we can safely send everything about
2709    the lkb for any type of message */
2710 
2711 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2712 {
2713 	lkb->lkb_exflags = ms->m_exflags;
2714 	lkb->lkb_sbflags = ms->m_sbflags;
2715 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2716 		         (ms->m_flags & 0x0000FFFF);
2717 }
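
/* The masks in receive_flags() above and receive_flags_reply() below
   keep the upper 16 bits of lkb_flags (node-local DLM_IFL_ state such
   as MSTCPY or RESEND, never taken from the wire) and adopt only the
   lower 16 shared bits from the message. */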
2718 
2719 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2720 {
2721 	lkb->lkb_sbflags = ms->m_sbflags;
2722 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2723 		         (ms->m_flags & 0x0000FFFF);
2724 }
2725 
2726 static int receive_extralen(struct dlm_message *ms)
2727 {
2728 	return (ms->m_header.h_length - sizeof(struct dlm_message));
2729 }
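
/* e.g. for a DLM_MSG_REQUEST sized by create_message() as
   sizeof(struct dlm_message) + r->res_length, this recovers res_length,
   the number of resource-name bytes sitting in m_extra. */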
2730 
2731 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2732 		       struct dlm_message *ms)
2733 {
2734 	int len;
2735 
2736 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2737 		if (!lkb->lkb_lvbptr)
2738 			lkb->lkb_lvbptr = allocate_lvb(ls);
2739 		if (!lkb->lkb_lvbptr)
2740 			return -ENOMEM;
2741 		len = receive_extralen(ms);
2742 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2743 	}
2744 	return 0;
2745 }
2746 
2747 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2748 				struct dlm_message *ms)
2749 {
2750 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
2751 	lkb->lkb_ownpid = ms->m_pid;
2752 	lkb->lkb_remid = ms->m_lkid;
2753 	lkb->lkb_grmode = DLM_LOCK_IV;
2754 	lkb->lkb_rqmode = ms->m_rqmode;
2755 	lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2756 	lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2757 
2758 	DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2759 
2760 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2761 		/* lkb was just created so there won't be an lvb yet */
2762 		lkb->lkb_lvbptr = allocate_lvb(ls);
2763 		if (!lkb->lkb_lvbptr)
2764 			return -ENOMEM;
2765 	}
2766 
2767 	return 0;
2768 }
2769 
2770 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2771 				struct dlm_message *ms)
2772 {
2773 	if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2774 		log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2775 			  lkb->lkb_nodeid, ms->m_header.h_nodeid,
2776 			  lkb->lkb_id, lkb->lkb_remid);
2777 		return -EINVAL;
2778 	}
2779 
2780 	if (!is_master_copy(lkb))
2781 		return -EINVAL;
2782 
2783 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2784 		return -EBUSY;
2785 
2786 	if (receive_lvb(ls, lkb, ms))
2787 		return -ENOMEM;
2788 
2789 	lkb->lkb_rqmode = ms->m_rqmode;
2790 	lkb->lkb_lvbseq = ms->m_lvbseq;
2791 
2792 	return 0;
2793 }
2794 
2795 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2796 			       struct dlm_message *ms)
2797 {
2798 	if (!is_master_copy(lkb))
2799 		return -EINVAL;
2800 	if (receive_lvb(ls, lkb, ms))
2801 		return -ENOMEM;
2802 	return 0;
2803 }
2804 
2805 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2806    uses to send a reply and that the remote end uses to process the reply. */
2807 
2808 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2809 {
2810 	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2811 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
2812 	lkb->lkb_remid = ms->m_lkid;
2813 }
2814 
2815 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2816 {
2817 	struct dlm_lkb *lkb;
2818 	struct dlm_rsb *r;
2819 	int error, namelen;
2820 
2821 	error = create_lkb(ls, &lkb);
2822 	if (error)
2823 		goto fail;
2824 
2825 	receive_flags(lkb, ms);
2826 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
2827 	error = receive_request_args(ls, lkb, ms);
2828 	if (error) {
2829 		__put_lkb(ls, lkb);
2830 		goto fail;
2831 	}
2832 
2833 	namelen = receive_extralen(ms);
2834 
2835 	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2836 	if (error) {
2837 		__put_lkb(ls, lkb);
2838 		goto fail;
2839 	}
2840 
2841 	lock_rsb(r);
2842 
2843 	attach_lkb(r, lkb);
2844 	error = do_request(r, lkb);
2845 	send_request_reply(r, lkb, error);
2846 
2847 	unlock_rsb(r);
2848 	put_rsb(r);
2849 
2850 	if (error == -EINPROGRESS)
2851 		error = 0;
2852 	if (error)
2853 		dlm_put_lkb(lkb);
2854 	return;
2855 
2856  fail:
2857 	setup_stub_lkb(ls, ms);
2858 	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2859 }
2860 
2861 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2862 {
2863 	struct dlm_lkb *lkb;
2864 	struct dlm_rsb *r;
2865 	int error, reply = 1;
2866 
2867 	error = find_lkb(ls, ms->m_remid, &lkb);
2868 	if (error)
2869 		goto fail;
2870 
2871 	r = lkb->lkb_resource;
2872 
2873 	hold_rsb(r);
2874 	lock_rsb(r);
2875 
2876 	receive_flags(lkb, ms);
2877 	error = receive_convert_args(ls, lkb, ms);
2878 	if (error)
2879 		goto out;
2880 	reply = !down_conversion(lkb);
2881 
2882 	error = do_convert(r, lkb);
2883  out:
2884 	if (reply)
2885 		send_convert_reply(r, lkb, error);
2886 
2887 	unlock_rsb(r);
2888 	put_rsb(r);
2889 	dlm_put_lkb(lkb);
2890 	return;
2891 
2892  fail:
2893 	setup_stub_lkb(ls, ms);
2894 	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2895 }
2896 
2897 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2898 {
2899 	struct dlm_lkb *lkb;
2900 	struct dlm_rsb *r;
2901 	int error;
2902 
2903 	error = find_lkb(ls, ms->m_remid, &lkb);
2904 	if (error)
2905 		goto fail;
2906 
2907 	r = lkb->lkb_resource;
2908 
2909 	hold_rsb(r);
2910 	lock_rsb(r);
2911 
2912 	receive_flags(lkb, ms);
2913 	error = receive_unlock_args(ls, lkb, ms);
2914 	if (error)
2915 		goto out;
2916 
2917 	error = do_unlock(r, lkb);
2918  out:
2919 	send_unlock_reply(r, lkb, error);
2920 
2921 	unlock_rsb(r);
2922 	put_rsb(r);
2923 	dlm_put_lkb(lkb);
2924 	return;
2925 
2926  fail:
2927 	setup_stub_lkb(ls, ms);
2928 	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2929 }
2930 
2931 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2932 {
2933 	struct dlm_lkb *lkb;
2934 	struct dlm_rsb *r;
2935 	int error;
2936 
2937 	error = find_lkb(ls, ms->m_remid, &lkb);
2938 	if (error)
2939 		goto fail;
2940 
2941 	receive_flags(lkb, ms);
2942 
2943 	r = lkb->lkb_resource;
2944 
2945 	hold_rsb(r);
2946 	lock_rsb(r);
2947 
2948 	error = do_cancel(r, lkb);
2949 	send_cancel_reply(r, lkb, error);
2950 
2951 	unlock_rsb(r);
2952 	put_rsb(r);
2953 	dlm_put_lkb(lkb);
2954 	return;
2955 
2956  fail:
2957 	setup_stub_lkb(ls, ms);
2958 	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2959 }
2960 
2961 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2962 {
2963 	struct dlm_lkb *lkb;
2964 	struct dlm_rsb *r;
2965 	int error;
2966 
2967 	error = find_lkb(ls, ms->m_remid, &lkb);
2968 	if (error) {
2969 		log_error(ls, "receive_grant no lkb");
2970 		return;
2971 	}
2972 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2973 
2974 	r = lkb->lkb_resource;
2975 
2976 	hold_rsb(r);
2977 	lock_rsb(r);
2978 
2979 	receive_flags_reply(lkb, ms);
2980 	if (is_altmode(lkb))
2981 		munge_altmode(lkb, ms);
2982 	grant_lock_pc(r, lkb, ms);
2983 	queue_cast(r, lkb, 0);
2984 
2985 	unlock_rsb(r);
2986 	put_rsb(r);
2987 	dlm_put_lkb(lkb);
2988 }
2989 
2990 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2991 {
2992 	struct dlm_lkb *lkb;
2993 	struct dlm_rsb *r;
2994 	int error;
2995 
2996 	error = find_lkb(ls, ms->m_remid, &lkb);
2997 	if (error) {
2998 		log_error(ls, "receive_bast no lkb");
2999 		return;
3000 	}
3001 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3002 
3003 	r = lkb->lkb_resource;
3004 
3005 	hold_rsb(r);
3006 	lock_rsb(r);
3007 
3008 	queue_bast(r, lkb, ms->m_bastmode);
3009 
3010 	unlock_rsb(r);
3011 	put_rsb(r);
3012 	dlm_put_lkb(lkb);
3013 }
3014 
3015 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3016 {
3017 	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3018 
3019 	from_nodeid = ms->m_header.h_nodeid;
3020 	our_nodeid = dlm_our_nodeid();
3021 
3022 	len = receive_extralen(ms);
3023 
3024 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3025 	if (dir_nodeid != our_nodeid) {
3026 		log_error(ls, "lookup dir_nodeid %d from %d",
3027 			  dir_nodeid, from_nodeid);
3028 		error = -EINVAL;
3029 		ret_nodeid = -1;
3030 		goto out;
3031 	}
3032 
3033 	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3034 
3035 	/* Optimization: we're the master, so treat the lookup as a request */
3036 	if (!error && ret_nodeid == our_nodeid) {
3037 		receive_request(ls, ms);
3038 		return;
3039 	}
3040  out:
3041 	send_lookup_reply(ls, ms, ret_nodeid, error);
3042 }
3043 
3044 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3045 {
3046 	int len, dir_nodeid, from_nodeid;
3047 
3048 	from_nodeid = ms->m_header.h_nodeid;
3049 
3050 	len = receive_extralen(ms);
3051 
3052 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3053 	if (dir_nodeid != dlm_our_nodeid()) {
3054 		log_error(ls, "remove dir entry dir_nodeid %d from %d",
3055 			  dir_nodeid, from_nodeid);
3056 		return;
3057 	}
3058 
3059 	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3060 }
3061 
3062 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3063 {
3064 	do_purge(ls, ms->m_nodeid, ms->m_pid);
3065 }
3066 
3067 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3068 {
3069 	struct dlm_lkb *lkb;
3070 	struct dlm_rsb *r;
3071 	int error, mstype, result;
3072 
3073 	error = find_lkb(ls, ms->m_remid, &lkb);
3074 	if (error) {
3075 		log_error(ls, "receive_request_reply no lkb");
3076 		return;
3077 	}
3078 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3079 
3080 	r = lkb->lkb_resource;
3081 	hold_rsb(r);
3082 	lock_rsb(r);
3083 
3084 	mstype = lkb->lkb_wait_type;
3085 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3086 	if (error)
3087 		goto out;
3088 
3089 	/* Optimization: the dir node was also the master, so it took our
3090 	   lookup as a request and sent request reply instead of lookup reply */
3091 	if (mstype == DLM_MSG_LOOKUP) {
3092 		r->res_nodeid = ms->m_header.h_nodeid;
3093 		lkb->lkb_nodeid = r->res_nodeid;
3094 	}
3095 
3096 	/* this is the value returned from do_request() on the master */
3097 	result = ms->m_result;
3098 
3099 	switch (result) {
3100 	case -EAGAIN:
3101 		/* request would block (be queued) on remote master */
3102 		queue_cast(r, lkb, -EAGAIN);
3103 		confirm_master(r, -EAGAIN);
3104 		unhold_lkb(lkb); /* undoes create_lkb() */
3105 		break;
3106 
3107 	case -EINPROGRESS:
3108 	case 0:
3109 		/* request was queued or granted on remote master */
3110 		receive_flags_reply(lkb, ms);
3111 		lkb->lkb_remid = ms->m_lkid;
3112 		if (is_altmode(lkb))
3113 			munge_altmode(lkb, ms);
3114 		if (result)
3115 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
3116 		else {
3117 			grant_lock_pc(r, lkb, ms);
3118 			queue_cast(r, lkb, 0);
3119 		}
3120 		confirm_master(r, result);
3121 		break;
3122 
3123 	case -EBADR:
3124 	case -ENOTBLK:
3125 		/* find_rsb failed to find rsb or rsb wasn't master */
3126 		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3127 			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3128 		r->res_nodeid = -1;
3129 		lkb->lkb_nodeid = -1;
3130 
3131 		if (is_overlap(lkb)) {
3132 			/* we'll ignore error in cancel/unlock reply */
3133 			queue_cast_overlap(r, lkb);
3134 			unhold_lkb(lkb); /* undoes create_lkb() */
3135 		} else
3136 			_request_lock(r, lkb);
3137 		break;
3138 
3139 	default:
3140 		log_error(ls, "receive_request_reply %x error %d",
3141 			  lkb->lkb_id, result);
3142 	}
3143 
3144 	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3145 		log_debug(ls, "receive_request_reply %x result %d unlock",
3146 			  lkb->lkb_id, result);
3147 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3148 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3149 		send_unlock(r, lkb);
3150 	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3151 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3152 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3153 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3154 		send_cancel(r, lkb);
3155 	} else {
3156 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3157 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3158 	}
3159  out:
3160 	unlock_rsb(r);
3161 	put_rsb(r);
3162 	dlm_put_lkb(lkb);
3163 }
3164 
3165 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3166 				    struct dlm_message *ms)
3167 {
3168 	/* this is the value returned from do_convert() on the master */
3169 	switch (ms->m_result) {
3170 	case -EAGAIN:
3171 		/* convert would block (be queued) on remote master */
3172 		queue_cast(r, lkb, -EAGAIN);
3173 		break;
3174 
3175 	case -EINPROGRESS:
3176 		/* convert was queued on remote master */
3177 		receive_flags_reply(lkb, ms);
3178 		if (is_demoted(lkb))
3179 			munge_demoted(lkb, ms);
3180 		del_lkb(r, lkb);
3181 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3182 		break;
3183 
3184 	case 0:
3185 		/* convert was granted on remote master */
3186 		receive_flags_reply(lkb, ms);
3187 		if (is_demoted(lkb))
3188 			munge_demoted(lkb, ms);
3189 		grant_lock_pc(r, lkb, ms);
3190 		queue_cast(r, lkb, 0);
3191 		break;
3192 
3193 	default:
3194 		log_error(r->res_ls, "receive_convert_reply %x error %d",
3195 			  lkb->lkb_id, ms->m_result);
3196 	}
3197 }
3198 
3199 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3200 {
3201 	struct dlm_rsb *r = lkb->lkb_resource;
3202 	int error;
3203 
3204 	hold_rsb(r);
3205 	lock_rsb(r);
3206 
3207 	/* stub reply can happen with waiters_mutex held */
3208 	error = remove_from_waiters_ms(lkb, ms);
3209 	if (error)
3210 		goto out;
3211 
3212 	__receive_convert_reply(r, lkb, ms);
3213  out:
3214 	unlock_rsb(r);
3215 	put_rsb(r);
3216 }
3217 
3218 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3219 {
3220 	struct dlm_lkb *lkb;
3221 	int error;
3222 
3223 	error = find_lkb(ls, ms->m_remid, &lkb);
3224 	if (error) {
3225 		log_error(ls, "receive_convert_reply no lkb");
3226 		return;
3227 	}
3228 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3229 
3230 	_receive_convert_reply(lkb, ms);
3231 	dlm_put_lkb(lkb);
3232 }
3233 
3234 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3235 {
3236 	struct dlm_rsb *r = lkb->lkb_resource;
3237 	int error;
3238 
3239 	hold_rsb(r);
3240 	lock_rsb(r);
3241 
3242 	/* stub reply can happen with waiters_mutex held */
3243 	error = remove_from_waiters_ms(lkb, ms);
3244 	if (error)
3245 		goto out;
3246 
3247 	/* this is the value returned from do_unlock() on the master */
3248 
3249 	switch (ms->m_result) {
3250 	case -DLM_EUNLOCK:
3251 		receive_flags_reply(lkb, ms);
3252 		remove_lock_pc(r, lkb);
3253 		queue_cast(r, lkb, -DLM_EUNLOCK);
3254 		break;
3255 	case -ENOENT:
3256 		break;
3257 	default:
3258 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
3259 			  lkb->lkb_id, ms->m_result);
3260 	}
3261  out:
3262 	unlock_rsb(r);
3263 	put_rsb(r);
3264 }
3265 
3266 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3267 {
3268 	struct dlm_lkb *lkb;
3269 	int error;
3270 
3271 	error = find_lkb(ls, ms->m_remid, &lkb);
3272 	if (error) {
3273 		log_error(ls, "receive_unlock_reply no lkb");
3274 		return;
3275 	}
3276 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3277 
3278 	_receive_unlock_reply(lkb, ms);
3279 	dlm_put_lkb(lkb);
3280 }
3281 
3282 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3283 {
3284 	struct dlm_rsb *r = lkb->lkb_resource;
3285 	int error;
3286 
3287 	hold_rsb(r);
3288 	lock_rsb(r);
3289 
3290 	/* stub reply can happen with waiters_mutex held */
3291 	error = remove_from_waiters_ms(lkb, ms);
3292 	if (error)
3293 		goto out;
3294 
3295 	/* this is the value returned from do_cancel() on the master */
3296 
3297 	switch (ms->m_result) {
3298 	case -DLM_ECANCEL:
3299 		receive_flags_reply(lkb, ms);
3300 		revert_lock_pc(r, lkb);
3301 		if (ms->m_result)
3302 			queue_cast(r, lkb, -DLM_ECANCEL);
3303 		break;
3304 	case 0:
3305 		break;
3306 	default:
3307 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
3308 			  lkb->lkb_id, ms->m_result);
3309 	}
3310  out:
3311 	unlock_rsb(r);
3312 	put_rsb(r);
3313 }
3314 
3315 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3316 {
3317 	struct dlm_lkb *lkb;
3318 	int error;
3319 
3320 	error = find_lkb(ls, ms->m_remid, &lkb);
3321 	if (error) {
3322 		log_error(ls, "receive_cancel_reply no lkb");
3323 		return;
3324 	}
3325 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3326 
3327 	_receive_cancel_reply(lkb, ms);
3328 	dlm_put_lkb(lkb);
3329 }
3330 
3331 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3332 {
3333 	struct dlm_lkb *lkb;
3334 	struct dlm_rsb *r;
3335 	int error, ret_nodeid;
3336 
3337 	error = find_lkb(ls, ms->m_lkid, &lkb);
3338 	if (error) {
3339 		log_error(ls, "receive_lookup_reply no lkb");
3340 		return;
3341 	}
3342 
3343 	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
3344 	   FIXME: will a non-zero error ever be returned? */
3345 
3346 	r = lkb->lkb_resource;
3347 	hold_rsb(r);
3348 	lock_rsb(r);
3349 
3350 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3351 	if (error)
3352 		goto out;
3353 
3354 	ret_nodeid = ms->m_nodeid;
3355 	if (ret_nodeid == dlm_our_nodeid()) {
3356 		r->res_nodeid = 0;
3357 		ret_nodeid = 0;
3358 		r->res_first_lkid = 0;
3359 	} else {
3360 		/* set_master() will copy res_nodeid to lkb_nodeid */
3361 		r->res_nodeid = ret_nodeid;
3362 	}
3363 
3364 	if (is_overlap(lkb)) {
3365 		log_debug(ls, "receive_lookup_reply %x unlock %x",
3366 			  lkb->lkb_id, lkb->lkb_flags);
3367 		queue_cast_overlap(r, lkb);
3368 		unhold_lkb(lkb); /* undoes create_lkb() */
3369 		goto out_list;
3370 	}
3371 
3372 	_request_lock(r, lkb);
3373 
3374  out_list:
3375 	if (!ret_nodeid)
3376 		process_lookup_list(r);
3377  out:
3378 	unlock_rsb(r);
3379 	put_rsb(r);
3380 	dlm_put_lkb(lkb);
3381 }
3382 
3383 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3384 {
3385 	struct dlm_message *ms = (struct dlm_message *) hd;
3386 	struct dlm_ls *ls;
3387 	int error = 0;
3388 
3389 	if (!recovery)
3390 		dlm_message_in(ms);
3391 
3392 	ls = dlm_find_lockspace_global(hd->h_lockspace);
3393 	if (!ls) {
3394 		log_print("drop message %d from %d for unknown lockspace %d",
3395 			  ms->m_type, nodeid, hd->h_lockspace);
3396 		return -EINVAL;
3397 	}
3398 
3399 	/* recovery may have just ended leaving a bunch of backed-up requests
3400 	   in the requestqueue; wait while dlm_recoverd clears them */
3401 
3402 	if (!recovery)
3403 		dlm_wait_requestqueue(ls);
3404 
3405 	/* recovery may have just started while there were a bunch of
3406 	   in-flight requests -- save them in requestqueue to be processed
3407 	   after recovery.  we can't let dlm_recvd block on the recovery
3408 	   lock.  if dlm_recoverd is calling this function to clear the
3409 	   requestqueue, it needs to be interrupted (-EINTR) if another
3410 	   recovery operation is starting. */
3411 
3412 	while (1) {
3413 		if (dlm_locking_stopped(ls)) {
3414 			if (recovery) {
3415 				error = -EINTR;
3416 				goto out;
3417 			}
3418 			error = dlm_add_requestqueue(ls, nodeid, hd);
3419 			if (error == -EAGAIN)
3420 				continue;
3421 			else {
3422 				error = -EINTR;
3423 				goto out;
3424 			}
3425 		}
3426 
3427 		if (lock_recovery_try(ls))
3428 			break;
3429 		schedule();
3430 	}
3431 
3432 	switch (ms->m_type) {
3433 
3434 	/* messages sent to a master node */
3435 
3436 	case DLM_MSG_REQUEST:
3437 		receive_request(ls, ms);
3438 		break;
3439 
3440 	case DLM_MSG_CONVERT:
3441 		receive_convert(ls, ms);
3442 		break;
3443 
3444 	case DLM_MSG_UNLOCK:
3445 		receive_unlock(ls, ms);
3446 		break;
3447 
3448 	case DLM_MSG_CANCEL:
3449 		receive_cancel(ls, ms);
3450 		break;
3451 
3452 	/* messages sent from a master node (replies to above) */
3453 
3454 	case DLM_MSG_REQUEST_REPLY:
3455 		receive_request_reply(ls, ms);
3456 		break;
3457 
3458 	case DLM_MSG_CONVERT_REPLY:
3459 		receive_convert_reply(ls, ms);
3460 		break;
3461 
3462 	case DLM_MSG_UNLOCK_REPLY:
3463 		receive_unlock_reply(ls, ms);
3464 		break;
3465 
3466 	case DLM_MSG_CANCEL_REPLY:
3467 		receive_cancel_reply(ls, ms);
3468 		break;
3469 
3470 	/* messages sent from a master node (only two types of async msg) */
3471 
3472 	case DLM_MSG_GRANT:
3473 		receive_grant(ls, ms);
3474 		break;
3475 
3476 	case DLM_MSG_BAST:
3477 		receive_bast(ls, ms);
3478 		break;
3479 
3480 	/* messages sent to a dir node */
3481 
3482 	case DLM_MSG_LOOKUP:
3483 		receive_lookup(ls, ms);
3484 		break;
3485 
3486 	case DLM_MSG_REMOVE:
3487 		receive_remove(ls, ms);
3488 		break;
3489 
3490 	/* messages sent from a dir node (remove has no reply) */
3491 
3492 	case DLM_MSG_LOOKUP_REPLY:
3493 		receive_lookup_reply(ls, ms);
3494 		break;
3495 
3496 	/* other messages */
3497 
3498 	case DLM_MSG_PURGE:
3499 		receive_purge(ls, ms);
3500 		break;
3501 
3502 	default:
3503 		log_error(ls, "unknown message type %d", ms->m_type);
3504 	}
3505 
3506 	unlock_recovery(ls);
3507  out:
3508 	dlm_put_lockspace(ls);
3509 	dlm_astd_wake();
3510 	return error;
3511 }
3512 
3513 
3514 /*
3515  * Recovery related
3516  */
3517 
3518 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3519 {
3520 	if (middle_conversion(lkb)) {
3521 		hold_lkb(lkb);
3522 		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3523 		ls->ls_stub_ms.m_result = -EINPROGRESS;
3524 		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3525 		_receive_convert_reply(lkb, &ls->ls_stub_ms);
3526 
3527 		/* Same special case as in receive_rcom_lock_args() */
3528 		lkb->lkb_grmode = DLM_LOCK_IV;
3529 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3530 		unhold_lkb(lkb);
3531 
3532 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3533 		lkb->lkb_flags |= DLM_IFL_RESEND;
3534 	}
3535 
3536 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3537 	   conversions are async; there's no reply from the remote master */
3538 }
3539 
3540 /* A waiting lkb needs recovery if the master node has failed, or
3541    the master node is changing (only when no directory is used) */
3542 
3543 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3544 {
3545 	if (dlm_is_removed(ls, lkb->lkb_nodeid))
3546 		return 1;
3547 
3548 	if (!dlm_no_directory(ls))
3549 		return 0;
3550 
3551 	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3552 		return 1;
3553 
3554 	return 0;
3555 }
3556 
3557 /* Recovery for locks that are waiting for replies from nodes that are now
3558    gone.  We can just complete unlocks and cancels by faking a reply from the
3559    dead node.  Requests and up-conversions we flag to be resent after
3560    recovery.  Down-conversions can just be completed with a fake reply like
3561    unlocks.  Conversions between PR and CW need special attention. */
3562 
3563 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3564 {
3565 	struct dlm_lkb *lkb, *safe;
3566 
3567 	mutex_lock(&ls->ls_waiters_mutex);
3568 
3569 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3570 		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3571 			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3572 
3573 		/* all outstanding lookups, regardless of destination, will be
3574 		   resent after recovery is done */
3575 
3576 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3577 			lkb->lkb_flags |= DLM_IFL_RESEND;
3578 			continue;
3579 		}
3580 
3581 		if (!waiter_needs_recovery(ls, lkb))
3582 			continue;
3583 
3584 		switch (lkb->lkb_wait_type) {
3585 
3586 		case DLM_MSG_REQUEST:
3587 			lkb->lkb_flags |= DLM_IFL_RESEND;
3588 			break;
3589 
3590 		case DLM_MSG_CONVERT:
3591 			recover_convert_waiter(ls, lkb);
3592 			break;
3593 
3594 		case DLM_MSG_UNLOCK:
3595 			hold_lkb(lkb);
3596 			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3597 			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3598 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3599 			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
3600 			dlm_put_lkb(lkb);
3601 			break;
3602 
3603 		case DLM_MSG_CANCEL:
3604 			hold_lkb(lkb);
3605 			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3606 			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3607 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3608 			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
3609 			dlm_put_lkb(lkb);
3610 			break;
3611 
3612 		default:
3613 			log_error(ls, "invalid lkb wait_type %d",
3614 				  lkb->lkb_wait_type);
3615 		}
3616 		schedule();
3617 	}
3618 	mutex_unlock(&ls->ls_waiters_mutex);
3619 }
3620 
3621 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
3622 {
3623 	struct dlm_lkb *lkb;
3624 	int found = 0;
3625 
3626 	mutex_lock(&ls->ls_waiters_mutex);
3627 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3628 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
3629 			hold_lkb(lkb);
3630 			found = 1;
3631 			break;
3632 		}
3633 	}
3634 	mutex_unlock(&ls->ls_waiters_mutex);
3635 
3636 	if (!found)
3637 		lkb = NULL;
3638 	return lkb;
3639 }
3640 
3641 /* Deal with lookups and lkbs marked RESEND from _pre.  We may now be the
3642    master or dir-node for r.  Processing the lkb may result in it being placed
3643    back on waiters. */
3644 
3645 /* We do this after normal locking has been enabled and any saved messages
3646    (in requestqueue) have been processed.  We should be confident that at
3647    this point we won't get or process a reply to any of these waiting
3648    operations.  But, new ops may be coming in on the rsbs/locks here from
3649    userspace or remotely. */
3650 
3651 /* there may have been an overlap unlock/cancel prior to recovery or after
3652    recovery.  if before, the lkb may still have a positive wait_count; if
3653    after, the overlap flag would just have been set and nothing new sent.
3654    we can be confident here that any replies to either the initial op or
3655    overlap ops prior to recovery have been received. */
3656 
3657 int dlm_recover_waiters_post(struct dlm_ls *ls)
3658 {
3659 	struct dlm_lkb *lkb;
3660 	struct dlm_rsb *r;
3661 	int error = 0, mstype, err, oc, ou;
3662 
3663 	while (1) {
3664 		if (dlm_locking_stopped(ls)) {
3665 			log_debug(ls, "recover_waiters_post aborted");
3666 			error = -EINTR;
3667 			break;
3668 		}
3669 
3670 		lkb = find_resend_waiter(ls);
3671 		if (!lkb)
3672 			break;
3673 
3674 		r = lkb->lkb_resource;
3675 		hold_rsb(r);
3676 		lock_rsb(r);
3677 
3678 		mstype = lkb->lkb_wait_type;
3679 		oc = is_overlap_cancel(lkb);
3680 		ou = is_overlap_unlock(lkb);
3681 		err = 0;
3682 
3683 		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3684 			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3685 
3686 		/* At this point we assume that we won't get a reply to any
3687 		   previous op or overlap op on this lock.  First, do a big
3688 		   remove_from_waiters() for all previous ops. */
3689 
3690 		lkb->lkb_flags &= ~DLM_IFL_RESEND;
3691 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3692 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3693 		lkb->lkb_wait_type = 0;
3694 		lkb->lkb_wait_count = 0;
3695 		mutex_lock(&ls->ls_waiters_mutex);
3696 		list_del_init(&lkb->lkb_wait_reply);
3697 		mutex_unlock(&ls->ls_waiters_mutex);
3698 		unhold_lkb(lkb); /* for waiters list */
3699 
3700 		if (oc || ou) {
3701 			/* do an unlock or cancel instead of resending */
3702 			switch (mstype) {
3703 			case DLM_MSG_LOOKUP:
3704 			case DLM_MSG_REQUEST:
3705 				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
3706 							-DLM_ECANCEL);
3707 				unhold_lkb(lkb); /* undoes create_lkb() */
3708 				break;
3709 			case DLM_MSG_CONVERT:
3710 				if (oc) {
3711 					queue_cast(r, lkb, -DLM_ECANCEL);
3712 				} else {
3713 					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
3714 					_unlock_lock(r, lkb);
3715 				}
3716 				break;
3717 			default:
3718 				err = 1;
3719 			}
3720 		} else {
3721 			switch (mstype) {
3722 			case DLM_MSG_LOOKUP:
3723 			case DLM_MSG_REQUEST:
3724 				_request_lock(r, lkb);
3725 				if (is_master(r))
3726 					confirm_master(r, 0);
3727 				break;
3728 			case DLM_MSG_CONVERT:
3729 				_convert_lock(r, lkb);
3730 				break;
3731 			default:
3732 				err = 1;
3733 			}
3734 		}
3735 
3736 		if (err)
3737 			log_error(ls, "recover_waiters_post %x %d %x %d %d",
3738 			  	  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
3739 		unlock_rsb(r);
3740 		put_rsb(r);
3741 		dlm_put_lkb(lkb);
3742 	}
3743 
3744 	return error;
3745 }
3746 
3747 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3748 			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3749 {
3750 	struct dlm_ls *ls = r->res_ls;
3751 	struct dlm_lkb *lkb, *safe;
3752 
3753 	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3754 		if (test(ls, lkb)) {
3755 			rsb_set_flag(r, RSB_LOCKS_PURGED);
3756 			del_lkb(r, lkb);
3757 			/* this put should free the lkb */
3758 			if (!dlm_put_lkb(lkb))
3759 				log_error(ls, "purged lkb not released");
3760 		}
3761 	}
3762 }
3763 
3764 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3765 {
3766 	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3767 }
3768 
3769 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3770 {
3771 	return is_master_copy(lkb);
3772 }
3773 
3774 static void purge_dead_locks(struct dlm_rsb *r)
3775 {
3776 	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3777 	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3778 	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3779 }
3780 
3781 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3782 {
3783 	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3784 	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3785 	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3786 }
3787 
3788 /* Get rid of locks held by nodes that are gone. */
3789 
3790 int dlm_purge_locks(struct dlm_ls *ls)
3791 {
3792 	struct dlm_rsb *r;
3793 
3794 	log_debug(ls, "dlm_purge_locks");
3795 
3796 	down_write(&ls->ls_root_sem);
3797 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3798 		hold_rsb(r);
3799 		lock_rsb(r);
3800 		if (is_master(r))
3801 			purge_dead_locks(r);
3802 		unlock_rsb(r);
3803 		unhold_rsb(r);
3804 
3805 		schedule();
3806 	}
3807 	up_write(&ls->ls_root_sem);
3808 
3809 	return 0;
3810 }
3811 
3812 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3813 {
3814 	struct dlm_rsb *r, *r_ret = NULL;
3815 
3816 	read_lock(&ls->ls_rsbtbl[bucket].lock);
3817 	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3818 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
3819 			continue;
3820 		hold_rsb(r);
3821 		rsb_clear_flag(r, RSB_LOCKS_PURGED);
3822 		r_ret = r;
3823 		break;
3824 	}
3825 	read_unlock(&ls->ls_rsbtbl[bucket].lock);
3826 	return r_ret;
3827 }
3828 
3829 void dlm_grant_after_purge(struct dlm_ls *ls)
3830 {
3831 	struct dlm_rsb *r;
3832 	int bucket = 0;
3833 
3834 	while (1) {
3835 		r = find_purged_rsb(ls, bucket);
3836 		if (!r) {
3837 			if (bucket == ls->ls_rsbtbl_size - 1)
3838 				break;
3839 			bucket++;
3840 			continue;
3841 		}
3842 		lock_rsb(r);
3843 		if (is_master(r)) {
3844 			grant_pending_locks(r);
3845 			confirm_master(r, 0);
3846 		}
3847 		unlock_rsb(r);
3848 		put_rsb(r);
3849 		schedule();
3850 	}
3851 }
3852 
3853 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3854 					 uint32_t remid)
3855 {
3856 	struct dlm_lkb *lkb;
3857 
3858 	list_for_each_entry(lkb, head, lkb_statequeue) {
3859 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3860 			return lkb;
3861 	}
3862 	return NULL;
3863 }
3864 
3865 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3866 				    uint32_t remid)
3867 {
3868 	struct dlm_lkb *lkb;
3869 
3870 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3871 	if (lkb)
3872 		return lkb;
3873 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3874 	if (lkb)
3875 		return lkb;
3876 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3877 	if (lkb)
3878 		return lkb;
3879 	return NULL;
3880 }
3881 
3882 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3883 				  struct dlm_rsb *r, struct dlm_rcom *rc)
3884 {
3885 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3886 	int lvblen;
3887 
3888 	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3889 	lkb->lkb_ownpid = rl->rl_ownpid;
3890 	lkb->lkb_remid = rl->rl_lkid;
3891 	lkb->lkb_exflags = rl->rl_exflags;
3892 	lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3893 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
3894 	lkb->lkb_lvbseq = rl->rl_lvbseq;
3895 	lkb->lkb_rqmode = rl->rl_rqmode;
3896 	lkb->lkb_grmode = rl->rl_grmode;
3897 	/* don't set lkb_status because add_lkb wants to set it itself */
3898 
3899 	lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3900 	lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3901 
3902 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3903 		lkb->lkb_lvbptr = allocate_lvb(ls);
3904 		if (!lkb->lkb_lvbptr)
3905 			return -ENOMEM;
3906 		lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3907 			 sizeof(struct rcom_lock);
3908 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3909 	}
3910 
3911 	/* Conversions between PR and CW (middle modes) need special handling.
3912 	   The real granted mode of these converting locks cannot be determined
3913 	   until all locks have been rebuilt on the rsb (recover_conversion) */
3914 
3915 	if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3916 		rl->rl_status = DLM_LKSTS_CONVERT;
3917 		lkb->lkb_grmode = DLM_LOCK_IV;
3918 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
3919 	}
3920 
3921 	return 0;
3922 }
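
#if 0	/* a minimal sketch, assuming middle_conversion() (defined earlier
	   in this file) amounts to the test below.  PR and CW rank equally
	   in the lock mode ordering but are incompatible with each other,
	   so the real granted mode of a PR<->CW convert is ambiguous to a
	   new master until recover_conversion() has seen all the rebuilt
	   locks on the rsb. */
static inline int example_middle_conversion(struct dlm_lkb *lkb)
{
	return (lkb->lkb_rqmode == DLM_LOCK_PR &&
		lkb->lkb_grmode == DLM_LOCK_CW) ||
	       (lkb->lkb_rqmode == DLM_LOCK_CW &&
		lkb->lkb_grmode == DLM_LOCK_PR);
}
#endif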
3923 
3924 /* This lkb may have been recovered in a previous aborted recovery so we need
3925    to check if the rsb already has an lkb with the given remote nodeid/lkid.
3926    If so we just send back a standard reply.  If not, we create a new lkb with
3927    the given values and send back our lkid.  We send back our lkid by sending
3928    back the rcom_lock struct we got but with the remid field filled in. */
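
/* A sketch of the exchange (the rcom transport lives in rcom.c):

   L: dlm_send_rcom_lock()         ->  R: dlm_recover_master_copy()
                                       R: rl_remid = lkb->lkb_id
   L: dlm_recover_process_copy()   <-  R: rcom_lock struct sent back
*/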
3929 
3930 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3931 {
3932 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3933 	struct dlm_rsb *r;
3934 	struct dlm_lkb *lkb;
3935 	int error;
3936 
3937 	if (rl->rl_parent_lkid) {
3938 		error = -EOPNOTSUPP;
3939 		goto out;
3940 	}
3941 
3942 	error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3943 	if (error)
3944 		goto out;
3945 
3946 	lock_rsb(r);
3947 
3948 	lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3949 	if (lkb) {
3950 		error = -EEXIST;
3951 		goto out_remid;
3952 	}
3953 
3954 	error = create_lkb(ls, &lkb);
3955 	if (error)
3956 		goto out_unlock;
3957 
3958 	error = receive_rcom_lock_args(ls, lkb, r, rc);
3959 	if (error) {
3960 		__put_lkb(ls, lkb);
3961 		goto out_unlock;
3962 	}
3963 
3964 	attach_lkb(r, lkb);
3965 	add_lkb(r, lkb, rl->rl_status);
3966 	error = 0;
3967 
3968  out_remid:
3969 	/* this is the new value returned to the lock holder for
3970 	   saving in its process-copy lkb */
3971 	rl->rl_remid = lkb->lkb_id;
3972 
3973  out_unlock:
3974 	unlock_rsb(r);
3975 	put_rsb(r);
3976  out:
3977 	if (error)
3978 		log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3979 	rl->rl_result = error;
3980 	return error;
3981 }
3982 
3983 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3984 {
3985 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3986 	struct dlm_rsb *r;
3987 	struct dlm_lkb *lkb;
3988 	int error;
3989 
3990 	error = find_lkb(ls, rl->rl_lkid, &lkb);
3991 	if (error) {
3992 		log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3993 		return error;
3994 	}
3995 
3996 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3997 
3998 	error = rl->rl_result;
3999 
4000 	r = lkb->lkb_resource;
4001 	hold_rsb(r);
4002 	lock_rsb(r);
4003 
4004 	switch (error) {
4005 	case -EBADR:
4006 		/* There's a chance the new master received our lock before
4007 		   dlm_recover_master_reply(); this wouldn't happen if we did
4008 		   a barrier between recover_masters and recover_locks. */
4009 		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4010 			  (unsigned long)r, r->res_name);
4011 		dlm_send_rcom_lock(r, lkb);
4012 		goto out;
4013 	case -EEXIST:
4014 		log_debug(ls, "master copy exists %x", lkb->lkb_id);
4015 		/* fall through */
4016 	case 0:
4017 		lkb->lkb_remid = rl->rl_remid;
4018 		break;
4019 	default:
4020 		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4021 			  error, lkb->lkb_id);
4022 	}
4023 
4024 	/* an ack for dlm_recover_locks(), which waits for replies for
4025 	   all the locks it sends to new masters */
4026 	dlm_recovered_lock(r);
4027  out:
4028 	unlock_rsb(r);
4029 	put_rsb(r);
4030 	dlm_put_lkb(lkb);
4031 
4032 	return 0;
4033 }
4034 
4035 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4036 		     int mode, uint32_t flags, void *name, unsigned int namelen,
4037 		     uint32_t parent_lkid)
4038 {
4039 	struct dlm_lkb *lkb;
4040 	struct dlm_args args;
4041 	int error;
4042 
4043 	lock_recovery(ls);
4044 
4045 	error = create_lkb(ls, &lkb);
4046 	if (error) {
4047 		kfree(ua);
4048 		goto out;
4049 	}
4050 
4051 	if (flags & DLM_LKF_VALBLK) {
4052 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4053 		if (!ua->lksb.sb_lvbptr) {
4054 			kfree(ua);
4055 			__put_lkb(ls, lkb);
4056 			error = -ENOMEM;
4057 			goto out;
4058 		}
4059 	}
4060 
4061 	/* Once ua is attached to the lkb, free_lkb() will free it.
4062 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
4063 	   lock and that lkb_astparam is the dlm_user_args structure. */
4064 
4065 	error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
4066 			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4067 	lkb->lkb_flags |= DLM_IFL_USER;
4068 	ua->old_mode = DLM_LOCK_IV;
4069 
4070 	if (error) {
4071 		__put_lkb(ls, lkb);
4072 		goto out;
4073 	}
4074 
4075 	error = request_lock(ls, lkb, name, namelen, &args);
4076 
4077 	switch (error) {
4078 	case 0:
4079 		break;
4080 	case -EINPROGRESS:
4081 		error = 0;
4082 		break;
4083 	case -EAGAIN:
4084 		error = 0;
4085 		/* fall through */
4086 	default:
4087 		__put_lkb(ls, lkb);
4088 		goto out;
4089 	}
4090 
4091 	/* add this new lkb to the per-process list of locks */
4092 	spin_lock(&ua->proc->locks_spin);
4093 	hold_lkb(lkb);
4094 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4095 	spin_unlock(&ua->proc->locks_spin);
4096  out:
4097 	unlock_recovery(ls);
4098 	return error;
4099 }
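
#if 0	/* a minimal sketch, not built: the equivalent request made by an
	   in-kernel caller through dlm_lock(), which funnels into the same
	   request_lock() machinery as dlm_user_request() above.  The
	   example_* names are hypothetical. */
static struct dlm_lksb example_lksb;

static void example_ast(void *astarg)
{
	/* completion ast: example_lksb.sb_status holds the result,
	   e.g. 0 on grant or -EAGAIN for a failed DLM_LKF_NOQUEUE try */
}

static int example_request(dlm_lockspace_t *ls)
{
	return dlm_lock(ls, DLM_LOCK_EX, &example_lksb, DLM_LKF_NOQUEUE,
			"example", 7, 0, example_ast, &example_lksb, NULL);
}
#endif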
4100 
4101 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4102 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
4103 {
4104 	struct dlm_lkb *lkb;
4105 	struct dlm_args args;
4106 	struct dlm_user_args *ua;
4107 	int error;
4108 
4109 	lock_recovery(ls);
4110 
4111 	error = find_lkb(ls, lkid, &lkb);
4112 	if (error)
4113 		goto out;
4114 
4115 	/* the user can change the params on its lock when converting it,
4116 	   or add an lvb that didn't exist before */
4117 
4118 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4119 
4120 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4121 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4122 		if (!ua->lksb.sb_lvbptr) {
4123 			error = -ENOMEM;
4124 			goto out_put;
4125 		}
4126 	}
4127 	if (lvb_in && ua->lksb.sb_lvbptr)
4128 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4129 
4130 	ua->castparam = ua_tmp->castparam;
4131 	ua->castaddr = ua_tmp->castaddr;
4132 	ua->bastparam = ua_tmp->bastparam;
4133 	ua->bastaddr = ua_tmp->bastaddr;
4134 	ua->user_lksb = ua_tmp->user_lksb;
4135 	ua->old_mode = lkb->lkb_grmode;
4136 
4137 	error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
4138 			      ua, DLM_FAKE_USER_AST, &args);
4139 	if (error)
4140 		goto out_put;
4141 
4142 	error = convert_lock(ls, lkb, &args);
4143 
4144 	if (error == -EINPROGRESS || error == -EAGAIN)
4145 		error = 0;
4146  out_put:
4147 	dlm_put_lkb(lkb);
4148  out:
4149 	unlock_recovery(ls);
4150 	kfree(ua_tmp);
4151 	return error;
4152 }
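
#if 0	/* a minimal sketch: the in-kernel equivalent of the conversion
	   above.  With DLM_LKF_CONVERT the lock is identified by
	   lksb->sb_lkid rather than by name, mirroring the find_lkb(lkid)
	   lookup here.  Reuses the hypothetical example_ast() from the
	   sketch after dlm_user_request(). */
static int example_convert(dlm_lockspace_t *ls, struct dlm_lksb *lksb)
{
	/* downconvert to PR; sb_lkid was filled in when first granted */
	return dlm_lock(ls, DLM_LOCK_PR, lksb, DLM_LKF_CONVERT,
			NULL, 0, 0, example_ast, lksb, NULL);
}
#endif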
4153 
4154 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4155 		    uint32_t flags, uint32_t lkid, char *lvb_in)
4156 {
4157 	struct dlm_lkb *lkb;
4158 	struct dlm_args args;
4159 	struct dlm_user_args *ua;
4160 	int error;
4161 
4162 	lock_recovery(ls);
4163 
4164 	error = find_lkb(ls, lkid, &lkb);
4165 	if (error)
4166 		goto out;
4167 
4168 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4169 
4170 	if (lvb_in && ua->lksb.sb_lvbptr)
4171 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4172 	ua->castparam = ua_tmp->castparam;
4173 	ua->user_lksb = ua_tmp->user_lksb;
4174 
4175 	error = set_unlock_args(flags, ua, &args);
4176 	if (error)
4177 		goto out_put;
4178 
4179 	error = unlock_lock(ls, lkb, &args);
4180 
4181 	if (error == -DLM_EUNLOCK)
4182 		error = 0;
4183 	/* from validate_unlock_args() */
4184 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4185 		error = 0;
4186 	if (error)
4187 		goto out_put;
4188 
4189 	spin_lock(&ua->proc->locks_spin);
4190 	/* dlm_user_add_ast() may have already taken lkb off the proc list */
4191 	if (!list_empty(&lkb->lkb_ownqueue))
4192 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4193 	spin_unlock(&ua->proc->locks_spin);
4194  out_put:
4195 	dlm_put_lkb(lkb);
4196  out:
4197 	unlock_recovery(ls);
4198 	kfree(ua_tmp);
4199 	return error;
4200 }
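
#if 0	/* a minimal sketch: the in-kernel equivalent, dlm_unlock().  As
	   above, -DLM_EUNLOCK is the normal completion status, and adding
	   DLM_LKF_FORCEUNLOCK lets the unlock proceed even if the lock
	   isn't granted. */
static int example_unlock(dlm_lockspace_t *ls, struct dlm_lksb *lksb)
{
	return dlm_unlock(ls, lksb->sb_lkid, 0, lksb, lksb);
}
#endif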
4201 
4202 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4203 		    uint32_t flags, uint32_t lkid)
4204 {
4205 	struct dlm_lkb *lkb;
4206 	struct dlm_args args;
4207 	struct dlm_user_args *ua;
4208 	int error;
4209 
4210 	lock_recovery(ls);
4211 
4212 	error = find_lkb(ls, lkid, &lkb);
4213 	if (error)
4214 		goto out;
4215 
4216 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4217 	ua->castparam = ua_tmp->castparam;
4218 	ua->user_lksb = ua_tmp->user_lksb;
4219 
4220 	error = set_unlock_args(flags, ua, &args);
4221 	if (error)
4222 		goto out_put;
4223 
4224 	error = cancel_lock(ls, lkb, &args);
4225 
4226 	if (error == -DLM_ECANCEL)
4227 		error = 0;
4228 	/* from validate_unlock_args() */
4229 	if (error == -EBUSY)
4230 		error = 0;
4231  out_put:
4232 	dlm_put_lkb(lkb);
4233  out:
4234 	unlock_recovery(ls);
4235 	kfree(ua_tmp);
4236 	return error;
4237 }
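
#if 0	/* a minimal sketch: an in-kernel cancel is dlm_unlock() with
	   DLM_LKF_CANCEL; -DLM_ECANCEL is the normal completion status */
static int example_cancel(dlm_lockspace_t *ls, struct dlm_lksb *lksb)
{
	return dlm_unlock(ls, lksb->sb_lkid, DLM_LKF_CANCEL, lksb, lksb);
}
#endif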
4238 
4239 /* lkb's that are removed from the waiters list by revert are just left on
4240    the orphans list with the granted orphan locks, to be freed later by
4241    do_purge() */
4241 
4242 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4243 {
4244 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4245 	struct dlm_args args;
4246 	int error;
4247 
4248 	hold_lkb(lkb);
4249 	mutex_lock(&ls->ls_orphans_mutex);
4250 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4251 	mutex_unlock(&ls->ls_orphans_mutex);
4252 
4253 	set_unlock_args(0, ua, &args);
4254 
4255 	error = cancel_lock(ls, lkb, &args);
4256 	if (error == -DLM_ECANCEL)
4257 		error = 0;
4258 	return error;
4259 }
4260 
4261 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4262    Regardless of what rsb queue the lock is on, it's removed and freed. */
4263 
4264 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4265 {
4266 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4267 	struct dlm_args args;
4268 	int error;
4269 
4270 	set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4271 
4272 	error = unlock_lock(ls, lkb, &args);
4273 	if (error == -DLM_EUNLOCK)
4274 		error = 0;
4275 	return error;
4276 }
4277 
4278 /* We have to release the clear_proc_locks mutex before calling
4279    unlock_proc_lock() (which does lock_rsb) to avoid deadlocking with a
4280    received message that does lock_rsb followed by dlm_user_add_ast() */
4281 
4282 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4283 				     struct dlm_user_proc *proc)
4284 {
4285 	struct dlm_lkb *lkb = NULL;
4286 
4287 	mutex_lock(&ls->ls_clear_proc_locks);
4288 	if (list_empty(&proc->locks))
4289 		goto out;
4290 
4291 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4292 	list_del_init(&lkb->lkb_ownqueue);
4293 
4294 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4295 		lkb->lkb_flags |= DLM_IFL_ORPHAN;
4296 	else
4297 		lkb->lkb_flags |= DLM_IFL_DEAD;
4298  out:
4299 	mutex_unlock(&ls->ls_clear_proc_locks);
4300 	return lkb;
4301 }
4302 
4303 /* The ls_clear_proc_locks mutex protects against dlm_user_add_ast(), which
4304    1) references the lkb's ua, which we free here, and 2) adds lkbs to
4305    proc->asts, which we clear here. */
4306 
4307 /* proc CLOSING flag is set so no more device_reads should look at the
4308    proc->asts list, and no more device_writes should add lkb's to the
4309    proc->locks list; so we shouldn't need to take asts_spin or locks_spin
4310    here.  This assumes that device reads/writes/closes are serialized --
4311    FIXME: we may need to serialize them ourselves. */
4312 
4313 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4314 {
4315 	struct dlm_lkb *lkb, *safe;
4316 
4317 	lock_recovery(ls);
4318 
4319 	while (1) {
4320 		lkb = del_proc_lock(ls, proc);
4321 		if (!lkb)
4322 			break;
4323 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4324 			orphan_proc_lock(ls, lkb);
4325 		else
4326 			unlock_proc_lock(ls, lkb);
4327 
4328 		/* this removes the reference for the proc->locks list
4329 		   added by dlm_user_request; it may result in the lkb
4330 		   being freed */
4331 
4332 		dlm_put_lkb(lkb);
4333 	}
4334 
4335 	mutex_lock(&ls->ls_clear_proc_locks);
4336 
4337 	/* in-progress unlocks */
4338 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4339 		list_del_init(&lkb->lkb_ownqueue);
4340 		lkb->lkb_flags |= DLM_IFL_DEAD;
4341 		dlm_put_lkb(lkb);
4342 	}
4343 
4344 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4345 		list_del(&lkb->lkb_astqueue);
4346 		dlm_put_lkb(lkb);
4347 	}
4348 
4349 	mutex_unlock(&ls->ls_clear_proc_locks);
4350 	unlock_recovery(ls);
4351 }
4352 
4353 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4354 {
4355 	struct dlm_lkb *lkb, *safe;
4356 
4357 	while (1) {
4358 		lkb = NULL;
4359 		spin_lock(&proc->locks_spin);
4360 		if (!list_empty(&proc->locks)) {
4361 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
4362 					 lkb_ownqueue);
4363 			list_del_init(&lkb->lkb_ownqueue);
4364 		}
4365 		spin_unlock(&proc->locks_spin);
4366 
4367 		if (!lkb)
4368 			break;
4369 
4370 		lkb->lkb_flags |= DLM_IFL_DEAD;
4371 		unlock_proc_lock(ls, lkb);
4372 		dlm_put_lkb(lkb); /* ref from proc->locks list */
4373 	}
4374 
4375 	spin_lock(&proc->locks_spin);
4376 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4377 		list_del_init(&lkb->lkb_ownqueue);
4378 		lkb->lkb_flags |= DLM_IFL_DEAD;
4379 		dlm_put_lkb(lkb);
4380 	}
4381 	spin_unlock(&proc->locks_spin);
4382 
4383 	spin_lock(&proc->asts_spin);
4384 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4385 		list_del(&lkb->lkb_astqueue);
4386 		dlm_put_lkb(lkb);
4387 	}
4388 	spin_unlock(&proc->asts_spin);
4389 }
4390 
4391 /* pid of 0 means purge all orphans */
4392 
4393 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4394 {
4395 	struct dlm_lkb *lkb, *safe;
4396 
4397 	mutex_lock(&ls->ls_orphans_mutex);
4398 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4399 		if (pid && lkb->lkb_ownpid != pid)
4400 			continue;
4401 		unlock_proc_lock(ls, lkb);
4402 		list_del_init(&lkb->lkb_ownqueue);
4403 		dlm_put_lkb(lkb);
4404 	}
4405 	mutex_unlock(&ls->ls_orphans_mutex);
4406 }
4407 
4408 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4409 {
4410 	struct dlm_message *ms;
4411 	struct dlm_mhandle *mh;
4412 	int error;
4413 
4414 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4415 				DLM_MSG_PURGE, &ms, &mh);
4416 	if (error)
4417 		return error;
4418 	ms->m_nodeid = nodeid;
4419 	ms->m_pid = pid;
4420 
4421 	return send_message(mh, ms);
4422 }
4423 
4424 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4425 		   int nodeid, int pid)
4426 {
4427 	int error = 0;
4428 
4429 	if (nodeid != dlm_our_nodeid()) {
4430 		error = send_purge(ls, nodeid, pid);
4431 	} else {
4432 		lock_recovery(ls);
4433 		if (pid == current->pid)
4434 			purge_proc_locks(ls, proc);
4435 		else
4436 			do_purge(ls, nodeid, pid);
4437 		unlock_recovery(ls);
4438 	}
4439 	return error;
4440 }
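
#if 0	/* a minimal sketch: releasing every orphan held on this node
	   (a pid of 0 matches all orphans in do_purge above) */
static int example_purge_all(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	return dlm_user_purge(ls, proc, dlm_our_nodeid(), 0);
}
#endif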
4441 
4442