xref: /openbmc/linux/fs/dlm/lock.c (revision 6ee73861)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12 
13 /* Central locking logic has four stages:
14 
15    dlm_lock()
16    dlm_unlock()
17 
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22 
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27 
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32 
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35 
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40 
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43 
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46 
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49 
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53 
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75 
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87 				    struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91 
92 /*
 * Lock compatibility matrix - thanks Steve
94  * UN = Unlocked state. Not really a state, used as a flag
95  * PD = Padding. Used to make the matrix a nice power of two in size
96  * Other states are the same as the VMS DLM.
97  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
98  */
99 
/* 1 = modes are compatible (both may be granted), 0 = conflict.
   Indexed by mode+1 so DLM_LOCK_IV (-1, shown as UN) maps to index 0. */
static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
111 
112 /*
113  * This defines the direction of transfer of LVB data.
114  * Granted mode is the row; requested mode is the column.
115  * Usage: matrix[grmode+1][rqmode+1]
116  * 1 = LVB is returned to the caller
117  * 0 = LVB is written to the resource
118  * -1 = nothing happens to the LVB
119  */
120 
/* See the comment above for the meaning of 1 / 0 / -1 entries. */
const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
132 
/* True if lkb gr's granted mode is compatible with lkb rq's requested
   mode (the matrix is symmetric, so argument order doesn't matter). */
#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

/* Nonzero if the two lock modes may be held on a resource concurrently. */
int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
140 
141 /*
142  * Compatibility matrix for conversions with QUECVT set.
143  * Granted mode is the row; requested mode is the column.
144  * Usage: matrix[grmode+1][rqmode+1]
145  */
146 
/* 1 = a QUECVT conversion from the granted (row) to the requested
   (column) mode is allowed to be queued; 0 = invalid combination. */
static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
158 
/* Dump an lkb's identifying fields and state to the kernel log; used by
   DLM_ASSERT failure paths and debugging. */
void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}
167 
/* Dump an rsb's identifying fields to the kernel log (debugging aid). */
static void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}
174 
/* Dump an rsb plus every lkb on each of its queues (lookup, grant,
   convert, wait) to the kernel log; used from assertion failures. */
void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}
196 
197 /* Threads cannot use the lockspace while it's being recovered */
198 
/* Take ls_in_recovery shared; blocks while recovery holds it exclusively. */
static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

/* Release the shared recovery lock taken by dlm_lock_recovery(). */
void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

/* Nonblocking variant: returns nonzero if the shared lock was acquired. */
int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}
213 
/* Caller allows the request to wait on a queue if not grantable now. */
static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

/* Caller wants blocking asts sent even though the request won't queue. */
static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

/* Lock was granted at a lower mode than requested (status block flag). */
static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

/* Lock was granted at its alternate mode (status block flag). */
static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

/* lkb is on the rsb's grant queue. */
static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

/* rsb is mastered on a remote node (res_nodeid 0 means we are master). */
static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

/* Local process copy of a lock whose master is a remote node. */
static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

/* Master-node copy of a lock held by a process on a remote node. */
static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

/* PR<->CW conversion: the two modes conflict with each other but each is
   compatible with modes the other isn't, so neither is simply "up" or
   "down" and such conversions need special handling. */
static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

/* Conversion to a strictly weaker mode (excluding the PR/CW case). */
static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

/* An unlock overlapped an operation still in progress on this lkb. */
static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

/* A cancel overlapped an operation still in progress on this lkb. */
static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

/* Either overlap case above. */
static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}
285 
/* Queue a completion ast for lkb with result rv.  A master copy's cast
   is delivered on the process node, so nothing is queued here for it. */
static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	/* the operation is complete, so any pending timeout is obsolete */
	del_timeout(lkb);

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	/* if the operation was a cancel, then return -DLM_ECANCEL, if a
	   timeout caused the cancel then return -ETIMEDOUT */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
		rv = -ETIMEDOUT;
	}

	/* likewise, a cancel that originated from deadlock detection is
	   reported to the caller as -EDEADLK */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
		rv = -EDEADLK;
	}

	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP, 0);
}
312 
/* Complete an overlapped operation: report -DLM_EUNLOCK if an unlock
   overlapped, otherwise -DLM_ECANCEL for an overlapping cancel. */
static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}
318 
/* Queue a blocking ast notifying lkb's holder that a lock of mode rqmode
   is blocked on it.  For a master copy the bast must be sent to the
   remote process node that holds the real lock. */
static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	lkb->lkb_time_bast = ktime_get();

	if (is_master_copy(lkb))
		send_bast(r, lkb, rqmode);
	else
		dlm_add_ast(lkb, AST_BAST, rqmode);
}
328 
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332 
333 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
334 {
335 	struct dlm_rsb *r;
336 
337 	r = dlm_allocate_rsb(ls, len);
338 	if (!r)
339 		return NULL;
340 
341 	r->res_ls = ls;
342 	r->res_length = len;
343 	memcpy(r->res_name, name, len);
344 	mutex_init(&r->res_mutex);
345 
346 	INIT_LIST_HEAD(&r->res_lookup);
347 	INIT_LIST_HEAD(&r->res_grantqueue);
348 	INIT_LIST_HEAD(&r->res_convertqueue);
349 	INIT_LIST_HEAD(&r->res_waitqueue);
350 	INIT_LIST_HEAD(&r->res_root_list);
351 	INIT_LIST_HEAD(&r->res_recover_list);
352 
353 	return r;
354 }
355 
356 static int search_rsb_list(struct list_head *head, char *name, int len,
357 			   unsigned int flags, struct dlm_rsb **r_ret)
358 {
359 	struct dlm_rsb *r;
360 	int error = 0;
361 
362 	list_for_each_entry(r, head, res_hashchain) {
363 		if (len == r->res_length && !memcmp(name, r->res_name, len))
364 			goto found;
365 	}
366 	*r_ret = NULL;
367 	return -EBADR;
368 
369  found:
370 	if (r->res_nodeid && (flags & R_MASTER))
371 		error = -ENOTBLK;
372 	*r_ret = r;
373 	return error;
374 }
375 
/* Search bucket b for the rsb, first on the active list then the toss
   list; a toss-list hit is revived onto the active list.  Caller holds
   the bucket lock.  Returns 0, -EBADR (not found) or -ENOTBLK (found
   but not locally mastered when R_MASTER was requested). */
static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	/* revive: tossed rsbs keep a ref of 1, so no kref_get here */
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	/* the cached master of a revived rsb may be stale, so mark it
	   uncertain unless we are known not to be the master (-1) */
	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}
410 
/* Locked wrapper around _search_rsb(): takes the bucket spinlock. */
static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	spin_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	spin_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}
420 
421 /*
422  * Find rsb in rsbtbl and potentially create/add one
423  *
424  * Delaying the release of rsb's has a similar benefit to applications keeping
425  * NL locks on an rsb, but without the guarantee that the cached master value
426  * will still be valid when the rsb is reused.  Apps aren't always smart enough
427  * to keep NL locks on an rsb that they may lock again shortly; this can lead
428  * to excessive master lookups and removals if we don't delay the release.
429  *
430  * Searching for an rsb means looking through both the normal list and toss
431  * list.  When found on the toss list the rsb is moved to the normal list with
432  * ref count of 1; when found on normal list the ref count is incremented.
433  */
434 
/* Look up an rsb by name, optionally creating it (R_CREATE).  On success
   *r_ret holds a referenced rsb.  See the block comment above for the
   toss-list rationale. */
static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL, *tmp;
	uint32_t hash, bucket;
	int error = -EINVAL;

	if (namelen > DLM_RESNAME_MAXLEN)
		goto out;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	error = 0;
	hash = jhash(name, namelen, 0);
	/* table size is a power of two, so masking selects the bucket */
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	/* not found: allocate a new rsb and try to insert it */
	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;	/* master unknown until looked up */
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	/* re-search under the bucket lock in case another thread inserted
	   the same rsb while we were allocating; if so, use that one */
	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
		dlm_free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}
496 
497 /* This is only called to add a reference when the code already holds
498    a valid reference to the rsb, so there's no need for locking. */
499 
/* Take a reference; caller must already hold a valid reference (see the
   comment above), so no locking is needed. */
static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

/* Exported wrapper around hold_rsb(). */
void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}
509 
/* kref release function: move an unreferenced rsb to its bucket's toss
   list, where it's kept for a while before shrink_bucket() frees it.
   Called with the bucket lock held (via kref_put in put_rsb). */
static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	/* tossed rsbs keep a ref of 1 so revival needs no new kref_get */
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	/* the lvb is dropped now; it can be rebuilt if the rsb is revived */
	if (r->res_lvbptr) {
		dlm_free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}
524 
/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */
527 
/* Drop a reference under the bucket lock; the last ref moves the rsb to
   the toss list via toss_rsb(). */
static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
}

/* Exported wrapper around put_rsb(). */
void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}
542 
543 /* See comment for unhold_lkb */
544 
/* Drop a reference known not to be the last one; asserts if the count
   unexpectedly reaches zero.  Avoids put_rsb's locking. */
static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}
551 
/* kref release function used when finally freeing a tossed rsb; just
   sanity-checks that every queue is empty. */
static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}
566 
567 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
568    The rsb must exist as long as any lkb's for it do. */
569 
/* Point lkb at its rsb, taking an rsb reference on its behalf. */
static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

/* Undo attach_lkb(): drop the lkb's rsb reference, if any. */
static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}
583 
/* Allocate a new lkb and assign it a unique lkid.  The high 16 bits of
   the lkid are a randomly chosen id-table bucket (spreading lkbs over
   the table); the low 16 bits come from the bucket's counter. */
static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = dlm_allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
	INIT_LIST_HEAD(&lkb->lkb_time_list);

	get_random_bytes(&bucket, sizeof(bucket));
	/* table size is a power of two, so masking selects a bucket */
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			/* collision: restart with the next counter value */
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}
627 
/* Find an lkb by id in its bucket's list; caller holds the bucket lock.
   Returns NULL if not found.  No reference is taken. */
static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}
639 
/* Look up an lkb by lkid and take a reference on it.  The bucket is
   encoded in the top 16 bits of the lkid (see create_lkb); an
   out-of-range bucket means a corrupt or foreign lkid (-EBADSLT). */
static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}
657 
/* kref release function for an lkb; actual freeing is done by the
   kref_put caller (see __put_lkb).  Just checks the lkb is off every
   rsb state queue. */
static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
667 
668 /* __put_lkb() is used when an lkb may not have an rsb attached to
669    it so we need to provide the lockspace explicitly */
670 
/* Drop a reference on lkb; on the last ref, remove it from the id table
   and free it.  Returns 1 if the lkb was freed, 0 otherwise. */
static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = (lkb->lkb_id >> 16);

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			dlm_free_lvb(lkb->lkb_lvbptr);
		dlm_free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}
692 
/* Drop a reference on an lkb that is known to have an rsb attached; the
   lockspace is taken from the rsb.  Returns 1 if the lkb was freed. */
int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}
703 
704 /* This is only called to add a reference when the code already holds
705    a valid reference to the lkb, so there's no need for locking. */
706 
/* Take a reference; caller must already hold a valid reference (see the
   comment above), so no locking is needed. */
static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}
723 
724 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
725 			    int mode)
726 {
727 	struct dlm_lkb *lkb = NULL;
728 
729 	list_for_each_entry(lkb, head, lkb_statequeue)
730 		if (lkb->lkb_rqmode < mode)
731 			break;
732 
733 	if (!lkb)
734 		list_add_tail(new, head);
735 	else
736 		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
737 }
738 
739 /* add/remove lkb to rsb's grant/convert/wait queue */
740 
/* Add lkb to one of the rsb's state queues, taking a queue reference.
   Granted locks are kept ordered by grmode; waiting/converting locks go
   to the head or tail depending on DLM_LKF_HEADQUE. */
static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	/* an lkb may only be on one state queue at a time */
	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_timestamp = ktime_get();

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}
774 
/* Remove lkb from its current state queue and drop the queue's ref. */
static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

/* Move lkb from its current state queue to the queue for status sts,
   holding an extra ref across the del/add so it can't be freed. */
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}
789 
790 static int msg_reply_type(int mstype)
791 {
792 	switch (mstype) {
793 	case DLM_MSG_REQUEST:
794 		return DLM_MSG_REQUEST_REPLY;
795 	case DLM_MSG_CONVERT:
796 		return DLM_MSG_CONVERT_REPLY;
797 	case DLM_MSG_UNLOCK:
798 		return DLM_MSG_UNLOCK_REPLY;
799 	case DLM_MSG_CANCEL:
800 		return DLM_MSG_CANCEL_REPLY;
801 	case DLM_MSG_LOOKUP:
802 		return DLM_MSG_LOOKUP_REPLY;
803 	}
804 	return -1;
805 }
806 
807 /* add/remove lkb from global waiters list of lkb's waiting for
808    a reply from a remote node */
809 
/* Put lkb on the lockspace waiters list before sending a remote op of
   type mstype.  If an op is already outstanding, only an overlapping
   unlock or cancel may be added (recorded via the OVERLAP flags);
   anything else fails with -EBUSY.  A second unlock, or a cancel of an
   already-overlapped lkb, fails with -EINVAL. */
static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		/* each outstanding op holds its own count and lkb ref */
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "addwait error %x %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
860 
861 /* We clear the RESEND flag because we might be taking an lkb off the waiters
862    list as part of process_requestqueue (e.g. a lookup that has an optimized
863    request reply on the requestqueue) between dlm_recover_waiters_pre() which
864    set RESEND and dlm_recover_waiters_post() */
865 
/* Core of removing lkb from the waiters list when a reply of type mstype
   arrives.  ms is the reply message, NULL when called via
   remove_from_waiters(), or possibly the lockspace's stub reply.
   Caller handles ls_waiters_mutex.  Returns 0 if the lkb's waiter state
   was consumed, -1 on an unexpected or already-consumed state. */
static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
				struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* Cancel state was preemptively cleared by a successful convert,
	   see next comment, nothing to do. */

	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
		log_debug(ls, "remwait %x cancel_reply wait_type %d",
			  lkb->lkb_id, lkb->lkb_wait_type);
		return -1;
	}

	/* Remove for the convert reply, and preemptively remove for the
	   cancel reply.  A convert has been granted while there's still
	   an outstanding cancel on it (the cancel is moot and the result
	   in the cancel reply should be 0).  We preempt the cancel reply
	   because the app gets the convert result and then can follow up
	   with another op, like convert.  This subsequent op would see the
	   lingering state of the cancel and fail with -EBUSY. */

	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
	    is_overlap_cancel(lkb) && ms && !ms->m_result) {
		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
			  lkb->lkb_id);
		lkb->lkb_wait_type = 0;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_count--;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remwait error %x reply %d flags %x no wait_type",
		  lkb->lkb_id, mstype, lkb->lkb_flags);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	/* see the comment above remove_from_waiters() for why RESEND is
	   cleared here */
	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}
949 
/* Remove lkb from the waiters list for a reply of type mstype, taking
   the waiters mutex. */
static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype, NULL);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms != &ls->ls_stub_ms)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type, ms);
	if (ms != &ls->ls_stub_ms)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
976 
/* Remove the resource's entry from the directory that maps resource
   names to master nodes, either locally or via a message to the
   directory node.  No-op when running without a directory. */
static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}
991 
992 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
993    found since they are in order of newest to oldest? */
994 
/* Free rsbs on bucket b's toss list whose toss time has aged past the
   configured ci_toss_secs.  Returns the number freed.  An rsb found to
   have regained a reference is left in place and logged. */
static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		spin_lock(&ls->ls_rsbtbl[b].lock);
		/* oldest entries are at the tail, so scan in reverse */
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			spin_unlock(&ls->ls_rsbtbl[b].lock);

			/* we were the master, so drop the directory entry */
			if (is_master(r))
				dir_remove(r);
			dlm_free_rsb(r);
			count++;
		} else {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}
1033 
1034 void dlm_scan_rsbs(struct dlm_ls *ls)
1035 {
1036 	int i;
1037 
1038 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1039 		shrink_bucket(ls, i);
1040 		if (dlm_locking_stopped(ls))
1041 			break;
1042 		cond_resched();
1043 	}
1044 }
1045 
1046 static void add_timeout(struct dlm_lkb *lkb)
1047 {
1048 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1049 
1050 	if (is_master_copy(lkb))
1051 		return;
1052 
1053 	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1054 	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1055 		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1056 		goto add_it;
1057 	}
1058 	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1059 		goto add_it;
1060 	return;
1061 
1062  add_it:
1063 	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1064 	mutex_lock(&ls->ls_timeout_mutex);
1065 	hold_lkb(lkb);
1066 	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1067 	mutex_unlock(&ls->ls_timeout_mutex);
1068 }
1069 
/* Take lkb off the lockspace timeout list, dropping the reference the
   list held.  Safe to call when the lkb is not on the list. */

static void del_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	mutex_lock(&ls->ls_timeout_mutex);
	if (!list_empty(&lkb->lkb_time_list)) {
		list_del_init(&lkb->lkb_time_list);
		unhold_lkb(lkb);
	}
	mutex_unlock(&ls->ls_timeout_mutex);
}
1081 
1082 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1083    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1084    and then lock rsb because of lock ordering in add_timeout.  We may need
1085    to specify some special timeout-related bits in the lkb that are just to
1086    be accessed under the timeout_mutex. */
1087 
/* Periodically scan the timeout list: warn about locks that have waited
   longer than the configured warning period, and cancel locks whose
   caller-specified timeout has expired.  One candidate lkb is handled
   per pass so the rsb can be locked without holding timeout_mutex. */

void dlm_scan_timeout(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int do_cancel, do_warn;
	s64 wait_us;

	for (;;) {
		if (dlm_locking_stopped(ls))
			break;

		do_cancel = 0;
		do_warn = 0;
		mutex_lock(&ls->ls_timeout_mutex);
		/* find the first lkb that has waited past its timeout or
		   warning period; hold it so it survives the mutex drop */
		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {

			wait_us = ktime_to_us(ktime_sub(ktime_get(),
					      		lkb->lkb_timestamp));

			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
			    wait_us >= (lkb->lkb_timeout_cs * 10000))
				do_cancel = 1;

			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
			    wait_us >= dlm_config.ci_timewarn_cs * 10000)
				do_warn = 1;

			if (!do_cancel && !do_warn)
				continue;
			hold_lkb(lkb);
			break;
		}
		mutex_unlock(&ls->ls_timeout_mutex);

		/* nothing due this pass; lkb is only valid below when
		   do_cancel or do_warn was set */
		if (!do_cancel && !do_warn)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		if (do_warn) {
			/* clear flag so we only warn once */
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
				del_timeout(lkb);
			dlm_timeout_warn(lkb);
		}

		if (do_cancel) {
			log_debug(ls, "timeout cancel %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
			del_timeout(lkb);
			_cancel_lock(r, lkb);
		}

		unlock_rsb(r);
		unhold_rsb(r);
		dlm_put_lkb(lkb);
	}
}
1151 
1152 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1153    dlm_recoverd before checking/setting ls_recover_begin. */
1154 
1155 void dlm_adjust_timeouts(struct dlm_ls *ls)
1156 {
1157 	struct dlm_lkb *lkb;
1158 	u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1159 
1160 	ls->ls_recover_begin = 0;
1161 	mutex_lock(&ls->ls_timeout_mutex);
1162 	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1163 		lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1164 	mutex_unlock(&ls->ls_timeout_mutex);
1165 }
1166 
1167 /* lkb is master or local copy */
1168 
/* Apply lvb semantics when granting a lock on the master/local copy.
   The grmode/rqmode pair selects the operation via dlm_lvb_operations. */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		/* copy the rsb's lvb out to the caller's buffer */
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			/* caller asked to invalidate the lvb */
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		/* allocate the rsb's lvb lazily; on failure just skip the
		   write */
		if (!r->res_lvbptr)
			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}
1219 
1220 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1221 {
1222 	if (lkb->lkb_grmode < DLM_LOCK_PW)
1223 		return;
1224 
1225 	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1226 		rsb_set_flag(r, RSB_VALNOTVALID);
1227 		return;
1228 	}
1229 
1230 	if (!lkb->lkb_lvbptr)
1231 		return;
1232 
1233 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1234 		return;
1235 
1236 	if (!r->res_lvbptr)
1237 		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1238 
1239 	if (!r->res_lvbptr)
1240 		return;
1241 
1242 	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1243 	r->res_lvbseq++;
1244 	rsb_clear_flag(r, RSB_VALNOTVALID);
1245 }
1246 
1247 /* lkb is process copy (pc) */
1248 
1249 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1250 			    struct dlm_message *ms)
1251 {
1252 	int b;
1253 
1254 	if (!lkb->lkb_lvbptr)
1255 		return;
1256 
1257 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1258 		return;
1259 
1260 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1261 	if (b == 1) {
1262 		int len = receive_extralen(ms);
1263 		if (len > DLM_RESNAME_MAXLEN)
1264 			len = DLM_RESNAME_MAXLEN;
1265 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1266 		lkb->lkb_lvbseq = ms->m_lvbseq;
1267 	}
1268 }
1269 
1270 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1271    remove_lock -- used for unlock, removes lkb from granted
1272    revert_lock -- used for cancel, moves lkb from convert to granted
1273    grant_lock  -- used for request and convert, adds lkb to granted or
1274                   moves lkb from convert or waiting to granted
1275 
1276    Each of these is used for master or local copy lkb's.  There is
1277    also a _pc() variation used to make the corresponding change on
1278    a process copy (pc) lkb. */
1279 
/* Common unlock path: take lkb off its queue and clear its granted mode. */
static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}
1288 
/* Unlock on a master/local copy: write the lvb back before removal. */
static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}
1294 
/* Unlock on a process copy (pc); no local lvb handling is done here. */
static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
1299 
1300 /* returns: 0 did nothing
1301 	    1 moved lock to granted
1302 	   -1 removed lock */
1303 
1304 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1305 {
1306 	int rv = 0;
1307 
1308 	lkb->lkb_rqmode = DLM_LOCK_IV;
1309 
1310 	switch (lkb->lkb_status) {
1311 	case DLM_LKSTS_GRANTED:
1312 		break;
1313 	case DLM_LKSTS_CONVERT:
1314 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1315 		rv = 1;
1316 		break;
1317 	case DLM_LKSTS_WAITING:
1318 		del_lkb(r, lkb);
1319 		lkb->lkb_grmode = DLM_LOCK_IV;
1320 		/* this unhold undoes the original ref from create_lkb()
1321 		   so this leads to the lkb being freed */
1322 		unhold_lkb(lkb);
1323 		rv = -1;
1324 		break;
1325 	default:
1326 		log_print("invalid status for revert %d", lkb->lkb_status);
1327 	}
1328 	return rv;
1329 }
1330 
/* Cancel on a process copy (pc); same queue manipulation as revert_lock. */
static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}
1335 
1336 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1337 {
1338 	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1339 		lkb->lkb_grmode = lkb->lkb_rqmode;
1340 		if (lkb->lkb_status)
1341 			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1342 		else
1343 			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1344 	}
1345 
1346 	lkb->lkb_rqmode = DLM_LOCK_IV;
1347 }
1348 
/* Grant on a master/local copy: apply lvb semantics, then grant and
   reset the highest-bast tracking. */
static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}
1355 
/* Grant on a process copy (pc): import the lvb from the master's
   reply message, then grant. */
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}
1362 
1363 /* called by grant_pending_locks() which means an async grant message must
1364    be sent to the requesting node in addition to granting the lock if the
1365    lkb belongs to a remote node. */
1366 
static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	/* a master copy belongs to a remote node, which must be told of
	   the grant; a local lock gets its completion ast queued here */
	if (!is_master_copy(lkb))
		queue_cast(r, lkb, 0);
	else
		send_grant(r, lkb);
}
1375 
1376 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1377    change the granted/requested modes.  We're munging things accordingly in
1378    the process copy.
1379    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1380    conversion deadlock
1381    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1382    compatible with other granted locks */
1383 
1384 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1385 {
1386 	if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1387 		log_print("munge_demoted %x invalid reply type %d",
1388 			  lkb->lkb_id, ms->m_type);
1389 		return;
1390 	}
1391 
1392 	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1393 		log_print("munge_demoted %x invalid modes gr %d rq %d",
1394 			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1395 		return;
1396 	}
1397 
1398 	lkb->lkb_grmode = DLM_LOCK_NL;
1399 }
1400 
1401 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1402 {
1403 	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1404 	    ms->m_type != DLM_MSG_GRANT) {
1405 		log_print("munge_altmode %x invalid reply type %d",
1406 			  lkb->lkb_id, ms->m_type);
1407 		return;
1408 	}
1409 
1410 	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1411 		lkb->lkb_rqmode = DLM_LOCK_PR;
1412 	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1413 		lkb->lkb_rqmode = DLM_LOCK_CW;
1414 	else {
1415 		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1416 		dlm_print_lkb(lkb);
1417 	}
1418 }
1419 
1420 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1421 {
1422 	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1423 					   lkb_statequeue);
1424 	if (lkb->lkb_id == first->lkb_id)
1425 		return 1;
1426 
1427 	return 0;
1428 }
1429 
1430 /* Check if the given lkb conflicts with another lkb on the queue. */
1431 
1432 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1433 {
1434 	struct dlm_lkb *this;
1435 
1436 	list_for_each_entry(this, head, lkb_statequeue) {
1437 		if (this == lkb)
1438 			continue;
1439 		if (!modes_compat(this, lkb))
1440 			return 1;
1441 	}
1442 	return 0;
1443 }
1444 
1445 /*
1446  * "A conversion deadlock arises with a pair of lock requests in the converting
1447  * queue for one resource.  The granted mode of each lock blocks the requested
1448  * mode of the other lock."
1449  *
1450  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1451  * convert queue from being granted, then deadlk/demote lkb.
1452  *
1453  * Example:
1454  * Granted Queue: empty
1455  * Convert Queue: NL->EX (first lock)
1456  *                PR->EX (second lock)
1457  *
1458  * The first lock can't be granted because of the granted mode of the second
1459  * lock and the second lock can't be granted because it's not first in the
1460  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1461  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1462  * flag set and return DEMOTED in the lksb flags.
1463  *
1464  * Originally, this function detected conv-deadlk in a more limited scope:
1465  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1466  * - if lkb1 was the first entry in the queue (not just earlier), and was
1467  *   blocked by the granted mode of lkb2, and there was nothing on the
1468  *   granted queue preventing lkb1 from being granted immediately, i.e.
1469  *   lkb2 was the only thing preventing lkb1 from being granted.
1470  *
1471  * That second condition meant we'd only say there was conv-deadlk if
1472  * resolving it (by demotion) would lead to the first lock on the convert
1473  * queue being granted right away.  It allowed conversion deadlocks to exist
1474  * between locks on the convert queue while they couldn't be granted anyway.
1475  *
1476  * Now, we detect and take action on conversion deadlocks immediately when
1477  * they're created, even if they may not be immediately consequential.  If
1478  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1479  * mode that would prevent lkb1's conversion from being granted, we do a
1480  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1481  * I think this means that the lkb_is_ahead condition below should always
1482  * be zero, i.e. there will never be conv-deadlk between two locks that are
1483  * both already on the convert queue.
1484  */
1485 
static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
{
	struct dlm_lkb *lkb1;
	int lkb_is_ahead = 0;

	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
		if (lkb1 == lkb2) {
			lkb_is_ahead = 1;
			continue;
		}

		if (!lkb_is_ahead) {
			/* lkb1 is ahead of lkb2: deadlock if lkb2's granted
			   mode blocks lkb1's conversion */
			if (!modes_compat(lkb2, lkb1))
				return 1;
		} else {
			/* lkb1 is behind lkb2 (not expected to happen, see
			   comment above): deadlock only if each blocks the
			   other */
			if (!modes_compat(lkb2, lkb1) &&
			    !modes_compat(lkb1, lkb2))
				return 1;
		}
	}
	return 0;
}
1508 
1509 /*
1510  * Return 1 if the lock can be granted, 0 otherwise.
1511  * Also detect and resolve conversion deadlocks.
1512  *
1513  * lkb is the lock to be granted
1514  *
1515  * now is 1 if the function is being called in the context of the
1516  * immediate request, it is 0 if called later, after the lock has been
1517  * queued.
1518  *
1519  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1520  */
1521 
static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	/* conv is nonzero when lkb already holds a granted mode, i.e. this
	   is a conversion rather than a new request */
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;
 out:
	return 0;
}
1637 
/* Wrapper around _can_be_granted() that additionally resolves conversion
   deadlocks (demote grmode to NL under CONVDEADLK, otherwise report
   -EDEADLK via *err when NODLCKWT is not set) and retries with the
   ALTPR/ALTCW alternate request modes. */

static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
			  int *err)
{
	int rv;
	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);

	if (err)
		*err = 0;

	rv = _can_be_granted(r, lkb, now);
	if (rv)
		goto out;

	/*
	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
	 * cancels one of the locks.
	 */

	if (is_convert && can_be_queued(lkb) &&
	    conversion_deadlock_detect(r, lkb)) {
		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
			lkb->lkb_grmode = DLM_LOCK_NL;
			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
		} else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
			if (err)
				*err = -EDEADLK;
			else {
				/* caller cannot propagate the deadlock; log
				   it instead */
				log_print("can_be_granted deadlock %x now %d",
					  lkb->lkb_id, now);
				dlm_dump_rsb(r);
			}
		}
		goto out;
	}

	/*
	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
	 * to grant a request in a mode other than the normal rqmode.  It's a
	 * simple way to provide a big optimization to applications that can
	 * use them.
	 */

	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
		alt = DLM_LOCK_PR;
	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
		alt = DLM_LOCK_CW;

	if (alt) {
		/* retry with the alternate mode; restore rqmode on failure */
		lkb->lkb_rqmode = alt;
		rv = _can_be_granted(r, lkb, now);
		if (rv)
			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
		else
			lkb->lkb_rqmode = rqmode;
	}
 out:
	return rv;
}
1698 
1699 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1700    for locks pending on the convert list.  Once verified (watch for these
1701    log_prints), we should be able to just call _can_be_granted() and not
1702    bother with the demote/deadlk cases here (and there's no easy way to deal
1703    with a deadlk here, we'd have to generate something like grant_lock with
1704    the deadlk error.) */
1705 
1706 /* Returns the highest requested mode of all blocked conversions; sets
1707    cw if there's a blocked conversion to DLM_LOCK_CW. */
1708 
static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;
	int deadlk;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		deadlk = 0;

		if (can_be_granted(r, lkb, 0, &deadlk)) {
			/* a grant may unblock other conversions, so rescan
			   the queue from the start */
			grant_lock_pending(r, lkb);
			grant_restart = 1;
			continue;
		}

		/* can_be_granted() demoted the lkb to resolve a conversion
		   deadlock; rescan, since the demotion may unblock others */
		if (!demoted && is_demoted(lkb)) {
			log_print("WARN: pending demoted %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			demote_restart = 1;
			continue;
		}

		if (deadlk) {
			log_print("WARN: pending deadlock %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			dlm_dump_rsb(r);
			continue;
		}

		hi = max_t(int, lkb->lkb_rqmode, hi);

		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
			*cw = 1;
	}

	if (grant_restart)
		goto restart;
	/* rescan after demotions at most once to avoid looping forever */
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}
1760 
1761 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1762 {
1763 	struct dlm_lkb *lkb, *s;
1764 
1765 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1766 		if (can_be_granted(r, lkb, 0, NULL))
1767 			grant_lock_pending(r, lkb);
1768                 else {
1769 			high = max_t(int, lkb->lkb_rqmode, high);
1770 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
1771 				*cw = 1;
1772 		}
1773 	}
1774 
1775 	return high;
1776 }
1777 
1778 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1779    on either the convert or waiting queue.
1780    high is the largest rqmode of all locks blocked on the convert or
1781    waiting queue. */
1782 
1783 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1784 {
1785 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1786 		if (gr->lkb_highbast < DLM_LOCK_EX)
1787 			return 1;
1788 		return 0;
1789 	}
1790 
1791 	if (gr->lkb_highbast < high &&
1792 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1793 		return 1;
1794 	return 0;
1795 }
1796 
/* Grant whatever can be granted on the convert and wait queues, then
   send blocking asts to granted locks that stand in the way of what is
   still blocked.  Only runs on the master node. */

static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;
	int cw = 0;

	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

	high = grant_pending_convert(r, high, &cw);
	high = grant_pending_wait(r, high, &cw);

	/* nothing left blocked, so no basts needed */
	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
			/* a PR-granted lock blocking a CW request is basted
			   for CW rather than for high */
			if (cw && high == DLM_LOCK_PR &&
			    lkb->lkb_grmode == DLM_LOCK_PR)
				queue_bast(r, lkb, DLM_LOCK_CW);
			else
				queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}
1828 
1829 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1830 {
1831 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1832 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1833 		if (gr->lkb_highbast < DLM_LOCK_EX)
1834 			return 1;
1835 		return 0;
1836 	}
1837 
1838 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1839 		return 1;
1840 	return 0;
1841 }
1842 
1843 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1844 			    struct dlm_lkb *lkb)
1845 {
1846 	struct dlm_lkb *gr;
1847 
1848 	list_for_each_entry(gr, head, lkb_statequeue) {
1849 		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1850 			queue_bast(r, gr, lkb->lkb_rqmode);
1851 			gr->lkb_highbast = lkb->lkb_rqmode;
1852 		}
1853 	}
1854 }
1855 
/* Bast only the granted queue on behalf of lkb's blocked request. */
static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}
1860 
/* Bast both the granted and convert queues on behalf of lkb. */
static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}
1866 
1867 /* set_master(r, lkb) -- set the master nodeid of a resource
1868 
1869    The purpose of this function is to set the nodeid field in the given
1870    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1871    known, it can just be copied to the lkb and the function will return
1872    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1873    before it can be copied to the lkb.
1874 
1875    When the rsb nodeid is being looked up remotely, the initial lkb
1876    causing the lookup is kept on the ls_waiters list waiting for the
1877    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1878    on the rsb's res_lookup list until the master is verified.
1879 
1880    Return values:
1881    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1882    1: the rsb master is not available and the lkb has been placed on
1883       a wait queue
1884 */
1885 
static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	/* recovery left an unverified master: use it, but make this lkb
	   the one whose reply will confirm or deny it */
	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	/* another lkb is already waiting for the lookup/confirmation;
	   park this one on the rsb's lookup list */
	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	/* res_nodeid 0 means we are the master */
	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	/* positive res_nodeid is a known remote master */
	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	/* master unknown and the directory is remote: ask the directory
	   node and park the lkb on the waiters list */
	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (i = 0; i < 2; i++) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}
	if (error && error != -EEXIST)
		return error;

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}
1951 
1952 static void process_lookup_list(struct dlm_rsb *r)
1953 {
1954 	struct dlm_lkb *lkb, *safe;
1955 
1956 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1957 		list_del_init(&lkb->lkb_rsb_lookup);
1958 		_request_lock(r, lkb);
1959 		schedule();
1960 	}
1961 }
1962 
1963 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1964 
static void confirm_master(struct dlm_rsb *r, int error)
{
	struct dlm_lkb *lkb;

	/* nothing waiting on a lookup result */
	if (!r->res_first_lkid)
		return;

	switch (error) {
	case 0:
	case -EINPROGRESS:
		/* the first request reached the master, so the master is
		   confirmed; release the parked requests */
		r->res_first_lkid = 0;
		process_lookup_list(r);
		break;

	case -EAGAIN:
	case -EBADR:
	case -ENOTBLK:
		/* the remote request failed and won't be retried (it was
		   a NOQUEUE, or has been canceled/unlocked); make a waiting
		   lkb the first_lkid */

		r->res_first_lkid = 0;

		if (!list_empty(&r->res_lookup)) {
			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
					 lkb_rsb_lookup);
			list_del_init(&lkb->lkb_rsb_lookup);
			r->res_first_lkid = lkb->lkb_id;
			_request_lock(r, lkb);
		}
		break;

	default:
		log_error(r->res_ls, "confirm_master unknown error %d", error);
	}
}
2001 
2002 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2003 			 int namelen, unsigned long timeout_cs,
2004 			 void (*ast) (void *astparam),
2005 			 void *astparam,
2006 			 void (*bast) (void *astparam, int mode),
2007 			 struct dlm_args *args)
2008 {
2009 	int rv = -EINVAL;
2010 
2011 	/* check for invalid arg usage */
2012 
2013 	if (mode < 0 || mode > DLM_LOCK_EX)
2014 		goto out;
2015 
2016 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2017 		goto out;
2018 
2019 	if (flags & DLM_LKF_CANCEL)
2020 		goto out;
2021 
2022 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2023 		goto out;
2024 
2025 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2026 		goto out;
2027 
2028 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2029 		goto out;
2030 
2031 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2032 		goto out;
2033 
2034 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2035 		goto out;
2036 
2037 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2038 		goto out;
2039 
2040 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2041 		goto out;
2042 
2043 	if (!ast || !lksb)
2044 		goto out;
2045 
2046 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2047 		goto out;
2048 
2049 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2050 		goto out;
2051 
2052 	/* these args will be copied to the lkb in validate_lock_args,
2053 	   it cannot be done now because when converting locks, fields in
2054 	   an active lkb cannot be modified before locking the rsb */
2055 
2056 	args->flags = flags;
2057 	args->astfn = ast;
2058 	args->astparam = astparam;
2059 	args->bastfn = bast;
2060 	args->timeout = timeout_cs;
2061 	args->mode = mode;
2062 	args->lksb = lksb;
2063 	rv = 0;
2064  out:
2065 	return rv;
2066 }
2067 
2068 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2069 {
2070 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2071  		      DLM_LKF_FORCEUNLOCK))
2072 		return -EINVAL;
2073 
2074 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2075 		return -EINVAL;
2076 
2077 	args->flags = flags;
2078 	args->astparam = astarg;
2079 	return 0;
2080 }
2081 
static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		/* a master copy (remote node's shadow of a lock) is never
		   converted through the local API */
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		/* QUECVT is only allowed for grmode->rqmode transitions
		   permitted by the compatibility matrix */
		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		rv = -EBUSY;
		/* only a currently granted lock can be converted */
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		/* busy while a previous operation still awaits a reply */
		if (lkb->lkb_wait_type)
			goto out;

		/* busy while an overlapping cancel/unlock is pending */
		if (is_overlap(lkb))
			goto out;
	}

	/* copy the validated args into the lkb; deferred to here because
	   fields of an active lkb must not be touched until the rsb lock
	   is held (see set_lock_args) */
	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astfn = args->astfn;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastfn = args->bastfn;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	lkb->lkb_timeout_cs = args->timeout;
	rv = 0;
 out:
	if (rv)
		log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
			  rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
			  lkb->lkb_status, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
	return rv;
}
2125 
2126 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2127    for success */
2128 
2129 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2130    because there may be a lookup in progress and it's valid to do
2131    cancel/unlockf on it */
2132 
static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int rv = -EINVAL;

	/* master copies are unlocked via messages from the owning node,
	   never through the local API */
	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
		dlm_print_lkb(lkb);
		goto out;
	}

	/* an lkb may still exist even though the lock is EOL'ed due to a
	   cancel, unlock or failed noqueue request; an app can't use these
	   locks; return same error as if the lkid had not been found at all */

	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
		rv = -ENOENT;
		goto out;
	}

	/* an lkb may be waiting for an rsb lookup to complete where the
	   lookup was initiated by another lock */

	if (!list_empty(&lkb->lkb_rsb_lookup)) {
		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
			/* the request never went anywhere, so it can be
			   completed right here with a cancel/unlock cast */
			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
			list_del_init(&lkb->lkb_rsb_lookup);
			queue_cast(lkb->lkb_resource, lkb,
				   args->flags & DLM_LKF_CANCEL ?
				   -DLM_ECANCEL : -DLM_EUNLOCK);
			unhold_lkb(lkb); /* undoes create_lkb() */
		}
		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
		rv = -EBUSY;
		goto out;
	}

	/* cancel not allowed with another cancel/unlock in progress */

	if (args->flags & DLM_LKF_CANCEL) {
		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
			goto out;

		if (is_overlap(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		/* being resent during recovery: mark the cancel as
		   overlapping instead of sending a message now */
		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		}

		/* there's nothing to cancel */
		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
		    !lkb->lkb_wait_type) {
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			/* the op being canceled is in flight; flag the
			   overlap so its reply handler does the cancel */
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
		case DLM_MSG_CANCEL:
			/* already being unlocked/canceled; invalid */
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_CANCEL */
		goto out_ok;
	}

	/* do we need to allow a force-unlock if there's a normal unlock
	   already in progress?  in what conditions could the normal unlock
	   fail such that we'd want to send a force-unlock to be sure? */

	if (args->flags & DLM_LKF_FORCEUNLOCK) {
		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
			goto out;

		if (is_overlap_unlock(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			/* flag the in-flight op; its reply handler will
			   perform the unlock */
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_UNLOCK */
		goto out_ok;
	}

	/* normal unlock not allowed if there's any op in progress */
	rv = -EBUSY;
	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
		goto out;

 out_ok:
	/* an overlapping op shouldn't blow away exflags from other op */
	lkb->lkb_exflags |= args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astparam = args->astparam;
	rv = 0;
 out:
	if (rv)
		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
			  args->flags, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
	return rv;
}
2262 
2263 /*
2264  * Four stage 4 varieties:
2265  * do_request(), do_convert(), do_unlock(), do_cancel()
2266  * These are called on the master node for the given lock and
2267  * from the central locking logic.
2268  */
2269 
2270 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2271 {
2272 	int error = 0;
2273 
2274 	if (can_be_granted(r, lkb, 1, NULL)) {
2275 		grant_lock(r, lkb);
2276 		queue_cast(r, lkb, 0);
2277 		goto out;
2278 	}
2279 
2280 	if (can_be_queued(lkb)) {
2281 		error = -EINPROGRESS;
2282 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
2283 		send_blocking_asts(r, lkb);
2284 		add_timeout(lkb);
2285 		goto out;
2286 	}
2287 
2288 	error = -EAGAIN;
2289 	if (force_blocking_asts(lkb))
2290 		send_blocking_asts_all(r, lkb);
2291 	queue_cast(r, lkb, -EAGAIN);
2292 
2293  out:
2294 	return error;
2295 }
2296 
static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* master-side handling of a conversion request */
	int error = 0;
	int deadlk = 0;

	/* changing an existing lock may allow others to be granted */

	if (can_be_granted(r, lkb, 1, &deadlk)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		grant_pending_locks(r);
		goto out;
	}

	/* can_be_granted() detected that this lock would block in a conversion
	   deadlock, so we leave it on the granted queue and return EDEADLK in
	   the ast for the convert. */

	if (deadlk) {
		/* it's left on the granted queue */
		log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
			  lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
			  lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
		revert_lock(r, lkb);
		queue_cast(r, lkb, -EDEADLK);
		error = -EDEADLK;
		goto out;
	}

	/* is_demoted() means the can_be_granted() above set the grmode
	   to NL, and left us on the granted queue.  This auto-demotion
	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
	   now grantable.  We have to try to grant other converting locks
	   before we try again to grant this one. */

	if (is_demoted(lkb)) {
		grant_pending_convert(r, DLM_LOCK_IV, NULL);
		if (_can_be_granted(r, lkb, 1)) {
			grant_lock(r, lkb);
			queue_cast(r, lkb, 0);
			grant_pending_locks(r);
			goto out;
		}
		/* else fall through and move to convert queue */
	}

	if (can_be_queued(lkb)) {
		/* move from granted to convert queue and wait */
		error = -EINPROGRESS;
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		send_blocking_asts(r, lkb);
		add_timeout(lkb);
		goto out;
	}

	/* NOQUEUE conversion that can't be granted now */
	error = -EAGAIN;
	if (force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
	queue_cast(r, lkb, -EAGAIN);

 out:
	return error;
}
2360 
2361 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2362 {
2363 	remove_lock(r, lkb);
2364 	queue_cast(r, lkb, -DLM_EUNLOCK);
2365 	grant_pending_locks(r);
2366 	return -DLM_EUNLOCK;
2367 }
2368 
2369 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2370 
2371 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2372 {
2373 	int error;
2374 
2375 	error = revert_lock(r, lkb);
2376 	if (error) {
2377 		queue_cast(r, lkb, -DLM_ECANCEL);
2378 		grant_pending_locks(r);
2379 		return -DLM_ECANCEL;
2380 	}
2381 	return 0;
2382 }
2383 
2384 /*
2385  * Four stage 3 varieties:
2386  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2387  */
2388 
2389 /* add a new lkb to a possibly new rsb, called by requesting process */
2390 
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* set_master: sets lkb nodeid from r.  A positive return means the
	   master isn't known yet and the lkb was parked pending a lookup;
	   report success and let the lookup reply resubmit it. */
	error = set_master(r, lkb);
	if (error < 0)
		return error;
	if (error > 0)
		return 0;

	if (is_remote(r))
		/* receive_request() calls do_request() on remote node */
		return send_request(r, lkb);

	return do_request(r, lkb);
}
2413 
2414 /* change some property of an existing lkb, e.g. mode */
2415 
static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* route the convert to the master: receive_convert() runs
	   do_convert() on a remote master node */
	return is_remote(r) ? send_convert(r, lkb) : do_convert(r, lkb);
}
2428 
2429 /* remove an existing lkb from the granted queue */
2430 
static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* route the unlock to the master: receive_unlock() runs
	   do_unlock() on a remote master node */
	return is_remote(r) ? send_unlock(r, lkb) : do_unlock(r, lkb);
}
2443 
2444 /* remove an existing lkb from the convert or wait queue */
2445 
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* route the cancel to the master: receive_cancel() runs
	   do_cancel() on a remote master node */
	return is_remote(r) ? send_cancel(r, lkb) : do_cancel(r, lkb);
}
2458 
2459 /*
2460  * Four stage 2 varieties:
2461  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2462  */
2463 
static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
			int len, struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	/* find or create the resource this name maps to; returns it with
	   a reference held */
	error = find_rsb(ls, name, len, R_CREATE, &r);
	if (error)
		goto out;

	lock_rsb(r);

	attach_lkb(r, lkb);
	/* the lock id goes back to the caller through the lksb */
	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;

	error = _request_lock(r, lkb);

	unlock_rsb(r);
	put_rsb(r);

 out:
	return error;
}
2491 
2492 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2493 			struct dlm_args *args)
2494 {
2495 	struct dlm_rsb *r;
2496 	int error;
2497 
2498 	r = lkb->lkb_resource;
2499 
2500 	hold_rsb(r);
2501 	lock_rsb(r);
2502 
2503 	error = validate_lock_args(ls, lkb, args);
2504 	if (error)
2505 		goto out;
2506 
2507 	error = _convert_lock(r, lkb);
2508  out:
2509 	unlock_rsb(r);
2510 	put_rsb(r);
2511 	return error;
2512 }
2513 
2514 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2515 		       struct dlm_args *args)
2516 {
2517 	struct dlm_rsb *r;
2518 	int error;
2519 
2520 	r = lkb->lkb_resource;
2521 
2522 	hold_rsb(r);
2523 	lock_rsb(r);
2524 
2525 	error = validate_unlock_args(lkb, args);
2526 	if (error)
2527 		goto out;
2528 
2529 	error = _unlock_lock(r, lkb);
2530  out:
2531 	unlock_rsb(r);
2532 	put_rsb(r);
2533 	return error;
2534 }
2535 
2536 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2537 		       struct dlm_args *args)
2538 {
2539 	struct dlm_rsb *r;
2540 	int error;
2541 
2542 	r = lkb->lkb_resource;
2543 
2544 	hold_rsb(r);
2545 	lock_rsb(r);
2546 
2547 	error = validate_unlock_args(lkb, args);
2548 	if (error)
2549 		goto out;
2550 
2551 	error = _cancel_lock(r, lkb);
2552  out:
2553 	unlock_rsb(r);
2554 	put_rsb(r);
2555 	return error;
2556 }
2557 
2558 /*
2559  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2560  */
2561 
int dlm_lock(dlm_lockspace_t *lockspace,
	     int mode,
	     struct dlm_lksb *lksb,
	     uint32_t flags,
	     void *name,
	     unsigned int namelen,
	     uint32_t parent_lkid,
	     void (*ast) (void *astarg),
	     void *astarg,
	     void (*bast) (void *astarg, int mode))
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error, convert = flags & DLM_LKF_CONVERT;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	/* keep recovery from running while the request is processed */
	dlm_lock_recovery(ls);

	/* a convert reuses the existing lkb (looked up via the caller's
	   sb_lkid); a new request allocates a fresh one */
	if (convert)
		error = find_lkb(ls, lksb->sb_lkid, &lkb);
	else
		error = create_lkb(ls, &lkb);

	if (error)
		goto out;

	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
			      astarg, bast, &args);
	if (error)
		goto out_put;

	if (convert)
		error = convert_lock(ls, lkb, &args);
	else
		error = request_lock(ls, lkb, name, namelen, &args);

	/* -EINPROGRESS means the lock was queued; the outcome arrives
	   later via the completion ast, so it's success to the caller */
	if (error == -EINPROGRESS)
		error = 0;
 out_put:
	/* drop the find_lkb/create_lkb reference; for a failed new
	   request this also frees the lkb */
	if (convert || error)
		__put_lkb(ls, lkb);
	/* -EAGAIN/-EDEADLK results were already queued as casts (see
	   do_request/do_convert), so don't also return them here */
	if (error == -EAGAIN || error == -EDEADLK)
		error = 0;
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
2614 
int dlm_unlock(dlm_lockspace_t *lockspace,
	       uint32_t lkid,
	       uint32_t flags,
	       struct dlm_lksb *lksb,
	       void *astarg)
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	/* keep recovery from running while the request is processed */
	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	error = set_unlock_args(flags, astarg, &args);
	if (error)
		goto out_put;

	if (flags & DLM_LKF_CANCEL)
		error = cancel_lock(ls, lkb, &args);
	else
		error = unlock_lock(ls, lkb, &args);

	/* -DLM_EUNLOCK/-DLM_ECANCEL mean the op completed; the result is
	   delivered to the caller via the completion ast */
	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
		error = 0;
	/* see validate_unlock_args: -EBUSY is success for CANCEL and
	   FORCEUNLOCK, which complete asynchronously */
	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
2656 
2657 /*
2658  * send/receive routines for remote operations and replies
2659  *
2660  * send_args
2661  * send_common
2662  * send_request			receive_request
2663  * send_convert			receive_convert
2664  * send_unlock			receive_unlock
2665  * send_cancel			receive_cancel
2666  * send_grant			receive_grant
2667  * send_bast			receive_bast
2668  * send_lookup			receive_lookup
2669  * send_remove			receive_remove
2670  *
2671  * 				send_common_reply
2672  * receive_request_reply	send_request_reply
2673  * receive_convert_reply	send_convert_reply
2674  * receive_unlock_reply		send_unlock_reply
2675  * receive_cancel_reply		send_cancel_reply
2676  * receive_lookup_reply		send_lookup_reply
2677  */
2678 
static int _create_message(struct dlm_ls *ls, int mb_len,
			   int to_nodeid, int mstype,
			   struct dlm_message **ms_ret,
			   struct dlm_mhandle **mh_ret)
{
	/* allocate a lowcomms buffer of mb_len bytes destined for
	   to_nodeid and initialize the common dlm message header */
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	char *mb;

	/* get_buffer gives us a message handle (mh) that we need to
	   pass into lowcomms_commit and a message buffer (mb) that we
	   write our data into */

	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
	if (!mh)
		return -ENOBUFS;

	memset(mb, 0, mb_len);

	ms = (struct dlm_message *) mb;

	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
	ms->m_header.h_lockspace = ls->ls_global_id;
	ms->m_header.h_nodeid = dlm_our_nodeid();
	ms->m_header.h_length = mb_len;
	ms->m_header.h_cmd = DLM_MSG;

	ms->m_type = mstype;

	*mh_ret = mh;
	*ms_ret = ms;
	return 0;
}
2712 
static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  int to_nodeid, int mstype,
			  struct dlm_message **ms_ret,
			  struct dlm_mhandle **mh_ret)
{
	/* size the message: fixed header plus whatever variable payload
	   (m_extra) this message type carries */
	int mb_len = sizeof(struct dlm_message);

	switch (mstype) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
	case DLM_MSG_REMOVE:
		/* these carry the resource name in m_extra */
		mb_len += r->res_length;
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		/* these may carry an lvb in m_extra (see send_args) */
		if (lkb && lkb->lkb_lvbptr)
			mb_len += r->res_ls->ls_lvblen;
		break;
	}

	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
			       ms_ret, mh_ret);
}
2739 
2740 /* further lowcomms enhancements or alternate implementations may make
2741    the return value from this function useful at some point */
2742 
static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	/* convert the message to wire format in place, then hand the
	   buffer back to lowcomms for transmission */
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);
	return 0;
}
2749 
static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
		      struct dlm_message *ms)
{
	/* copy the lkb's state into an outgoing message; the receiver
	   picks out the fields relevant to the message type */
	ms->m_nodeid   = lkb->lkb_nodeid;
	ms->m_pid      = lkb->lkb_ownpid;
	ms->m_lkid     = lkb->lkb_id;
	ms->m_remid    = lkb->lkb_remid;
	ms->m_exflags  = lkb->lkb_exflags;
	ms->m_sbflags  = lkb->lkb_sbflags;
	ms->m_flags    = lkb->lkb_flags;
	ms->m_lvbseq   = lkb->lkb_lvbseq;
	ms->m_status   = lkb->lkb_status;
	ms->m_grmode   = lkb->lkb_grmode;
	ms->m_rqmode   = lkb->lkb_rqmode;
	ms->m_hash     = r->res_hash;

	/* m_result and m_bastmode are set from function args,
	   not from lkb fields */

	/* tell the receiver which ast callbacks the owner registered */
	if (lkb->lkb_bastfn)
		ms->m_asts |= AST_BAST;
	if (lkb->lkb_astfn)
		ms->m_asts |= AST_COMP;

	/* compare with switch in create_message; send_remove() doesn't
	   use send_args() */

	switch (ms->m_type) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
		/* name of the resource being requested/looked up */
		memcpy(ms->m_extra, r->res_name, r->res_length);
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		/* lock value block, when one exists */
		if (!lkb->lkb_lvbptr)
			break;
		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
		break;
	}
}
2793 
static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	/* register the lkb as waiting for a reply of this type BEFORE
	   sending, so the reply handler (or recovery) can find it */
	error = add_to_waiters(lkb, mstype);
	if (error)
		return error;

	to_nodeid = r->res_nodeid;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	/* undo add_to_waiters(); removal is keyed by the reply type */
	remove_from_waiters(lkb, msg_reply_type(mstype));
	return error;
}
2821 
2822 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2823 {
2824 	return send_common(r, lkb, DLM_MSG_REQUEST);
2825 }
2826 
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	error = send_common(r, lkb, DLM_MSG_CONVERT);

	/* down conversions go without a reply from the master */
	if (!error && down_conversion(lkb)) {
		/* fabricate the reply locally: drop the waiters entry and
		   run the reply path with a zero-result stub message */
		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		r->res_ls->ls_stub_ms.m_result = 0;
		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
	}

	return error;
}
2844 
2845 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2846    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2847    that the master is still correct. */
2848 
2849 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2850 {
2851 	return send_common(r, lkb, DLM_MSG_UNLOCK);
2852 }
2853 
2854 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2855 {
2856 	return send_common(r, lkb, DLM_MSG_CANCEL);
2857 }
2858 
2859 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2860 {
2861 	struct dlm_message *ms;
2862 	struct dlm_mhandle *mh;
2863 	int to_nodeid, error;
2864 
2865 	to_nodeid = lkb->lkb_nodeid;
2866 
2867 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2868 	if (error)
2869 		goto out;
2870 
2871 	send_args(r, lkb, ms);
2872 
2873 	ms->m_result = 0;
2874 
2875 	error = send_message(mh, ms);
2876  out:
2877 	return error;
2878 }
2879 
2880 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2881 {
2882 	struct dlm_message *ms;
2883 	struct dlm_mhandle *mh;
2884 	int to_nodeid, error;
2885 
2886 	to_nodeid = lkb->lkb_nodeid;
2887 
2888 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2889 	if (error)
2890 		goto out;
2891 
2892 	send_args(r, lkb, ms);
2893 
2894 	ms->m_bastmode = mode;
2895 
2896 	error = send_message(mh, ms);
2897  out:
2898 	return error;
2899 }
2900 
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* ask the directory node which node masters this resource */
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	/* register for the reply before sending (see send_common) */
	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
	if (error)
		return error;

	to_nodeid = dlm_dir_nodeid(r);

	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	/* undo add_to_waiters(); removal is keyed by the reply type */
	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	return error;
}
2928 
2929 static int send_remove(struct dlm_rsb *r)
2930 {
2931 	struct dlm_message *ms;
2932 	struct dlm_mhandle *mh;
2933 	int to_nodeid, error;
2934 
2935 	to_nodeid = dlm_dir_nodeid(r);
2936 
2937 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2938 	if (error)
2939 		goto out;
2940 
2941 	memcpy(ms->m_extra, r->res_name, r->res_length);
2942 	ms->m_hash = r->res_hash;
2943 
2944 	error = send_message(mh, ms);
2945  out:
2946 	return error;
2947 }
2948 
2949 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2950 			     int mstype, int rv)
2951 {
2952 	struct dlm_message *ms;
2953 	struct dlm_mhandle *mh;
2954 	int to_nodeid, error;
2955 
2956 	to_nodeid = lkb->lkb_nodeid;
2957 
2958 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2959 	if (error)
2960 		goto out;
2961 
2962 	send_args(r, lkb, ms);
2963 
2964 	ms->m_result = rv;
2965 
2966 	error = send_message(mh, ms);
2967  out:
2968 	return error;
2969 }
2970 
2971 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2972 {
2973 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2974 }
2975 
2976 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2977 {
2978 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2979 }
2980 
2981 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2982 {
2983 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2984 }
2985 
2986 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2987 {
2988 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2989 }
2990 
2991 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2992 			     int ret_nodeid, int rv)
2993 {
2994 	struct dlm_rsb *r = &ls->ls_stub_rsb;
2995 	struct dlm_message *ms;
2996 	struct dlm_mhandle *mh;
2997 	int error, nodeid = ms_in->m_header.h_nodeid;
2998 
2999 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3000 	if (error)
3001 		goto out;
3002 
3003 	ms->m_lkid = ms_in->m_lkid;
3004 	ms->m_result = rv;
3005 	ms->m_nodeid = ret_nodeid;
3006 
3007 	error = send_message(mh, ms);
3008  out:
3009 	return error;
3010 }
3011 
3012 /* which args we save from a received message depends heavily on the type
3013    of message, unlike the send side where we can safely send everything about
3014    the lkb for any type of message */
3015 
static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	lkb->lkb_exflags = ms->m_exflags;
	lkb->lkb_sbflags = ms->m_sbflags;
	/* only the low 16 flag bits travel between nodes; the high 16
	   bits are node-local state and are preserved */
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
		         (ms->m_flags & 0x0000FFFF);
}
3023 
static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	/* like receive_flags() but replies never change lkb_exflags */
	lkb->lkb_sbflags = ms->m_sbflags;
	/* only the low 16 flag bits travel between nodes; the high 16
	   bits are node-local state and are preserved */
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
		         (ms->m_flags & 0x0000FFFF);
}
3030 
3031 static int receive_extralen(struct dlm_message *ms)
3032 {
3033 	return (ms->m_header.h_length - sizeof(struct dlm_message));
3034 }
3035 
3036 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3037 		       struct dlm_message *ms)
3038 {
3039 	int len;
3040 
3041 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3042 		if (!lkb->lkb_lvbptr)
3043 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3044 		if (!lkb->lkb_lvbptr)
3045 			return -ENOMEM;
3046 		len = receive_extralen(ms);
3047 		if (len > DLM_RESNAME_MAXLEN)
3048 			len = DLM_RESNAME_MAXLEN;
3049 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3050 	}
3051 	return 0;
3052 }
3053 
/* placeholder stored in master-copy lkbs (see receive_request_args); a
   non-NULL pointer records that the lock's owner registered a bast, but
   the callback itself must never run on the master node */
static void fake_bastfn(void *astparam, int mode)
{
	log_print("fake_bastfn should not be called");
}
3058 
/* placeholder stored in master-copy lkbs (see receive_request_args); a
   non-NULL pointer records that the lock's owner registered a completion
   ast, but the callback itself must never run on the master node */
static void fake_astfn(void *astparam)
{
	log_print("fake_astfn should not be called");
}
3063 
static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	/* initialize a freshly created master-copy lkb from an incoming
	   request message */
	lkb->lkb_nodeid = ms->m_header.h_nodeid;
	lkb->lkb_ownpid = ms->m_pid;
	lkb->lkb_remid = ms->m_lkid;
	lkb->lkb_grmode = DLM_LOCK_IV;
	lkb->lkb_rqmode = ms->m_rqmode;

	/* the fake fns are non-NULL markers recording which asts the
	   owner registered; they're never actually called here */
	lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
	lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		/* lkb was just created so there won't be an lvb yet */
		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
	}

	return 0;
}
3085 
static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	/* a convert is only valid against a currently granted lock */
	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
		return -EBUSY;

	/* a convert may carry a new lvb (DLM_LKF_VALBLK) */
	if (receive_lvb(ls, lkb, ms))
		return -ENOMEM;

	lkb->lkb_rqmode = ms->m_rqmode;
	lkb->lkb_lvbseq = ms->m_lvbseq;

	return 0;
}
3100 
3101 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3102 			       struct dlm_message *ms)
3103 {
3104 	if (receive_lvb(ls, lkb, ms))
3105 		return -ENOMEM;
3106 	return 0;
3107 }
3108 
3109 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3110    uses to send a reply and that the remote end uses to process the reply. */
3111 
3112 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3113 {
3114 	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3115 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3116 	lkb->lkb_remid = ms->m_lkid;
3117 }
3118 
3119 /* This is called after the rsb is locked so that we can safely inspect
3120    fields in the lkb. */
3121 
static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	int from = ms->m_header.h_nodeid;
	int error = 0;

	switch (ms->m_type) {
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_CANCEL:
		/* operations arrive at the master, so the lkb must be a
		   master copy and the sender must be the lock's owner */
		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_UNLOCK_REPLY:
	case DLM_MSG_CANCEL_REPLY:
	case DLM_MSG_GRANT:
	case DLM_MSG_BAST:
		/* replies/grants/basts arrive at the owner, so the lkb
		   must be a process copy and the sender the master */
		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	case DLM_MSG_REQUEST_REPLY:
		/* lkb_nodeid may still be -1 if the master lookup hadn't
		   resolved when the request was sent, so only check the
		   sender when a nodeid is recorded */
		if (!is_process_copy(lkb))
			error = -EINVAL;
		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	default:
		error = -EINVAL;
	}

	if (error)
		log_error(lkb->lkb_resource->res_ls,
			  "ignore invalid message %d from %d %x %x %x %d",
			  ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
			  lkb->lkb_flags, lkb->lkb_nodeid);
	return error;
}
3162 
static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
	/* master-side handler for a remote lock request: build a master
	   copy lkb, run do_request(), and send the result back */
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, namelen;

	error = create_lkb(ls, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	error = receive_request_args(ls, lkb, ms);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	/* the request message carries the resource name in m_extra */
	namelen = receive_extralen(ms);

	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	lock_rsb(r);

	attach_lkb(r, lkb);
	error = do_request(r, lkb);
	send_request_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);

	/* a granted (0) or queued (-EINPROGRESS) lkb stays; any other
	   result means the master copy is no longer needed */
	if (error == -EINPROGRESS)
		error = 0;
	if (error)
		dlm_put_lkb(lkb);
	return;

 fail:
	/* no usable lkb; reply via the lockspace's stub lkb/rsb */
	setup_stub_lkb(ls, ms);
	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
3208 
/* A remote node asks us (the master) to convert the mode of a lock it
   holds here.  Down-conversions get no reply because they are completed
   asynchronously on the sending node (see dlm_recover_waiters_pre note). */

static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, reply = 1;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	receive_flags(lkb, ms);
	error = receive_convert_args(ls, lkb, ms);
	if (error)
		goto out_reply;
	/* no reply is sent for a down-conversion */
	reply = !down_conversion(lkb);

	error = do_convert(r, lkb);
 out_reply:
	if (reply)
		send_convert_reply(r, lkb, error);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
3248 
/* A remote node asks us (the master) to unlock a lock it holds here;
   the do_unlock() result is always sent back in an unlock reply. */

static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	receive_flags(lkb, ms);
	error = receive_unlock_args(ls, lkb, ms);
	if (error)
		goto out_reply;

	error = do_unlock(r, lkb);
 out_reply:
	send_unlock_reply(r, lkb, error);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
3286 
3287 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3288 {
3289 	struct dlm_lkb *lkb;
3290 	struct dlm_rsb *r;
3291 	int error;
3292 
3293 	error = find_lkb(ls, ms->m_remid, &lkb);
3294 	if (error)
3295 		goto fail;
3296 
3297 	receive_flags(lkb, ms);
3298 
3299 	r = lkb->lkb_resource;
3300 
3301 	hold_rsb(r);
3302 	lock_rsb(r);
3303 
3304 	error = validate_message(lkb, ms);
3305 	if (error)
3306 		goto out;
3307 
3308 	error = do_cancel(r, lkb);
3309 	send_cancel_reply(r, lkb, error);
3310  out:
3311 	unlock_rsb(r);
3312 	put_rsb(r);
3313 	dlm_put_lkb(lkb);
3314 	return;
3315 
3316  fail:
3317 	setup_stub_lkb(ls, ms);
3318 	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3319 }
3320 
/* The master tells us our lock was granted: update the process copy
   and queue the completion ast for the lock holder. */

static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_debug(ls, "receive_grant from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	receive_flags_reply(lkb, ms);
	if (is_altmode(lkb))
		munge_altmode(lkb, ms);
	grant_lock_pc(r, lkb, ms);
	queue_cast(r, lkb, 0);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3353 
/* The master asks us to deliver a blocking ast (with the blocked mode
   in m_bastmode) for a lock whose process copy we hold. */

static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_debug(ls, "receive_bast from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	queue_bast(r, lkb, ms->m_bastmode);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3382 
3383 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3384 {
3385 	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3386 
3387 	from_nodeid = ms->m_header.h_nodeid;
3388 	our_nodeid = dlm_our_nodeid();
3389 
3390 	len = receive_extralen(ms);
3391 
3392 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3393 	if (dir_nodeid != our_nodeid) {
3394 		log_error(ls, "lookup dir_nodeid %d from %d",
3395 			  dir_nodeid, from_nodeid);
3396 		error = -EINVAL;
3397 		ret_nodeid = -1;
3398 		goto out;
3399 	}
3400 
3401 	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3402 
3403 	/* Optimization: we're master so treat lookup as a request */
3404 	if (!error && ret_nodeid == our_nodeid) {
3405 		receive_request(ls, ms);
3406 		return;
3407 	}
3408  out:
3409 	send_lookup_reply(ls, ms, ret_nodeid, error);
3410 }
3411 
3412 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3413 {
3414 	int len, dir_nodeid, from_nodeid;
3415 
3416 	from_nodeid = ms->m_header.h_nodeid;
3417 
3418 	len = receive_extralen(ms);
3419 
3420 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3421 	if (dir_nodeid != dlm_our_nodeid()) {
3422 		log_error(ls, "remove dir entry dir_nodeid %d from %d",
3423 			  dir_nodeid, from_nodeid);
3424 		return;
3425 	}
3426 
3427 	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3428 }
3429 
/* DLM_MSG_PURGE: hand the nodeid/pid carried in the message to
   do_purge() so it can clean up that owner's locks. */
static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
{
	do_purge(ls, ms->m_nodeid, ms->m_pid);
}
3434 
/* Handle the master's reply to our DLM_MSG_REQUEST (or to a lookup the
   dir node optimized into a request).  ms->m_result is the do_request()
   result on the master; after processing it we also resolve any
   overlapping unlock/cancel that was requested while we waited. */

static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, mstype, result;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_debug(ls, "receive_request_reply from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	/* save wait_type before remove_from_waiters() clears it */
	mstype = lkb->lkb_wait_type;
	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
	if (error)
		goto out;

	/* Optimization: the dir node was also the master, so it took our
	   lookup as a request and sent request reply instead of lookup reply */
	if (mstype == DLM_MSG_LOOKUP) {
		r->res_nodeid = ms->m_header.h_nodeid;
		lkb->lkb_nodeid = r->res_nodeid;
	}

	/* this is the value returned from do_request() on the master */
	result = ms->m_result;

	switch (result) {
	case -EAGAIN:
		/* request would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		confirm_master(r, -EAGAIN);
		unhold_lkb(lkb); /* undoes create_lkb() */
		break;

	case -EINPROGRESS:
	case 0:
		/* request was queued or granted on remote master */
		receive_flags_reply(lkb, ms);
		lkb->lkb_remid = ms->m_lkid;
		if (is_altmode(lkb))
			munge_altmode(lkb, ms);
		if (result) {
			add_lkb(r, lkb, DLM_LKSTS_WAITING);
			add_timeout(lkb);
		} else {
			grant_lock_pc(r, lkb, ms);
			queue_cast(r, lkb, 0);
		}
		confirm_master(r, result);
		break;

	case -EBADR:
	case -ENOTBLK:
		/* find_rsb failed to find rsb or rsb wasn't master */
		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
		r->res_nodeid = -1;
		lkb->lkb_nodeid = -1;

		if (is_overlap(lkb)) {
			/* we'll ignore error in cancel/unlock reply */
			queue_cast_overlap(r, lkb);
			confirm_master(r, result);
			unhold_lkb(lkb); /* undoes create_lkb() */
		} else
			_request_lock(r, lkb);
		break;

	default:
		log_error(ls, "receive_request_reply %x error %d",
			  lkb->lkb_id, result);
	}

	/* resolve an overlapping unlock/cancel now that the request has
	   completed (or been queued) on the master */
	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x result %d unlock",
			  lkb->lkb_id, result);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_unlock(r, lkb);
	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_cancel(r, lkb);
	} else {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3538 
/* Apply the master's convert result to our process copy.  Caller holds
   the rsb lock and has already removed the lkb from the waiters list. */

static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms)
{
	/* this is the value returned from do_convert() on the master */
	switch (ms->m_result) {
	case -EAGAIN:
		/* convert would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		break;

	case -EDEADLK:
		/* conversion deadlock: master reverted us to grmode */
		receive_flags_reply(lkb, ms);
		revert_lock_pc(r, lkb);
		queue_cast(r, lkb, -EDEADLK);
		break;

	case -EINPROGRESS:
		/* convert was queued on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb, ms);
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		add_timeout(lkb);
		break;

	case 0:
		/* convert was granted on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb, ms);
		grant_lock_pc(r, lkb, ms);
		queue_cast(r, lkb, 0);
		break;

	default:
		log_error(r->res_ls, "receive_convert_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
}
3579 
/* Validate and apply a convert reply.  Also called with a stub message
   during recovery (recover_convert_waiter), in which case the waiters
   mutex may already be held by the caller. */

static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	__receive_convert_reply(r, lkb, ms);
 out:
	unlock_rsb(r);
	put_rsb(r);
}
3602 
3603 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3604 {
3605 	struct dlm_lkb *lkb;
3606 	int error;
3607 
3608 	error = find_lkb(ls, ms->m_remid, &lkb);
3609 	if (error) {
3610 		log_debug(ls, "receive_convert_reply from %d no lkb %x",
3611 			  ms->m_header.h_nodeid, ms->m_remid);
3612 		return;
3613 	}
3614 
3615 	_receive_convert_reply(lkb, ms);
3616 	dlm_put_lkb(lkb);
3617 }
3618 
/* Validate and apply an unlock reply.  Also called with a stub message
   during recovery (dlm_recover_waiters_pre), possibly with the waiters
   mutex held by the caller. */

static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	/* this is the value returned from do_unlock() on the master */

	switch (ms->m_result) {
	case -DLM_EUNLOCK:
		/* unlock succeeded: drop our process copy */
		receive_flags_reply(lkb, ms);
		remove_lock_pc(r, lkb);
		queue_cast(r, lkb, -DLM_EUNLOCK);
		break;
	case -ENOENT:
		/* nothing to do; stub result used during recovery */
		break;
	default:
		log_error(r->res_ls, "receive_unlock_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
}
3654 
3655 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3656 {
3657 	struct dlm_lkb *lkb;
3658 	int error;
3659 
3660 	error = find_lkb(ls, ms->m_remid, &lkb);
3661 	if (error) {
3662 		log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3663 			  ms->m_header.h_nodeid, ms->m_remid);
3664 		return;
3665 	}
3666 
3667 	_receive_unlock_reply(lkb, ms);
3668 	dlm_put_lkb(lkb);
3669 }
3670 
/* Validate and apply a cancel reply.  Also called with a stub message
   during recovery (dlm_recover_waiters_pre), possibly with the waiters
   mutex held by the caller. */

static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	/* this is the value returned from do_cancel() on the master */

	switch (ms->m_result) {
	case -DLM_ECANCEL:
		/* cancel succeeded: revert the lock to its granted state */
		receive_flags_reply(lkb, ms);
		revert_lock_pc(r, lkb);
		queue_cast(r, lkb, -DLM_ECANCEL);
		break;
	case 0:
		/* nothing was pending to cancel */
		break;
	default:
		log_error(r->res_ls, "receive_cancel_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
}
3706 
3707 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3708 {
3709 	struct dlm_lkb *lkb;
3710 	int error;
3711 
3712 	error = find_lkb(ls, ms->m_remid, &lkb);
3713 	if (error) {
3714 		log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3715 			  ms->m_header.h_nodeid, ms->m_remid);
3716 		return;
3717 	}
3718 
3719 	_receive_cancel_reply(lkb, ms);
3720 	dlm_put_lkb(lkb);
3721 }
3722 
/* The dir node told us (via m_nodeid) who masters the resource we asked
   about; record the master and resend the original request to it, or
   complete an overlapping unlock/cancel instead. */

static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, ret_nodeid;

	error = find_lkb(ls, ms->m_lkid, &lkb);
	if (error) {
		log_error(ls, "receive_lookup_reply no lkb");
		return;
	}

	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
	   FIXME: will a non-zero error ever be returned? */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	if (error)
		goto out;

	ret_nodeid = ms->m_nodeid;
	if (ret_nodeid == dlm_our_nodeid()) {
		/* we are the master; res_nodeid 0 means local */
		r->res_nodeid = 0;
		ret_nodeid = 0;
		r->res_first_lkid = 0;
	} else {
		/* set_master() will copy res_nodeid to lkb_nodeid */
		r->res_nodeid = ret_nodeid;
	}

	if (is_overlap(lkb)) {
		/* the request was unlocked/cancelled while we waited */
		log_debug(ls, "receive_lookup_reply %x unlock %x",
			  lkb->lkb_id, lkb->lkb_flags);
		queue_cast_overlap(r, lkb);
		unhold_lkb(lkb); /* undoes create_lkb() */
		goto out_list;
	}

	_request_lock(r, lkb);

 out_list:
	if (!ret_nodeid)
		process_lookup_list(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3774 
/* Dispatch one incoming message by type.  Messages from nodes that are
   not current lockspace members are dropped here. */

static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
{
	if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
		log_debug(ls, "ignore non-member message %d from %d %x %x %d",
			  ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
			  ms->m_remid, ms->m_result);
		return;
	}

	switch (ms->m_type) {

	/* messages sent to a master node */

	case DLM_MSG_REQUEST:
		receive_request(ls, ms);
		break;

	case DLM_MSG_CONVERT:
		receive_convert(ls, ms);
		break;

	case DLM_MSG_UNLOCK:
		receive_unlock(ls, ms);
		break;

	case DLM_MSG_CANCEL:
		receive_cancel(ls, ms);
		break;

	/* messages sent from a master node (replies to above) */

	case DLM_MSG_REQUEST_REPLY:
		receive_request_reply(ls, ms);
		break;

	case DLM_MSG_CONVERT_REPLY:
		receive_convert_reply(ls, ms);
		break;

	case DLM_MSG_UNLOCK_REPLY:
		receive_unlock_reply(ls, ms);
		break;

	case DLM_MSG_CANCEL_REPLY:
		receive_cancel_reply(ls, ms);
		break;

	/* messages sent from a master node (only two types of async msg) */

	case DLM_MSG_GRANT:
		receive_grant(ls, ms);
		break;

	case DLM_MSG_BAST:
		receive_bast(ls, ms);
		break;

	/* messages sent to a dir node */

	case DLM_MSG_LOOKUP:
		receive_lookup(ls, ms);
		break;

	case DLM_MSG_REMOVE:
		receive_remove(ls, ms);
		break;

	/* messages sent from a dir node (remove has no reply) */

	case DLM_MSG_LOOKUP_REPLY:
		receive_lookup_reply(ls, ms);
		break;

	/* other messages */

	case DLM_MSG_PURGE:
		receive_purge(ls, ms);
		break;

	default:
		log_error(ls, "unknown message type %d", ms->m_type);
	}

	/* wake the ast daemon to deliver any casts/basts queued above */
	dlm_astd_wake();
}
3860 
3861 /* If the lockspace is in recovery mode (locking stopped), then normal
3862    messages are saved on the requestqueue for processing after recovery is
3863    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
3864    messages off the requestqueue before we process new ones. This occurs right
3865    after recovery completes when we transition from saving all messages on
3866    requestqueue, to processing all the saved messages, to processing new
3867    messages as they arrive. */
3868 
static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
				int nodeid)
{
	/* during recovery, save the message for later processing */
	if (dlm_locking_stopped(ls)) {
		dlm_add_requestqueue(ls, nodeid, ms);
		return;
	}

	/* otherwise let saved messages drain first, then process */
	dlm_wait_requestqueue(ls);
	_receive_message(ls, ms);
}
3879 
3880 /* This is called by dlm_recoverd to process messages that were saved on
3881    the requestqueue. */
3882 
void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
{
	/* requestqueue-saved messages skip the stopped-locking check */
	_receive_message(ls, ms);
}
3887 
3888 /* This is called by the midcomms layer when something is received for
3889    the lockspace.  It could be either a MSG (normal message sent as part of
3890    standard locking activity) or an RCOM (recovery message sent as part of
3891    lockspace recovery). */
3892 
void dlm_receive_buffer(union dlm_packet *p, int nodeid)
{
	struct dlm_header *hd = &p->header;
	struct dlm_ls *ls;
	int type = 0;

	/* byte-swap the payload and note its type for logging below */
	switch (hd->h_cmd) {
	case DLM_MSG:
		dlm_message_in(&p->message);
		type = p->message.m_type;
		break;
	case DLM_RCOM:
		dlm_rcom_in(&p->rcom);
		type = p->rcom.rc_type;
		break;
	default:
		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
		return;
	}

	/* the header's claimed sender must match the connection it came in on */
	if (hd->h_nodeid != nodeid) {
		log_print("invalid h_nodeid %d from %d lockspace %x",
			  hd->h_nodeid, nodeid, hd->h_lockspace);
		return;
	}

	ls = dlm_find_lockspace_global(hd->h_lockspace);
	if (!ls) {
		if (dlm_config.ci_log_debug)
			log_print("invalid lockspace %x from %d cmd %d type %d",
				  hd->h_lockspace, nodeid, hd->h_cmd, type);

		/* tell a status-probing node that this ls isn't up yet */
		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
			dlm_send_ls_not_ready(nodeid, &p->rcom);
		return;
	}

	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
	   be inactive (in this ls) before transitioning to recovery mode */

	down_read(&ls->ls_recv_active);
	if (hd->h_cmd == DLM_MSG)
		dlm_receive_message(ls, &p->message, nodeid);
	else
		dlm_receive_rcom(ls, &p->rcom, nodeid);
	up_read(&ls->ls_recv_active);

	dlm_put_lockspace(ls);
}
3942 
/* A conversion was in flight to a failed master.  Middle-mode (PR/CW)
   conversions get a faked -EINPROGRESS reply; up-conversions are flagged
   to be resent after recovery. */

static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	if (middle_conversion(lkb)) {
		hold_lkb(lkb);
		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		ls->ls_stub_ms.m_result = -EINPROGRESS;
		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
		_receive_convert_reply(lkb, &ls->ls_stub_ms);

		/* Same special case as in receive_rcom_lock_args() */
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
		unhold_lkb(lkb);

	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
		lkb->lkb_flags |= DLM_IFL_RESEND;
	}

	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
	   conversions are async; there's no reply from the remote master */
}
3965 
3966 /* A waiting lkb needs recovery if the master node has failed, or
3967    the master node is changing (only when no directory is used) */
3968 
3969 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3970 {
3971 	if (dlm_is_removed(ls, lkb->lkb_nodeid))
3972 		return 1;
3973 
3974 	if (!dlm_no_directory(ls))
3975 		return 0;
3976 
3977 	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3978 		return 1;
3979 
3980 	return 0;
3981 }
3982 
3983 /* Recovery for locks that are waiting for replies from nodes that are now
3984    gone.  We can just complete unlocks and cancels by faking a reply from the
3985    dead node.  Requests and up-conversions we flag to be resent after
3986    recovery.  Down-conversions can just be completed with a fake reply like
3987    unlocks.  Conversions between PR and CW need special attention. */
3988 
void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb, *safe;
	int wait_type, stub_unlock_result, stub_cancel_result;

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);

		/* all outstanding lookups, regardless of destination  will be
		   resent after recovery is done */

		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
			lkb->lkb_flags |= DLM_IFL_RESEND;
			continue;
		}

		if (!waiter_needs_recovery(ls, lkb))
			continue;

		wait_type = lkb->lkb_wait_type;
		stub_unlock_result = -DLM_EUNLOCK;
		stub_cancel_result = -DLM_ECANCEL;

		/* Main reply may have been received leaving a zero wait_type,
		   but a reply for the overlapping op may not have been
		   received.  In that case we need to fake the appropriate
		   reply for the overlap op. */

		if (!wait_type) {
			if (is_overlap_cancel(lkb)) {
				wait_type = DLM_MSG_CANCEL;
				if (lkb->lkb_grmode == DLM_LOCK_IV)
					stub_cancel_result = 0;
			}
			if (is_overlap_unlock(lkb)) {
				wait_type = DLM_MSG_UNLOCK;
				if (lkb->lkb_grmode == DLM_LOCK_IV)
					stub_unlock_result = -ENOENT;
			}

			log_debug(ls, "rwpre overlap %x %x %d %d %d",
				  lkb->lkb_id, lkb->lkb_flags, wait_type,
				  stub_cancel_result, stub_unlock_result);
		}

		switch (wait_type) {

		case DLM_MSG_REQUEST:
			/* requests are simply resent after recovery */
			lkb->lkb_flags |= DLM_IFL_RESEND;
			break;

		case DLM_MSG_CONVERT:
			recover_convert_waiter(ls, lkb);
			break;

		case DLM_MSG_UNLOCK:
			/* complete the unlock with a faked reply from the
			   dead master; hold a ref across the stub reply */
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
			ls->ls_stub_ms.m_result = stub_unlock_result;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		case DLM_MSG_CANCEL:
			/* complete the cancel with a faked reply from the
			   dead master; hold a ref across the stub reply */
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
			ls->ls_stub_ms.m_result = stub_cancel_result;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		default:
			log_error(ls, "invalid lkb wait_type %d %d",
				  lkb->lkb_wait_type, wait_type);
		}
		/* yield the cpu between waiters on a long list */
		schedule();
	}
	mutex_unlock(&ls->ls_waiters_mutex);
}
4075 
4076 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4077 {
4078 	struct dlm_lkb *lkb;
4079 	int found = 0;
4080 
4081 	mutex_lock(&ls->ls_waiters_mutex);
4082 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4083 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
4084 			hold_lkb(lkb);
4085 			found = 1;
4086 			break;
4087 		}
4088 	}
4089 	mutex_unlock(&ls->ls_waiters_mutex);
4090 
4091 	if (!found)
4092 		lkb = NULL;
4093 	return lkb;
4094 }
4095 
4096 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4097    master or dir-node for r.  Processing the lkb may result in it being placed
4098    back on waiters. */
4099 
4100 /* We do this after normal locking has been enabled and any saved messages
4101    (in requestqueue) have been processed.  We should be confident that at
4102    this point we won't get or process a reply to any of these waiting
4103    operations.  But, new ops may be coming in on the rsbs/locks here from
4104    userspace or remotely. */
4105 
4106 /* there may have been an overlap unlock/cancel prior to recovery or after
4107    recovery.  if before, the lkb may still have a pos wait_count; if after, the
4108    overlap flag would just have been set and nothing new sent.  we can be
4109    confident here than any replies to either the initial op or overlap ops
4110    prior to recovery have been received. */
4111 
int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype, err, oc, ou;

	while (1) {
		/* recovery restarted while we were working; bail out */
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		lkb = find_resend_waiter(ls);
		if (!lkb)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		mstype = lkb->lkb_wait_type;
		oc = is_overlap_cancel(lkb);
		ou = is_overlap_unlock(lkb);
		err = 0;

		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);

		/* At this point we assume that we won't get a reply to any
		   previous op or overlap op on this lock.  First, do a big
		   remove_from_waiters() for all previous ops. */

		lkb->lkb_flags &= ~DLM_IFL_RESEND;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_type = 0;
		lkb->lkb_wait_count = 0;
		mutex_lock(&ls->ls_waiters_mutex);
		list_del_init(&lkb->lkb_wait_reply);
		mutex_unlock(&ls->ls_waiters_mutex);
		unhold_lkb(lkb); /* for waiters list */

		if (oc || ou) {
			/* do an unlock or cancel instead of resending */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
							-DLM_ECANCEL);
				unhold_lkb(lkb); /* undoes create_lkb() */
				break;
			case DLM_MSG_CONVERT:
				if (oc) {
					queue_cast(r, lkb, -DLM_ECANCEL);
				} else {
					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
					_unlock_lock(r, lkb);
				}
				break;
			default:
				err = 1;
			}
		} else {
			/* resend the original operation */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				_request_lock(r, lkb);
				if (is_master(r))
					confirm_master(r, 0);
				break;
			case DLM_MSG_CONVERT:
				_convert_lock(r, lkb);
				break;
			default:
				err = 1;
			}
		}

		if (err)
			log_error(ls, "recover_waiters_post %x %d %x %d %d",
			  	  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
		unlock_rsb(r);
		put_rsb(r);
		dlm_put_lkb(lkb);
	}

	return error;
}
4201 
/* Remove from one rsb state queue every lkb that satisfies test(),
   dropping the queue's reference.  Flags the rsb RSB_LOCKS_PURGED so
   dlm_grant_after_purge() re-runs grant processing on it. */

static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
{
	struct dlm_ls *ls = r->res_ls;
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
		if (test(ls, lkb)) {
			rsb_set_flag(r, RSB_LOCKS_PURGED);
			del_lkb(r, lkb);
			/* this put should free the lkb */
			if (!dlm_put_lkb(lkb))
				log_error(ls, "purged lkb not released");
		}
	}
}
4218 
4219 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4220 {
4221 	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4222 }
4223 
/* true for any master copy; ls is unused but required by the
   purge_queue() callback signature */
static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return is_master_copy(lkb);
}
4228 
4229 static void purge_dead_locks(struct dlm_rsb *r)
4230 {
4231 	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4232 	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4233 	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4234 }
4235 
4236 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4237 {
4238 	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4239 	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4240 	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4241 }
4242 
4243 /* Get rid of locks held by nodes that are gone. */
4244 
int dlm_purge_locks(struct dlm_ls *ls)
{
	struct dlm_rsb *r;

	log_debug(ls, "dlm_purge_locks");

	down_write(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		hold_rsb(r);
		lock_rsb(r);
		/* only a master holds copies for other nodes */
		if (is_master(r))
			purge_dead_locks(r);
		unlock_rsb(r);
		unhold_rsb(r);

		/* yield the cpu between rsbs on a long root list */
		schedule();
	}
	up_write(&ls->ls_root_sem);

	return 0;
}
4266 
4267 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4268 {
4269 	struct dlm_rsb *r, *r_ret = NULL;
4270 
4271 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
4272 	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4273 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
4274 			continue;
4275 		hold_rsb(r);
4276 		rsb_clear_flag(r, RSB_LOCKS_PURGED);
4277 		r_ret = r;
4278 		break;
4279 	}
4280 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4281 	return r_ret;
4282 }
4283 
/* Walk every hash bucket and, for each rsb whose locks were purged,
   re-run grant processing if we are the master. */

void dlm_grant_after_purge(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	int bucket = 0;

	while (1) {
		r = find_purged_rsb(ls, bucket);
		if (!r) {
			/* bucket exhausted; advance or finish */
			if (bucket == ls->ls_rsbtbl_size - 1)
				break;
			bucket++;
			continue;
		}
		lock_rsb(r);
		if (is_master(r)) {
			grant_pending_locks(r);
			confirm_master(r, 0);
		}
		unlock_rsb(r);
		put_rsb(r);
		/* yield the cpu between rsbs */
		schedule();
	}
}
4307 
4308 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4309 					 uint32_t remid)
4310 {
4311 	struct dlm_lkb *lkb;
4312 
4313 	list_for_each_entry(lkb, head, lkb_statequeue) {
4314 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4315 			return lkb;
4316 	}
4317 	return NULL;
4318 }
4319 
4320 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4321 				    uint32_t remid)
4322 {
4323 	struct dlm_lkb *lkb;
4324 
4325 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4326 	if (lkb)
4327 		return lkb;
4328 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4329 	if (lkb)
4330 		return lkb;
4331 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4332 	if (lkb)
4333 		return lkb;
4334 	return NULL;
4335 }
4336 
/* Populate a new master-copy lkb from the rcom_lock data a node sent us
   during recovery.  Needs at least dlm_rcom + rcom_lock in rc_buf.
   Returns 0, -EINVAL (oversized lvb) or -ENOMEM. */
static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				  struct dlm_rsb *r, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;

	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
	/* only the low 16 internal-flag bits are taken from the wire */
	lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
	lkb->lkb_rqmode = rl->rl_rqmode;
	lkb->lkb_grmode = rl->rl_grmode;
	/* don't set lkb_status because add_lkb wants to itself */

	lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
	lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		/* lvb length is whatever follows the fixed structures */
		int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
			 sizeof(struct rcom_lock);
		if (lvblen > ls->ls_lvblen)
			return -EINVAL;
		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
	}

	/* Conversions between PR and CW (middle modes) need special handling.
	   The real granted mode of these converting locks cannot be determined
	   until all locks have been rebuilt on the rsb (recover_conversion) */

	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
	    middle_conversion(lkb)) {
		rl->rl_status = DLM_LKSTS_CONVERT;
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(r, RSB_RECOVER_CONVERT);
	}

	return 0;
}
4381 
4382 /* This lkb may have been recovered in a previous aborted recovery so we need
4383    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4384    If so we just send back a standard reply.  If not, we create a new lkb with
4385    the given values and send back our lkid.  We send back our lkid by sending
4386    back the rcom_lock struct we got but with the remid field filled in. */
4387 
/* needs at least dlm_rcom + rcom_lock */
int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	/* parent/child lock relationships are not supported over rcom */
	if (rl->rl_parent_lkid) {
		error = -EOPNOTSUPP;
		goto out;
	}

	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
			 R_MASTER, &r);
	if (error)
		goto out;

	lock_rsb(r);

	/* a previous aborted recovery may already have created this lkb;
	   if so, just send back its lkid (see comment above) */
	lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
	if (lkb) {
		error = -EEXIST;
		goto out_remid;
	}

	error = create_lkb(ls, &lkb);
	if (error)
		goto out_unlock;

	error = receive_rcom_lock_args(ls, lkb, r, rc);
	if (error) {
		__put_lkb(ls, lkb);
		goto out_unlock;
	}

	attach_lkb(r, lkb);
	add_lkb(r, lkb, rl->rl_status);
	error = 0;

 out_remid:
	/* this is the new value returned to the lock holder for
	   saving in its process-copy lkb */
	rl->rl_remid = cpu_to_le32(lkb->lkb_id);

 out_unlock:
	unlock_rsb(r);
	put_rsb(r);
 out:
	if (error)
		log_debug(ls, "recover_master_copy %d %x", error,
			  le32_to_cpu(rl->rl_lkid));
	/* the result travels back to the holder in the same rcom buffer */
	rl->rl_result = cpu_to_le32(error);
	return error;
}
4443 
/* needs at least dlm_rcom + rcom_lock */

/* Process the new master's reply to the rcom_lock we sent it: record
   the master's lkid in our process-copy lkb, or resend the lock if the
   master wasn't ready (-EBADR). */

int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	/* the reply echoes our lkid back; use it to find our lkb */
	error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
	if (error) {
		log_error(ls, "recover_process_copy no lkid %x",
				le32_to_cpu(rl->rl_lkid));
		return error;
	}

	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	/* rl_result carries the status from dlm_recover_master_copy() */
	error = le32_to_cpu(rl->rl_result);

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	switch (error) {
	case -EBADR:
		/* There's a chance the new master received our lock before
		   dlm_recover_master_reply(), this wouldn't happen if we did
		   a barrier between recover_masters and recover_locks. */
		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
			  (unsigned long)r, r->res_name);
		/* resend and skip the recovered-lock ack for now */
		dlm_send_rcom_lock(r, lkb);
		goto out;
	case -EEXIST:
		log_debug(ls, "master copy exists %x", lkb->lkb_id);
		/* fall through */
	case 0:
		/* save the master's lkid for future messages on this lock */
		lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
		break;
	default:
		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
			  error, lkb->lkb_id);
	}

	/* an ack for dlm_recover_locks() which waits for replies from
	   all the locks it sends to new masters */
	dlm_recovered_lock(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);

	return 0;
}
4497 
4498 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4499 		     int mode, uint32_t flags, void *name, unsigned int namelen,
4500 		     unsigned long timeout_cs)
4501 {
4502 	struct dlm_lkb *lkb;
4503 	struct dlm_args args;
4504 	int error;
4505 
4506 	dlm_lock_recovery(ls);
4507 
4508 	error = create_lkb(ls, &lkb);
4509 	if (error) {
4510 		kfree(ua);
4511 		goto out;
4512 	}
4513 
4514 	if (flags & DLM_LKF_VALBLK) {
4515 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4516 		if (!ua->lksb.sb_lvbptr) {
4517 			kfree(ua);
4518 			__put_lkb(ls, lkb);
4519 			error = -ENOMEM;
4520 			goto out;
4521 		}
4522 	}
4523 
4524 	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
4525 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
4526 	   lock and that lkb_astparam is the dlm_user_args structure. */
4527 
4528 	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4529 			      fake_astfn, ua, fake_bastfn, &args);
4530 	lkb->lkb_flags |= DLM_IFL_USER;
4531 	ua->old_mode = DLM_LOCK_IV;
4532 
4533 	if (error) {
4534 		__put_lkb(ls, lkb);
4535 		goto out;
4536 	}
4537 
4538 	error = request_lock(ls, lkb, name, namelen, &args);
4539 
4540 	switch (error) {
4541 	case 0:
4542 		break;
4543 	case -EINPROGRESS:
4544 		error = 0;
4545 		break;
4546 	case -EAGAIN:
4547 		error = 0;
4548 		/* fall through */
4549 	default:
4550 		__put_lkb(ls, lkb);
4551 		goto out;
4552 	}
4553 
4554 	/* add this new lkb to the per-process list of locks */
4555 	spin_lock(&ua->proc->locks_spin);
4556 	hold_lkb(lkb);
4557 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4558 	spin_unlock(&ua->proc->locks_spin);
4559  out:
4560 	dlm_unlock_recovery(ls);
4561 	return error;
4562 }
4563 
/* Convert the mode of an existing userspace lock identified by lkid.
   Callback info from the caller's temporary dlm_user_args (ua_tmp) is
   copied into the lock's long-lived dlm_user_args; ua_tmp is always
   freed before returning.  -EINPROGRESS, -EAGAIN and -EDEADLK from
   convert_lock() are reported as 0. */

int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
		     unsigned long timeout_cs)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	/* user can change the params on its lock when it converts it, or
	   add an lvb that didn't exist before */

	ua = lkb->lkb_ua;

	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
		if (!ua->lksb.sb_lvbptr) {
			error = -ENOMEM;
			goto out_put;
		}
	}
	/* a caller-supplied lvb replaces the lock's current lvb contents */
	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);

	ua->xid = ua_tmp->xid;
	ua->castparam = ua_tmp->castparam;
	ua->castaddr = ua_tmp->castaddr;
	ua->bastparam = ua_tmp->bastparam;
	ua->bastaddr = ua_tmp->bastaddr;
	ua->user_lksb = ua_tmp->user_lksb;
	/* record the currently granted mode before the conversion */
	ua->old_mode = lkb->lkb_grmode;

	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
			      fake_astfn, ua, fake_bastfn, &args);
	if (error)
		goto out_put;

	error = convert_lock(ls, lkb, &args);

	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
4618 
/* Unlock a userspace lock identified by lkid.  Fields from ua_tmp are
   copied into the lock's persistent dlm_user_args; ua_tmp is always
   freed before returning.  -DLM_EUNLOCK counts as success, as does
   -EBUSY when DLM_LKF_FORCEUNLOCK is set. */

int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = lkb->lkb_ua;

	/* a caller-supplied lvb replaces the lock's current lvb contents */
	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
	if (ua_tmp->castparam)
		ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
		error = 0;
	if (error)
		goto out_put;

	/* park the lkb on the proc's unlocking list until the unlock
	   completes */
	spin_lock(&ua->proc->locks_spin);
	/* dlm_user_add_ast() may have already taken lkb off the proc list */
	if (!list_empty(&lkb->lkb_ownqueue))
		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
	spin_unlock(&ua->proc->locks_spin);
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
4667 
4668 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4669 		    uint32_t flags, uint32_t lkid)
4670 {
4671 	struct dlm_lkb *lkb;
4672 	struct dlm_args args;
4673 	struct dlm_user_args *ua;
4674 	int error;
4675 
4676 	dlm_lock_recovery(ls);
4677 
4678 	error = find_lkb(ls, lkid, &lkb);
4679 	if (error)
4680 		goto out;
4681 
4682 	ua = lkb->lkb_ua;
4683 	if (ua_tmp->castparam)
4684 		ua->castparam = ua_tmp->castparam;
4685 	ua->user_lksb = ua_tmp->user_lksb;
4686 
4687 	error = set_unlock_args(flags, ua, &args);
4688 	if (error)
4689 		goto out_put;
4690 
4691 	error = cancel_lock(ls, lkb, &args);
4692 
4693 	if (error == -DLM_ECANCEL)
4694 		error = 0;
4695 	/* from validate_unlock_args() */
4696 	if (error == -EBUSY)
4697 		error = 0;
4698  out_put:
4699 	dlm_put_lkb(lkb);
4700  out:
4701 	dlm_unlock_recovery(ls);
4702 	kfree(ua_tmp);
4703 	return error;
4704 }
4705 
/* Cancel the lock identified by lkid as part of deadlock resolution.
   This open-codes cancel_lock() so DLM_IFL_DEADLOCK_CANCEL can be set
   on the lkb while the rsb lock is held, before the cancel is issued.
   -DLM_ECANCEL and -EBUSY are treated as success. */

int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	struct dlm_rsb *r;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = lkb->lkb_ua;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, &args);
	if (error)
		goto out_r;
	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;

	error = _cancel_lock(r, lkb);
 out_r:
	unlock_rsb(r);
	put_rsb(r);

	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	return error;
}
4753 
4754 /* lkb's that are removed from the waiters list by revert are just left on the
4755    orphans list with the granted orphan locks, to be freed by purge */
4756 
4757 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4758 {
4759 	struct dlm_args args;
4760 	int error;
4761 
4762 	hold_lkb(lkb);
4763 	mutex_lock(&ls->ls_orphans_mutex);
4764 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4765 	mutex_unlock(&ls->ls_orphans_mutex);
4766 
4767 	set_unlock_args(0, lkb->lkb_ua, &args);
4768 
4769 	error = cancel_lock(ls, lkb, &args);
4770 	if (error == -DLM_ECANCEL)
4771 		error = 0;
4772 	return error;
4773 }
4774 
4775 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4776    Regardless of what rsb queue the lock is on, it's removed and freed. */
4777 
4778 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4779 {
4780 	struct dlm_args args;
4781 	int error;
4782 
4783 	set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4784 
4785 	error = unlock_lock(ls, lkb, &args);
4786 	if (error == -DLM_EUNLOCK)
4787 		error = 0;
4788 	return error;
4789 }
4790 
4791 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4792    (which does lock_rsb) due to deadlock with receiving a message that does
4793    lock_rsb followed by dlm_user_add_ast() */
4794 
4795 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4796 				     struct dlm_user_proc *proc)
4797 {
4798 	struct dlm_lkb *lkb = NULL;
4799 
4800 	mutex_lock(&ls->ls_clear_proc_locks);
4801 	if (list_empty(&proc->locks))
4802 		goto out;
4803 
4804 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4805 	list_del_init(&lkb->lkb_ownqueue);
4806 
4807 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4808 		lkb->lkb_flags |= DLM_IFL_ORPHAN;
4809 	else
4810 		lkb->lkb_flags |= DLM_IFL_DEAD;
4811  out:
4812 	mutex_unlock(&ls->ls_clear_proc_locks);
4813 	return lkb;
4814 }
4815 
4816 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4817    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4818    which we clear here. */
4819 
4820 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4821    list, and no more device_writes should add lkb's to proc->locks list; so we
4822    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4823    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4824    them ourself. */
4825 
void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	dlm_lock_recovery(ls);

	/* detach each lock under the mutex (in del_proc_lock), then
	   orphan or force-unlock it with the mutex released — see the
	   deadlock note above this function */
	while (1) {
		lkb = del_proc_lock(ls, proc);
		if (!lkb)
			break;
		del_timeout(lkb);
		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
			orphan_proc_lock(ls, lkb);
		else
			unlock_proc_lock(ls, lkb);

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request, it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	mutex_lock(&ls->ls_clear_proc_locks);

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}

	/* drop asts that were queued for delivery to this (closing) proc */
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		lkb->lkb_ast_type = 0;
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}

	mutex_unlock(&ls->ls_clear_proc_locks);
	dlm_unlock_recovery(ls);
}
4867 
/* Force-unlock every lock on proc->locks, then drop in-progress
   unlocks and queued asts.  Unlike dlm_clear_proc_locks(), the proc
   spinlocks are taken here because the process is issuing this purge
   itself (called from dlm_user_purge when pid == current->pid). */

static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	while (1) {
		lkb = NULL;
		/* pop one lock at a time so unlock_proc_lock() (which
		   takes the rsb lock) runs outside locks_spin */
		spin_lock(&proc->locks_spin);
		if (!list_empty(&proc->locks)) {
			lkb = list_entry(proc->locks.next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);
		}
		spin_unlock(&proc->locks_spin);

		if (!lkb)
			break;

		lkb->lkb_flags |= DLM_IFL_DEAD;
		unlock_proc_lock(ls, lkb);
		dlm_put_lkb(lkb); /* ref from proc->locks list */
	}

	spin_lock(&proc->locks_spin);
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->locks_spin);

	spin_lock(&proc->asts_spin);
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->asts_spin);
}
4905 
4906 /* pid of 0 means purge all orphans */
4907 
static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_lkb *lkb, *safe;

	/* force-unlock and free orphaned locks owned by pid (pid 0
	   matches every orphan).  NOTE(review): nodeid is unused in
	   this function. */
	mutex_lock(&ls->ls_orphans_mutex);
	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
		if (pid && lkb->lkb_ownpid != pid)
			continue;
		unlock_proc_lock(ls, lkb);
		list_del_init(&lkb->lkb_ownqueue);
		dlm_put_lkb(lkb);
	}
	mutex_unlock(&ls->ls_orphans_mutex);
}
4922 
4923 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4924 {
4925 	struct dlm_message *ms;
4926 	struct dlm_mhandle *mh;
4927 	int error;
4928 
4929 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4930 				DLM_MSG_PURGE, &ms, &mh);
4931 	if (error)
4932 		return error;
4933 	ms->m_nodeid = nodeid;
4934 	ms->m_pid = pid;
4935 
4936 	return send_message(mh, ms);
4937 }
4938 
4939 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4940 		   int nodeid, int pid)
4941 {
4942 	int error = 0;
4943 
4944 	if (nodeid != dlm_our_nodeid()) {
4945 		error = send_purge(ls, nodeid, pid);
4946 	} else {
4947 		dlm_lock_recovery(ls);
4948 		if (pid == current->pid)
4949 			purge_proc_locks(ls, proc);
4950 		else
4951 			do_purge(ls, nodeid, pid);
4952 		dlm_unlock_recovery(ls);
4953 	}
4954 	return error;
4955 }
4956 
4957